Constant data streaming using the python library

I am trying to make a python script that can monitor measurements over a very long time, could even be weeks. Originally I used the read function but I lose samples over the time that the rest of the code iterates. Now I am using the stream_buffer.samples_get but after about 30 seconds I get this error.

ValueError: get_range 8680644:38639504 out of available range 8689527:38689526

It seems like there is a limit on how many samples can stream because the values are the same every time. I know the read function had a contiguous data capture limit of 30 seconds as well. I didn’t know if you had suggestions on how to constantly stream data, or if there are even any commands that can stream data for that long. I have tried looking through the API but I don’t find it to be the easiest to understand because I am new to python. Thanks.

Hi @joulescopeluver - At full sample rate, Joulescopes generate a lot of data. Each signal current, voltage, and power generate 4 bytes at 1,000,000 times a second, or 4 MB/s. By default, the RAM buffer only holds 30 seconds of samples. If you want more than this, you can increase the buffer up to your available RAM or store to disk. For any full-rate capture over an hour, you likely need to be storing to disk.

For all three signals, you need about 1.2 TB/day for JLS storage, which includes the overhead. You can get 24 TB external hard drives for about $300 USD, which store about 20 days of full-rate Joulescope data.

The Joulescope Python software has two layers packages:

  1. The pyjoulescope_driver package is the low-level user-space driver that gives full control over the instrument. It has an asynchronous Publish-Subscribe API that is perfect for dealing with incoming sample data as it arrives. The Joulescope UI uses this directly.
  2. The joulescope package provides a synchronous API that makes simple tasks, like read easy. It provides the stream_buffer API to handle streaming data. It supports all JS110 features. For more recent JS220 features, you need to “bypass” this interface and use Publish-Subscribe. You can use a mix of the blocking and Publish-Subscribe APIs.

The python -m pyjoulescope_driver record entry point is the easiest way to capture full-rate data from the command line.

  1. Do you want full sample rate data? If not, what sample rate do you need?
  2. What do you want to do with this data? Open it in the Joulescope UI? Post-process with a custom script? Something else?
  3. How do you plan on ending the capture? Human control?

I will not be storing all of the data. My first pass script averaged out the current and voltage over the configured period and printed it to a csv file. Now I am looking to add overall power consumption to that script and the problem with how I was doing it is the script will lose samples while doing calculations and writing to the csv. It can be stopped by the voltage dropping below a certain point or can be configured to stop after a set time.

I have used the accumulator in the UI before I figure there must be some sort of way to pull data from that.

How about something like this:

#!/usr/bin/env python3
# Copyright 2022-2025 Jetperch LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Capture statistics to a CSV file.

See https://github.com/jetperch/joulescope_driver/blob/main/pyjoulescope_driver/entry_points/statistics.py
"""

from pyjoulescope_driver import Driver, time64
import argparse
import json
import sys
import time


def _frequency_validator(x):
    x = float(x)
    if not (1.0 <= float(x) <= 2000.0):
        raise ValueError('Frequency out of range: {x}')
    return x


def get_parser():
    p = argparse.ArgumentParser(
        description='Display and record statistics data from a single Joulescope JS220.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    p.add_argument('--frequency',
                   default=1.0,
                   type=_frequency_validator,
                   help='The statistics update frequency in Hz for the Joulescope between 1 and 2000.')
    p.add_argument('--duration',
                   type=time64.duration_to_seconds,
                   help='The maximum capture duration in float seconds. '
                        + 'Add a suffix for other units: s=seconds, m=minutes, h=hours, d=days')
    p.add_argument('--voltage_threshold',
                   help='The minimum voltage threshold to stop the capture.')
    p.add_argument('output_path',
                   help='The output CSV file path.')
    p.add_argument('--device',
                   help='The target Joulescope device_path for this command. '
                        + 'By default, use the only connected Joulescope.')
    return p


class Js220Stats:
    
    def __init__(self, device_path):
        """Initialize the statistics instance.
        
        :param device_path: The Joulescope device path to use.
        """
        self._device_path = device_path
        self._most_recent_stats = None
        
    def on_statistics_value(self, topic, value):
        # Uncomment to print the full statistics data structure for development
        # print(json.dumps(value, indent=2))
        t = value['time']['utc']['value'][0]
        # sample_id = value['time']['samples']['value'][0]
        t = time64.as_datetime(t)
        time_str = t.isoformat(timespec='microseconds').replace('+00:00', 'Z')
        current = value['signals']['current']['avg']['value']
        voltage = value['signals']['voltage']['avg']['value']
        power = value['signals']['power']['avg']['value']
        charge = value['accumulators']['charge']['value']
        energy = value['accumulators']['energy']['value']
        self._most_recent_stats = value
        self._fout.write(f'{time_str},{current},{voltage},{power},{charge},{energy}\n')
    
    def run(self, d, args):
        """Run statistics collection to a file.
        
        :param d: The Joulescope Driver instance.
        :param args: The parsed command-line arguments.
        """
        device = self._device_path
        d.open(device)
        try:
            if 'js220' in device:
                # JS220, always sensor-side statistics
                d.publish(device + '/s/i/range/mode', 'auto')
                scnt = int(round(1_000_000 / args.frequency))
                d.publish(device + '/s/stats/scnt', scnt)
                d.publish(device + '/s/stats/ctrl', 1)
                d.subscribe(device + '/s/stats/value', 'pub', self.on_statistics_value)
            else:
                print(f'Unsupported device {device}')
                return 1
            with open(args.output_path, 'wt') as f:
                self._fout = f
                self._fout.write("#time,current,voltage,power,charge,energy\n")
                try:
                    t_start = time.time()
                    while True:
                        if args.duration is not None and (time.time() - t_start) >= args.duration:
                            print('Stop due to duration')
                            break
                        if args.voltage_threshold is not None and most_recent_stats is not None:
                            if args.voltage_threshold >= value['signals']['voltage']['avg']['value']:
                                print('Stop due to voltage threshold')
                                break
                        time.sleep(0.025)
                except KeyboardInterrupt:
                    self._fout.flush()
        finally:
            d.close(device)

def run(args):
    global most_recent_stats
    with Driver() as d:
        devices = d.device_paths()
        if args.device is not None:
            if args.device not in devices:
                print(f'device {device} not found')
                return 1
            device_path = args.device_path
        elif len(devices) != 1:
            print(f'only supports 1 Joulescope at a time, found {len(devices)}')
            return 1
        else:
            device_path = devices[0]
        s = Js220Stats(device_path)
        return s.run(d, args)


if __name__ == '__main__':
    args = get_parser().parse_args()
    sys.exit(run(args))

This uses pyjoulescope_driver directly rather than the joulescope package.

This gives me an exceptional start, but the sensor status LED light seems to keep turning off around 40 minutes into my tests I am running on this script.

I will see if I can duplicate it. In the meantime, I have an improved version:

#!/usr/bin/env python3
# Copyright 2022-2025 Jetperch LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Capture statistics to a CSV file.

See https://github.com/jetperch/joulescope_driver/blob/main/pyjoulescope_driver/entry_points/statistics.py
"""

from pyjoulescope_driver import Driver, time64
import argparse
import json
import queue
import sys
import time


def _frequency_validator(x):
    x = float(x)
    if not (1.0 <= float(x) <= 2000.0):
        raise ValueError('Frequency out of range: {x}')
    return x


def _voltage_threshold_validator(x):
    x = float(x)
    if not (-15 <= float(x) <= 15):
        raise ValueError('Voltage threshold out of range: {x}')
    return x



def get_parser():
    p = argparse.ArgumentParser(
        description='Display and record statistics data from a single Joulescope JS220.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    p.add_argument('--frequency',
                   default=1.0,
                   type=_frequency_validator,
                   help='The statistics update frequency in Hz for the Joulescope between 1 and 2000.')
    p.add_argument('--duration',
                   type=time64.duration_to_seconds,
                   help='The maximum capture duration in float seconds. '
                        + 'Add a suffix for other units: s=seconds, m=minutes, h=hours, d=days')
    p.add_argument('--voltage-threshold', '--voltage_threshold',
                   type=_voltage_threshold_validator,
                   help='The minimum voltage threshold to stop the capture.')
    p.add_argument('output_path',
                   help='The output CSV file path.')
    p.add_argument('--device',
                   help='The target Joulescope device_path for this command. '
                        + 'By default, use the only connected Joulescope.')
    return p


class Js220Stats:
    
    def __init__(self, device_path):
        """Initialize the statistics instance.
        
        :param device_path: The Joulescope device path to use.
        """
        self._device_path = device_path
        self._queue = queue.Queue()
        
    def on_statistics_value(self, topic, value):
        self._queue.put(value)
    
    def run(self, d, args):
        """Run statistics collection to a file.
        
        :param d: The Joulescope Driver instance.
        :param args: The parsed command-line arguments.
        """
        device = self._device_path
        d.open(device)
        try:
            if 'js220' in device:
                # JS220, always sensor-side statistics
                d.publish(device + '/s/i/range/mode', 'auto')
                scnt = int(round(1_000_000 / args.frequency))
                d.publish(device + '/s/stats/scnt', scnt)
                d.publish(device + '/s/stats/ctrl', 1)
                d.subscribe(device + '/s/stats/value', 'pub', self.on_statistics_value)
            else:
                print(f'Unsupported device {device}')
                return 1
            with open(args.output_path, 'wt') as f:
                f.write("#time,current,voltage,power,charge,energy\n")
                try:
                    t_start = time.time()
                    while True:
                        try:
                            value = self._queue.get(timeout=0.1)
                        except queue.Empty:
                            continue
                        # Uncomment to print the full statistics data structure for development
                        # print(json.dumps(value, indent=2))
                        t = value['time']['utc']['value'][0]
                        # sample_id = value['time']['samples']['value'][0]
                        t = time64.as_datetime(t)
                        time_str = t.isoformat(timespec='microseconds').replace('+00:00', 'Z')
                        current = value['signals']['current']['avg']['value']
                        voltage = value['signals']['voltage']['avg']['value']
                        power = value['signals']['power']['avg']['value']
                        charge = value['accumulators']['charge']['value']
                        energy = value['accumulators']['energy']['value']                        
                        f.write(f'{time_str},{current},{voltage},{power},{charge},{energy}\n')

                        if args.duration is not None and (time.time() - t_start) >= args.duration:
                            print('Stop due to duration')
                            break
                        if args.voltage_threshold is not None and args.voltage_threshold >= voltage:
                            print('Stop due to voltage threshold')
                            break
                except KeyboardInterrupt:
                    f.flush()
        finally:
            d.close(device)


def run(args):
    with Driver() as d:
        devices = d.device_paths()
        if args.device is not None:
            if args.device not in devices:
                print(f'device {device} not found')
                return 1
            device_path = args.device_path
        elif len(devices) != 1:
            print(f'only supports 1 Joulescope at a time, found {len(devices)}')
            return 1
        else:
            device_path = devices[0]
        s = Js220Stats(device_path)
        return s.run(d, args)


if __name__ == '__main__':
    args = get_parser().parse_args()
    sys.exit(run(args))

This posts the status value to a queue to prevent blocking the pyjoulescope_driver thread, just in case the file write delay is a problem.

I also fixed the voltage threshold parameter so that it is correctly converted to a float.

Also ensure that you are plugging your Joulescope directly into a USB port on the host computer. Avoid using USB hubs, docks, and adapters. If you must use a hub, ensure that it is a powered hub with sufficient power for all connected devices.

For your long-term test, you should plan on using a dedicated computer that you prepare. You need to disable power management and anything that can reboot the computer or degrade performance. I recommended disconnecting networking entirely.

See Performing long-term captures on Windows

Yeah I had my joulescope plugged into a usb port and I changed it and haven’t seen the issue yet.

1 Like

Just successfully ran an hour test, going to attempt a day long and see how it goes. Thank you for response time and always being a big help.

1 Like

So, my Windows computer rebooted last night to install 2025-08 Cumulative Update for Windows 11 Version 24H2 for x64-based Systems (KB5063878) (26100.4946). The CSV has entries up until the time the computer rebooted, and it captured data for about 6 hours without issue.

Please post again if you run into any issues with the data collection!