Facing Frequent timeout error in Python Script

Hi Team ,
I have developed python script where I can fetch the real time current (VCC and VCCQ) of an product I am working on ,using two Joulescopes. I am able to get the values correctly ,but frequently I am facing below error .Please look into this error and try to provide the solution for my problem.

Output read timeout reached.
ll_await timed out
ll_await_pubsub_topic(c/!pong) timed out
ping_wait(1) timed out
Traceback (most recent call last):
File “C:\Users\Test\PycharmProjects\Joulescope Automation\Power Automation\hibernate_master.py”, line 286, in
run()
File “C:\Users\Test\PycharmProjects\Joulescope Automation\Power Automation\hibernate_master.py”, line 275, in run
command_execution()
File “C:\Users\Test\PycharmProjects\Joulescope Automation\Power Automation\hibernate_master.py”, line 231, in command_execution
joulescope_measurement(command)
File “C:\Users\Test\PycharmProjects\Joulescope Automation\Power Automation\hibernate_master.py”, line 136, in joulescope_measurement
vcc_device.open(timeout=15)
File “C:\Users\Test\PycharmProjects\pythonProject1.venv\Lib\site-packages\joulescope\v1\device.py”, line 369, in open
rc = self._driver.open(self._path, mode, timeout=None)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “pyjoulescope_driver/binding.pyx”, line 859, in pyjoulescope_driver.binding.Driver.open
File “pyjoulescope_driver/binding.pyx”, line 593, in pyjoulescope_driver.binding._handle_rc
TimeoutError: jsdrv_open timed out | u/js220/000531

Hi @saiteja - Sorry to hear that you are having issues with your Python script. In general, opening and closing Joulescopes works well, but you can definitely run into issues, especially if you have multiple Python threads. In general, I recommend opening each Joulescope once and leaving it open until the end of the Python script.

A few questions:

  1. Is your script opening and closing Joulescopes repeatedly?
  2. If so, can you restructure your script to keep the Joulescopes open?
  3. If you still have issues, can you share your Joulescope parts of your script so that I can attempt to duplicate the issue?

Hi Team ,

This is the below error I am getting after executing the code :
utput read timeout reached.
ll_await timed out
ll_await_pubsub_topic(c/!pong) timed out
ping_wait(1) timed out
Traceback (most recent call last):
File “C:\Users\Test\PycharmProjects\Joulescope Automation\Power Automation\hibernate_master.py”, line 286, in
run()
File “C:\Users\Test\PycharmProjects\Joulescope Automation\Power Automation\hibernate_master.py”, line 275, in run
command_execution()
File “C:\Users\Test\PycharmProjects\Joulescope Automation\Power Automation\hibernate_master.py”, line 231, in command_execution
joulescope_measurement(command)
File “C:\Users\Test\PycharmProjects\Joulescope Automation\Power Automation\hibernate_master.py”, line 136, in joulescope_measurement
vcc_device.open(timeout=15)
File “C:\Users\Test\PycharmProjects\pythonProject1.venv\Lib\site-packages\joulescope\v1\device.py”, line 369, in open
rc = self._driver.open(self._path, mode, timeout=None)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “pyjoulescope_driver/binding.pyx”, line 859, in pyjoulescope_driver.binding.Driver.open
File “pyjoulescope_driver/binding.pyx”, line 593, in pyjoulescope_driver.binding._handle_rc
TimeoutError: jsdrv_open timed out | u/js220/000531

If you require the code for more analysis ,please revert back

Since I have attached the error that I am facing ,will be attaching the sample code

import serial
import signal
import time
import queue
import pandas as pd
from datetime import datetime
from joulescope import scan
import threading
import sys
print(f"Running with python:{sys.executable}")


ser = None  # Serial connection object
vcc_current = 0  # Global variable to track the highest VCC current
vccq_current = 0  # Global variable to track the highest VCCQ current
current_measurements = []  # List to store highest VCC and VCCQ currents after each command

# Function to initialize the serial connection
def initialize_serial_connection():
    global ser
    try:
        ser = serial.Serial('COM16', 115200, timeout=None)
        if ser.is_open:
            print(f"Connected to {ser.name}")
        else:
            print("Serial port is not open, trying to open it now.")
            ser.open()
            print(f"Successfully opened {ser.name}")
    except serial.SerialException as e:
        print(f"Error opening serial port: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


# Function to send a command through the serial connection
def send_command(command):
    try:
        if ser and ser.is_open:
            ser.write((command + '\n').encode())
        else:
            print("Serial port is not open. Unable to send command.")
    except serial.SerialException as e:
        print(f"Error writing to serial port: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


# Function to read the output with an optional timeout
def read_output(timeout=None):
    start_time = time.time()
    output = ""
    try:
        while True:
            if ser.in_waiting > 0:  # Check if there's data in the buffer
                part = ser.read(ser.in_waiting).decode('utf-8')
                if part:
                    output += part
                    print(part, end='')  # Print output in real-time
            if timeout and (time.time() - start_time > timeout):
                print("\nOutput read timeout reached.")
                break
            time.sleep(1)
    except serial.SerialException as e:
        print(f"Error reading from serial port: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    return output


# Function to close the serial connection
def close_serial_connection():
    if ser and ser.is_open:
        ser.close()
        print("Serial connection closed.")


# Function to execute a command and return its output
def execute_command(command, runtime=None):
    print(f"\nExecuting '{command}'...")
    send_command(command)
    return read_output(timeout=runtime)


# Joulescope statistics callback for VCC
def vcc_callback(stats):
    global vcc_current
    i = stats['signals']['current']['µ']
    current_value = (i['value']) * 1000
    if current_value > vcc_current:
        vcc_current = current_value


# Joulescope statistics callback for VCCQ
def vccq_callback(stats):
    global vccq_current
    i = stats['signals']['current']['µ']
    current_value = (i['value']) * 1000
    if current_value > vccq_current:
        vccq_current = current_value


# Function to save the highest current measurement to an Excel file
def save_to_excel(dataframe):
    now = datetime.now()
    filename = f"current_measurement_{now.strftime('%Y%m%d_%H%M%S')}.xlsx"
    dataframe.to_excel(filename, index=False)
    print(f"Data saved to {filename}")


# Function to handle Joulescope measurements after each command execution
def joulescope_measurement(command_name):
    global vcc_current, vccq_current
    vcc_current = 0  # Reset VCC current before measurement
    vccq_current = 0  # Reset VCCQ current before measurement
    _quit = False
    statistics_queue = queue.Queue()

    def stop_fn(*args, **kwargs):
        nonlocal _quit
        _quit = True

    signal.signal(signal.SIGINT, stop_fn)

    devices = scan(config='off')
    if len(devices) < 2:
        print("Error: Two Joulescope devices are required but only one detected.")
        return

    # Assign device 1 for VCC and device 2 for VCCQ
    vcc_device = devices[0]
    vccq_device = devices[1]

    try:
        # Register callback for VCC device
        vcc_device.statistics_callback_register(vcc_callback, 'sensor')
        vcc_device.open(timeout=15)
        vcc_device.parameter_set('i_range', 'auto')
        vcc_device.parameter_set('v_range', '15V')

        # Register callback for VCCQ device
        vccq_device.statistics_callback_register(vccq_callback, 'sensor')
        vccq_device.open(timeout=15)
        vccq_device.parameter_set('i_range', 'auto')
        vccq_device.parameter_set('v_range', '15V')

        # Main loop to process Joulescope data (run for 5-7 seconds)
        start_time = time.time()
        while time.time() - start_time < 7:  # Run measurement for 5-7 seconds
            vcc_device.status()
            vccq_device.status()
            time.sleep(2)
    except TimeoutError as e:
        print(f"Timeout error while accessing joulescope")
    except Exception as e:
        print(f"Unexpected error ")

    finally:
        # Close both devices
        try:

            vcc_device.close()
            vccq_device.close()
        except Exception as e:

            print(f"Error while closing devices")

        # Store the highest VCC and VCCQ currents after command execution
        current_measurements.append({
            'Command': command_name,
            'VCC Current (mA)': vcc_current,
            'VCCQ Current (mA)': vccq_current
        })


# Function to handle command execution and sequentially run different voltage sets
def command_execution():
    initialize_serial_connection()

    if ser and ser.is_open:
        print("Sending Ctrl+C to interrupt any ongoing process...")
        send_command('\x03')  # Sending Ctrl+C (ASCII code 3)
        time.sleep(1)

        print("Checking current directory with 'pwd' command...")
        current_dir = send_command('pwd')
        time.sleep(3)

        if current_dir != '/data':
            print("\nChanging directory to /data...")
            send_command('cd /data')
            time.sleep(1)
            send_command('pwd')
            time.sleep(1)



        output_ufs_tester = execute_command('./UfsTester ', runtime=3)
        if "Start Time:   - us" in output_ufs_tester:
            print("\033[91m * DRIVER AND TESTER ARE NOT MATCHING ** \033[0m")
            time.sleep(10)
        elif "Device is not presented" in output_ufs_tester:
            print("\033[91m * PLS CONNECT THE DEVICE PROPERLY ** \033[0m")
            time.sleep(10)
        else:
            #output_10020 = execute_command('./UfsTester TL_Reset?Power -i', runtime=6)
            output_10020 = execute_command('./UfsTester  TL_PMC?PV', runtime=6)
            if "UFS Spec 3.1" in output_10020:
                print("\nExecuting 'LVCC' (this may take up to 12 minutes)...")

            # First set of commands (2.5V/1.2V)
            commands_set_1 = [
                './ TL_SetPower?VCC=2500,VCCQ=1200 CPK_SleepHibern_CLKOFF?SLEEPT=20000',
                './UfsTester  TL_SetPower?VCC=2500,VCCQ=1200 CPK_SleepHibern_CLKON?SLEEPT=20000000',
                './UfsTester  TL_SetPower?VCC=2500,VCCQ=1200 CPK_APS?SLEEPT=20000',
                './UfsTester  TL_SetPower?VCC=2500,VCCQ=1200 CPK_DeepSleepHibern_HWReset?SLEEPT=60000'
            ]

            # Second set of commands (2.4V/1.14V)
            commands_set_2 = [
              './UfsTester  TL_SetPower?VCC=2400,VCCQ=1140 CPK_SleepHibern_CLKOFF?SLEEPT=20000',
                './UfsTester  TL_SetPower?VCC=2400,VCCQ=1140 CPK_SleepHibern_CLKON?SLEEPT=20000000',
                './UfsTester  TL_SetPower?VCC=2400,VCCQ=1140 CPK_APS?SLEEPT=20000',
                './UfsTester  TL_SetPower?VCC=2400,VCCQ=1140 CPK_DeepSleepHibern_HWReset?SLEEPT=60000'
            ]

            # Third set of commands (2.7V/1.26V)
            commands_set_3 = [
                './UfsTester  TL_SetPower?VCC=2700,VCCQ=1260 CPK_SleepHibern_CLKOFF?SLEEPT=20000',
                './UfsTester  TL_SetPower?VCC=2700,VCCQ=1260 CPK_SleepHibern_CLKON?SLEEPT=20000000',
                './UfsTester  TL_SetPower?VCC=2700,VCCQ=1260 CPK_APS?SLEEPT=20000',
                './UfsTester  TL_SetPower?VCC=2700,VCCQ=1260 CPK_DeepSleepHibern_HWReset?SLEEPT=60000'
            ]

            # Execute the first set of commands (2.5V/1.2V)
            print("\nExecuting commands for 2.5V/1.2V...")
            for command in commands_set_1:
                try:

                    output = execute_command(command, runtime=12)
                    joulescope_measurement(command)
                    send_command('\x03')  # Send Ctrl+C to terminate after each command
                except Exception as e:
                    print(f"Error during command execution")

            # Reset the system and prepare for the next set
            execute_command('./UfsTester  TL_Reset?Power -i', runtime=6)
            #execute_command('./UfsTester TL_PMC?PV', runtime=6)

            # Execute the second set of commands (2.4V/1.14V)
            print("\nExecuting commands for 2.4V/1.14V...")
            for command in commands_set_2:
                try:

                    output = execute_command(command, runtime=12)
                    joulescope_measurement(command)
                    send_command('\x03')  # Send Ctrl+C to terminate after each command
                except Exception as e:
                    print(f"Error during command eecution")

            # Reset the system and prepare for the next set
            execute_command('./UfsTester  TL_Reset?Power -i', runtime=6)
            #execute_command('./UfsTester TL_PMC?PV', runtime=6)

            # Execute the third set of commands (2.7V/1.26V)
            print("\nExecuting commands for 2.7V/1.26V...")
            for command in commands_set_3:
                try:

                    output = execute_command(command, runtime=12)
                    joulescope_measurement(command)
                    send_command('\x03')  # Send Ctrl+C to terminate after each command
                except Exception as e:
                    print(f"Error during command exec")


            # Reset the system after the final set
            execute_command('./UfsTester TL_Reset?Power -i', runtime=6)
            #execute_command('./UfsTester TL_PMC?PV', runtime=6)

    close_serial_connection()

def exit_script():
    print("execution stopped after 8 seconds automaticaluu")
    sys.exit()


# Main function to run the entire process sequentially
def run():
    try:

    # Start the timer to stop the program after 15 seconds

        timer = threading.Timer(8, exit_script)
        timer.start()

    # Step 1: Execute commands via the serial connection
        command_execution()

    # Step 2: Save all highest current measurements to Excel
        df = pd.DataFrame(current_measurements)
        save_to_excel(df)

    # If execution finishes before the 15 seconds timer, cancel the timer
        timer.cancel()
    except Exception as e :
        print(f"Unexpected error during the run")
        df=pd.DataFrame(current_measurements)
        save_to_excel(df)


if __name__ == '__main__':
    run()

Please find the attached code ,please provide me the changes that I can make to run the code effectively without any error

Hi @saiteja - That’s a lot of Python code that did not post correctly indented. Can you try again to post the runnable Python file? Instead of replying to the notification email, visit the forum and post again with the Python file. Thanks!

Hi @saiteja - The code you posted was correctly formatted as raw text, just not as shown in the post. I editted your post and extracted the code.

For automated testing, I am a big fan of sending as little data as possible over USB. The joulescope package always sends sample data over USB. For statistics only, you can reduce the USB traffic to further improve reliability by using the pyjoulescope_driver package.

I am also not a fan of scanning for Joulescopes, opening and closing with each iteration. Just open, perform measurements as needed, and close when done.

I have created a new runnable script that demonstrates how to use your Joulescopes this way:

# Copyright 2025 Jetperch LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from pyjoulescope_driver import Driver
import time


class JoulescopeStatistics:
    """Use Joulescopes to gather statistics measurements over arbitrary durations.
    
    :param driver: The `pyjoulescope_driver.Driver` instance.
    :param device_map: The mapping between measurement quantity and device path.
        None (default) opens all connected devices and uses the device path as the quantity.
    """
    def __init__(self, driver, device_map=None):
        self._data = None
        self._driver = driver
        devices = sorted(self._driver.device_paths())
        if device_map is None:
            device_map = [(device, device) for device in devices]
        self._device_map = dict(device_map)
        self._device_map_rev = dict([(value, key) for key, value in self._device_map.items()])
        expected_devices = sorted(self._device_map.values())
        if devices != expected_devices:
            raise RuntimeError(f'Joulescopes not found.  Expected {expected_devices}, Found {devices}')
        for device in devices:
            self._driver.open(device)
            self._driver.publish(device + '/s/i/range/mode', 'auto', timeout=0)
            self._driver.publish(device + '/s/v/range/mode', '15V', timeout=0)
            self._driver.publish(device + '/s/stats/ctrl', 1, timeout=0)
            self._driver.subscribe(device + '/s/stats/value', 'pub', self._on_statistics_value, timeout=0)
            
    def _on_statistics_value(self, topic, value):
        if self._data is None:
            return
        device = '/'.join(topic.split('/')[:3])
        key = self._device_map_rev[device]
        self._data[key].append(value)
        
    def close(self):
        """Close all devices opened by this instance."""
        for device in self._device_map_rev.keys():
            self._driver.unsubscribe(device + '/s/stats/value', self._on_statistics_value, timeout=0)
            self._driver.close(device)

    def start(self):
        """Start a measurement."""
        self._data = {}
        for key in self._device_map.keys():
            self._data[key] = []
        
    def stop(self):
        """Stop a measurement.
        
        :return: The map of quantity to list of measure statistics dicts.
        """
        data, self._data = self._data, None
        return data



current_measurements = []  # List to store highest VCC and VCCQ currents after each command


def joulescope_measurement(js_stats, command_name, duration=None):
    """Perform a Joulescope measurement for our system.
    
    :param js_stats: The `JoulescopeStatistics` instance
    :param command_name: The command name for this measurement.
    :param duration: The measurement duration in seconds.
    :return: The measurement.  Also append to global `current_measurements` list.
    """
    duration = 2 if duration is None else float(duration)
   
    # perform the measurement
    js_stats.start()
    time.sleep(duration)
    measurement = js_stats.stop()

    # transform list of measured statistics data structures to our result data structure
    result = {
        'Command': command_name
    }
    for quantity, data in measurement.items():
        result[quantity] = 1000 * max([m['signals']['current']['avg']['value'] for m in data])
        
    # append to global list and return result
    current_measurements.append(result)
    return result


def run():
    js_device_map = {
        'VCC Current (mA)': 'u/js220/000027',   # replace with your serial number
        'VCCQ Current (mA)': 'u/js220/000043',  # replace with your serial number
    }
    with Driver() as jsdrv:
        js_stats = JoulescopeStatistics(jsdrv, js_device_map)
        try:
            # replace with your test code, call joulescope_measurement as needed
            while True:
                m = joulescope_measurement(js_stats, 'my command')
                print(m)
        except KeyboardInterrupt:
            pass
        finally:
            js_stats.close()


if __name__ == '__main__':
    run()

Note that the JoulescopeStatistics class can support any number of connected Joulescopes.

Does this make sense and address your issue?