I will see if I can duplicate it. In the meantime, I have an improved version:
#!/usr/bin/env python3
# Copyright 2022-2025 Jetperch LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Capture statistics to a CSV file.
See https://github.com/jetperch/joulescope_driver/blob/main/pyjoulescope_driver/entry_points/statistics.py
"""
from pyjoulescope_driver import Driver, time64
import argparse
import json
import queue
import sys
import time
def _frequency_validator(x):
x = float(x)
if not (1.0 <= float(x) <= 2000.0):
raise ValueError('Frequency out of range: {x}')
return x
def _voltage_threshold_validator(x):
x = float(x)
if not (-15 <= float(x) <= 15):
raise ValueError('Voltage threshold out of range: {x}')
return x
def get_parser():
    """Build the command-line argument parser for this script.

    :return: The configured argparse.ArgumentParser instance.
    """
    parser = argparse.ArgumentParser(
        description='Display and record statistics data from a single Joulescope JS220.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Capture settings.
    parser.add_argument(
        '--frequency',
        type=_frequency_validator,
        default=1.0,
        help='The statistics update frequency in Hz for the Joulescope between 1 and 2000.',
    )
    duration_help = (
        'The maximum capture duration in float seconds. '
        'Add a suffix for other units: s=seconds, m=minutes, h=hours, d=days'
    )
    parser.add_argument(
        '--duration',
        type=time64.duration_to_seconds,
        help=duration_help,
    )
    parser.add_argument(
        '--voltage-threshold', '--voltage_threshold',
        type=_voltage_threshold_validator,
        help='The minimum voltage threshold to stop the capture.',
    )

    # Output file and device selection.
    parser.add_argument(
        'output_path',
        help='The output CSV file path.',
    )
    device_help = (
        'The target Joulescope device_path for this command. '
        'By default, use the only connected Joulescope.'
    )
    parser.add_argument(
        '--device',
        help=device_help,
    )
    return parser
class Js220Stats:
    """Capture JS220 sensor-side statistics and record them to a CSV file.

    Statistics arrive through :meth:`on_statistics_value`, which only
    enqueues them.  :meth:`run` drains the queue and performs the file
    writes, so file I/O latency never delays the statistics callback.
    """

    _CSV_HEADER = "#time,current,voltage,power,charge,energy\n"
    # Longest wait for a statistics value before re-checking the stop conditions.
    _POLL_TIMEOUT_S = 0.1

    def __init__(self, device_path):
        """Initialize the statistics instance.

        :param device_path: The Joulescope device path to use.
        """
        self._device_path = device_path
        self._queue = queue.Queue()

    def on_statistics_value(self, topic, value):
        """Queue one statistics update for processing by :meth:`run`.

        :param topic: The topic that delivered the value (unused).
        :param value: The statistics data structure.
        """
        self._queue.put(value)

    @staticmethod
    def _write_row(f, value):
        """Write one statistics value as a CSV row.

        :param f: The open, writable text file.
        :param value: The statistics data structure.
        :return: The average voltage contained in value.
        """
        # Uncomment to print the full statistics data structure for development
        # print(json.dumps(value, indent=2))
        t = time64.as_datetime(value['time']['utc']['value'][0])
        time_str = t.isoformat(timespec='microseconds').replace('+00:00', 'Z')
        signals = value['signals']
        accumulators = value['accumulators']
        current = signals['current']['avg']['value']
        voltage = signals['voltage']['avg']['value']
        power = signals['power']['avg']['value']
        charge = accumulators['charge']['value']
        energy = accumulators['energy']['value']
        f.write(f'{time_str},{current},{voltage},{power},{charge},{energy}\n')
        return voltage

    def _capture(self, f, args):
        """Write queued statistics to f until a stop condition is met.

        :param f: The open, writable text file.
        :param args: The parsed command-line arguments.  Reads duration
            and voltage_threshold, either of which may be None.
        """
        # Monotonic time keeps the elapsed duration immune to wall-clock adjustments.
        t_start = time.monotonic()
        while True:
            try:
                value = self._queue.get(timeout=self._POLL_TIMEOUT_S)
            except queue.Empty:
                value = None
            voltage = None if value is None else self._write_row(f, value)
            # Check the duration even when no value arrived, so that the
            # capture still ends on time if statistics stop being delivered.
            if args.duration is not None and (time.monotonic() - t_start) >= args.duration:
                print('Stop due to duration')
                return
            if (voltage is not None and args.voltage_threshold is not None
                    and args.voltage_threshold >= voltage):
                print('Stop due to voltage threshold')
                return

    def run(self, d, args):
        """Run statistics collection to a file.

        :param d: The Joulescope Driver instance.
        :param args: The parsed command-line arguments.  Reads frequency,
            duration, voltage_threshold, and output_path.
        :return: 0 when the capture completes or is interrupted from the
            keyboard, 1 when the device is not a JS220.
        """
        device = self._device_path
        d.open(device)
        try:
            if 'js220' not in device:
                print(f'Unsupported device {device}')
                return 1
            # JS220, always sensor-side statistics
            d.publish(device + '/s/i/range/mode', 'auto')
            # NOTE(review): presumably scnt is the number of 1 MHz samples
            # per statistics update - confirm against the JS220 documentation.
            scnt = int(round(1_000_000 / args.frequency))
            d.publish(device + '/s/stats/scnt', scnt)
            d.publish(device + '/s/stats/ctrl', 1)
            d.subscribe(device + '/s/stats/value', 'pub', self.on_statistics_value)
            with open(args.output_path, 'wt') as f:
                f.write(self._CSV_HEADER)
                try:
                    self._capture(f, args)
                except KeyboardInterrupt:
                    # Ctrl-C is the normal way to end an unbounded capture.
                    f.flush()
            return 0
        finally:
            d.close(device)
def run(args):
    """Select the target Joulescope and capture its statistics.

    :param args: The parsed command-line arguments from get_parser().
    :return: 1 when the requested device is not found or when the number
        of connected devices is not exactly one and no device was
        requested; otherwise the value returned by Js220Stats.run.
    """
    with Driver() as d:
        devices = d.device_paths()
        if args.device is not None:
            # The parser stores the --device option as args.device.
            if args.device not in devices:
                print(f'device {args.device} not found')
                return 1
            device_path = args.device
        elif len(devices) != 1:
            print(f'only supports 1 Joulescope at a time, found {len(devices)}')
            return 1
        else:
            device_path = devices[0]
        s = Js220Stats(device_path)
        return s.run(d, args)
if __name__ == '__main__':
    # Parse the command line, run the capture, and exit with its return code.
    sys.exit(run(get_parser().parse_args()))
This version posts each statistics value to a queue so that the pyjoulescope_driver thread never blocks, just in case the file write delay is the problem.
I also fixed the voltage threshold parameter so that it is correctly converted to a float.