I use Python to run 10-minute power recordings with a Joulescope on a Windows 10 computer, but I get a lot of NaNs in the recorded data. The script is based on StreamProcess, and I run it for 10 minutes with a sampling frequency of 1 kHz. I record only the power data and save it in a NumPy .npz file. Do you have any idea how to avoid packet loss, or loss of data, while recording?
I couldn’t find a better way to share the code, but I run it like this:
class StreamProcess:
    """Collect one signal field from a stream buffer into a NumPy array.

    Samples are copied in fixed-size chunks by :meth:`stream_notify` into a
    preallocated, zero-filled ``float32`` buffer of shape ``(length, 1)``,
    where ``length`` is the requested sample count rounded up to a whole
    number of chunks.

    Args:
        field: Name of the signal field to record. It is passed to
            ``samples_get`` and used as the array key in the saved ``.npz``.
        duration: Capture duration in seconds.
        sampling_frequency: Sample rate in Hz, used to size the buffer and
            to build the time axis.
        chunk_size: Number of samples copied per ``samples_get`` call.
            Defaults to 10000, the previously hard-coded value.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """

    def __init__(self, field, duration, sampling_frequency, chunk_size=10000):
        chunk_size = int(chunk_size)
        if chunk_size < 1:
            raise ValueError("chunk_size must be a positive integer")
        self.fs = sampling_frequency
        self.f = field
        self.chunk_size = chunk_size
        samples = int(duration * sampling_frequency)
        # Round up to a whole number of chunks so every chunk copy fits.
        length = ((samples + chunk_size - 1) // chunk_size) * chunk_size
        self._buffer = np.zeros((length, 1), dtype=np.float32)
        # Number of samples stored so far; also the next sample id to fetch.
        self.idx = 0

    def __str__(self):
        return 'StreamProcess'

    @property
    def data(self):
        """Return the samples stored so far as a ``(n, 1)`` array view."""
        return self._buffer[:self.idx, :]

    def _time_axis(self):
        """Return one timestamp (seconds) per stored sample.

        Built from an integer range so its length always equals
        ``len(self.data)``; ``np.arange`` with a float step can be off by one.
        """
        return np.arange(len(self.data)) / self.fs

    def stream_notify(self, stream_buffer):
        """Copy every complete chunk that is available into the buffer.

        Args:
            stream_buffer: Object exposing ``sample_id_range`` (a
                ``(start_id, end_id)`` pair) and
                ``samples_get(start, stop, fields=[...])``.
        """
        # NOTE(review): a chunk is only fetched once end_id has advanced by a
        # full chunk_size. If the stream buffer holds fewer samples than
        # chunk_size, the oldest samples of the chunk have presumably been
        # overwritten by then -- a possible source of NaNs. Confirm against
        # the stream buffer documentation; start_id is not checked here.
        _start_id, end_id = stream_buffer.sample_id_range
        capacity = len(self._buffer)
        idx_next = self.idx + self.chunk_size
        while idx_next <= end_id and idx_next <= capacity:
            # Get and store the measurement for this chunk.
            data = stream_buffer.samples_get(self.idx, idx_next, fields=[self.f])
            self._buffer[self.idx:idx_next, 0] = data['signals'][self.f]['value']
            self.idx = idx_next
            idx_next += self.chunk_size

    def export_data(self, outfolder, filename):
        """Save the captured samples to ``<outfolder>/<filename>.npz``.

        The folder is created if missing. The array is stored under the
        field name. ``self.time`` is refreshed as a side effect.
        """
        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        os.makedirs(outfolder, exist_ok=True)
        fpath = os.path.join(outfolder, filename + '.npz')
        print("Saving capture to file")
        self.time = self._time_axis()
        np.savez(fpath, **{self.f: self.data})
        print("Capture saved")

    def plot_data(self):
        """Plot the captured samples against time; blocks until closed."""
        print("---- Plotting captured data ----")
        print("Close plot to continue")
        data = self.data
        # Recompute instead of reading self.time, which only exists after
        # export_data() has run and may be stale.
        plt.plot(self._time_axis(), data)
        plt.xlabel('Time (s)')
        plt.ylabel(self.f)
        plt.title("Captured raw data")
        plt.show()
def capture(case, config):
    """Record one signal from a Joulescope, then save and plot it.

    Args:
        case: Base file name (without extension) for the saved capture.
        config: Mapping with the keys ``'duration'`` (seconds),
            ``'sampling_frequency'`` (Hz), ``'measurement'`` (field name),
            ``'wait'`` (seconds to sleep before starting) and
            ``'resultfolder'`` (output directory).

    Exits the process with status 1 when no Joulescope can be found.
    """
    _quit = False

    def on_stop(*args, **kwargs):
        # Used both as the SIGINT handler and as the device stop callback.
        nonlocal _quit
        _quit = True

    duration = config['duration']
    f_s = config['sampling_frequency']
    field = config['measurement']

    try:
        device = joulescope.scan_require_one(config='auto')
    except RuntimeError:
        print("Connect a Joulescope and restart")
        sys.exit(1)
    print("JouleScope connected")

    # NOTE(review): f_s is only used to size StreamProcess; nothing here
    # applies it to the device. Confirm the device really streams at f_s.
    # NOTE(review): the stream buffer is 2 s long, but StreamProcess copies
    # 10000-sample chunks (10 s at 1 kHz). If the buffer cannot hold a full
    # chunk, overwritten samples are a plausible cause of NaNs -- verify.
    device.parameter_set('buffer_duration', 2)
    stream_process = StreamProcess(field, duration, f_s)

    previous_handler = signal.signal(signal.SIGINT, on_stop)
    try:
        print("Start wait of:", config['wait'])
        time.sleep(config['wait'])
        with device:
            device.stream_process_register(stream_process)
            device.start(stop_fn=on_stop, duration=duration)
            print("Capture started")
            while not _quit:
                time.sleep(0.1)
            device.stop()  # for CTRL-C handling (safe for duplicate calls)
    finally:
        # Restore the previous handler so Ctrl-C works again during export
        # and plotting. None means it was not installed from Python and
        # cannot be passed back to signal.signal().
        if previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)
    print("Capture finished")

    stream_process.export_data(config['resultfolder'], case)
    stream_process.plot_data()
Best regards
Anders