author    Sam Scholten 2025-09-11 17:09:27 +1000
committer Sam Scholten 2025-09-11 17:16:46 +1000
commit    20faf71815922e6e79ef81fbee4e3c28c75fd891 (patch)
tree      5c1af585bcd4e7b4b2a3312d1df78ea0e503dcc8
parent    d5f84dbcc6822382b84e30edcdb3d28ac9326870 (diff)
add live-only mode with configurable buffer size limit
-rw-r--r--  README.md                35
-rw-r--r--  picostream/__init__.py    7
-rw-r--r--  picostream/consumer.py  301
-rw-r--r--  picostream/dfplot.py     33
-rw-r--r--  picostream/main.py       68
5 files changed, 279 insertions(+), 165 deletions(-)
diff --git a/README.md b/README.md
index 2bcca2e..d21e84b 100644
--- a/README.md
+++ b/README.md
@@ -7,6 +7,7 @@ PicoStream is a high-performance Python application for streaming data from a Pi
- **Robust Architecture**: Separates data acquisition from disk I/O using a producer-consumer pattern with a large, shared memory buffer pool to prevent data loss.
- **Zero-Risk Live Plotting**: The plotter reads from the HDF5 file, not the live data stream. This ensures that a slow or crashing GUI cannot interfere with data acquisition.
- **Efficient Visualization**: Uses `pyqtgraph` and a Numba-accelerated min-max decimation algorithm to display large datasets with minimal CPU impact.
+- **Live-Only Mode**: Optional buffer size limiting for pure real-time monitoring without generating large files.
- **Flexible Data Reading**: The included `PicoStreamReader` class allows for easy post-processing, including on-the-fly decimation ('mean' or 'min_max').
## Installation
@@ -21,14 +22,34 @@ The primary way to use the package is through the `picostream` command-line tool
### Acquisition
-The following command starts an acquisition at 62.5 MS/s and saves the data to `my_data.hdf5` with a live plot.
+The following commands show different acquisition modes:
```bash
+# Standard acquisition (saves all data)
picostream -s 62.5 -o my_data.hdf5 --plot
+
+# Live-only mode (limited buffer)
+picostream -s 62.5 --plot --max-buff-sec 60
```
Run `picostream --help` for a full list of options.
+### Live-Only Mode
+
+For pure real-time monitoring without saving large amounts of data, use the `--max-buff-sec` option:
+
+```bash
+picostream -s 62.5 --plot --max-buff-sec 60
+```
+
+This caps the HDF5 file at 60 seconds of data: when the cap is reached, the dataset is reset and overwriting starts again from the beginning, so older data is discarded. This suits continuous monitoring applications where only recent data needs to be visible.
+
+**Benefits:**
+- Prevents disk space issues during long acquisitions
+- Maintains full plotting functionality
+- On-disk file size stays roughly constant once the cap is reached (see the sizing note below)
+- Negligible performance impact on acquisition
+
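+As a sizing note (a rough estimate that ignores HDF5 chunk overhead and assumes no hardware downsampling): a 60 s cap at 62.5 MS/s retains at most 62.5e6 × 60 = 3.75 G int16 samples, i.e. about 7.5 GB on disk. A capped file is still an ordinary HDF5 file, so it can be inspected with plain `h5py`; a minimal sketch reading the `adc_counts` dataset the consumer writes:
+
+```python
+import h5py
+
+with h5py.File("my_data.hdf5", "r") as f:
+    dset = f["adc_counts"]  # int16 ADC samples; length is capped in live-only mode
+    print(f"Samples currently in file: {len(dset):,}")
+    # Acquisition metadata is stored as root attributes; names depend on your setup.
+    for key, value in f.attrs.items():
+        print(f"{key}: {value}")
+```
+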
### Viewing an Existing File
The plotter can be run as a standalone tool to view any compatible HDF5 file.
@@ -109,6 +130,18 @@ Retrieves a specific block of data from the file.
Resets the internal counter for `get_next_block()` to the beginning of the file.
+## Changelog
+
+### Version 0.2.0
+- **New Feature**: Added live-only mode with `--max-buff-sec` option for buffer size limiting
+- **Enhancement**: Improved plotter to handle buffer resets gracefully
+- **Enhancement**: Added total sample count tracking across buffer resets
+- **Enhancement**: Skip verification step in live-only mode for better performance
+- **Documentation**: Updated README with live-only mode usage examples
+
+### Version 0.1.0
+- Initial release with core streaming and plotting functionality
+
## Acknowledgements
This package began as a fork of [JoshHarris2108/pico_streaming](https://github.com/JoshHarris2108/pico_streaming) (which is unlicensed). I acknowledge and appreciate Josh's original idea/architecture.
diff --git a/picostream/__init__.py b/picostream/__init__.py
index e69de29..1fb1a8e 100644
--- a/picostream/__init__.py
+++ b/picostream/__init__.py
@@ -0,0 +1,7 @@
+"""PicoStream: High-performance PicoScope data acquisition and visualization."""
+
+from .reader import PicoStreamReader
+from .dfplot import HDF5LivePlotter
+
+__version__ = "0.2.0"
+__all__ = ["PicoStreamReader", "HDF5LivePlotter"]
diff --git a/picostream/consumer.py b/picostream/consumer.py
index b1b2a5f..f48ff30 100644
--- a/picostream/consumer.py
+++ b/picostream/consumer.py
@@ -1,141 +1,160 @@
-from __future__ import annotations
-
-import os
-import queue
-import threading
-from typing import Any, Dict, List
-
-import h5py
-import numpy as np
-from loguru import logger
-
-
-class Consumer:
-    """A data consumer that runs in a separate thread.
-
-    This class retrieves data buffers from a queue, writes them to an HDF5 file,
-    and then returns the buffer index to an "empty" queue for reuse by the
-    producer. It handles file creation, data writing, and metadata storage.
-    """
-
-    def __init__(
-        self,
-        buffer_size: int,
-        data_queue: queue.Queue[int],
-        empty_queue: queue.Queue[int],
-        data_buffers: List[np.ndarray],
-        file_name: str,
-        shutdown_event: threading.Event,
-        metadata: Dict[str, Any],
-    ):
-        """Initializes the Consumer.
-
-        Args:
-            buffer_size: The size of each individual data buffer.
-            data_queue: A queue for receiving indices of data-filled buffers.
-            empty_queue: A queue for returning indices of processed (empty) buffers.
-            data_buffers: A list of pre-allocated NumPy arrays for data.
-            file_name: The path to the output HDF5 file.
-            shutdown_event: A threading.Event to signal termination.
-            metadata: A dictionary of metadata to be saved as HDF5 attributes.
-        """
-        self.buffer_size = buffer_size
-        self.data_queue = data_queue
-        self.empty_queue = empty_queue
-        self.data_buffers = data_buffers
-        self.file_name = file_name
-        self.shutdown_event = shutdown_event
-        self.metadata = metadata
-
-        self.values_written: int = 0
-        self.empty_con_queue_count: int = 0
-
-    def format_sample_count(self, count: int) -> str:
-        """Format a large integer count into a human-readable string.
-
-        Uses metric prefixes (K, M, G) for thousands, millions, and billions.
-
-        Args:
-            count: The integer number to format.
-
-        Returns:
-            A formatted string representation of the count.
-        """
-        if count >= 1_000_000_000:
-            return f"{count / 1_000_000_000:.2f}G"
-        if count >= 1_000_000:
-            return f"{count / 1_000_000:.2f}M"
-        if count >= 1_000:
-            return f"{count / 1_000:.2f}K"
-        else:
-            return str(count)
-
-    def _processing_loop(self, dset: h5py.Dataset) -> None:
-        """
-        Continuously processes data from the queue and writes to the HDF5 dataset.
-
-        Args:
-            dset: The HDF5 dataset to write to.
-        """
-        while not self.shutdown_event.is_set():
-            try:
-                # Wait for a buffer index from the producer.
-                # A timeout allows the loop to periodically check the shutdown event.
-                idx = self.data_queue.get(timeout=0.1)
-
-                # Append the new data to the HDF5 dataset.
-                buffer_len = len(self.data_buffers[idx])
-                dset.resize((self.values_written + buffer_len,))
-                dset[self.values_written :] = self.data_buffers[idx]
-
-                # Return the buffer index to the empty queue for reuse.
-                self.empty_queue.put(idx)
-
-                self.values_written += buffer_len
-
-            except queue.Empty:
-                # This occurs if the producer hasn't provided data within the timeout.
-                self.empty_con_queue_count += 1
-                # This is expected when acquisition stops, so no need to log as a warning
-                if not self.shutdown_event.is_set():
-                    logger.debug("Consumer queue was empty.")
-
-    def consume(self) -> None:
-        """The main loop for the consumer thread.
-
-        This method continuously checks for data from the producer, writes it to
-        the HDF5 file, and returns the buffer for reuse. It handles file setup,
-        the main processing loop, and graceful shutdown.
-        """
-        try:
-            # Ensure a clean slate by removing any pre-existing file.
-            if os.path.exists(self.file_name):
-                os.remove(self.file_name)
-                logger.info(f"Removed existing file: {self.file_name}")
-
-            with h5py.File(self.file_name, "w") as f:
-                # Write the collected metadata to the HDF5 file's attributes.
-                for key, value in self.metadata.items():
-                    if value is not None:
-                        f.attrs[key] = value
-
-                # Create a resizable dataset for the ADC data.
-                # Chunking is aligned with the buffer size for efficient writes.
-                dset = f.create_dataset(
-                    "adc_counts",
-                    (0,),
-                    maxshape=(None,),
-                    dtype="int16",
-                    chunks=(self.buffer_size,),
-                )
-
-                self._processing_loop(dset)
-        except (IOError, OSError) as e:
-            # A critical file error means we cannot continue.
-            logger.critical(f"Failed to create or write to HDF5 file: {e}")
-            self.shutdown_event.set()  # Signal other threads to shut down.
-            return
-
-        logger.info(
-            f"Consumer couldn't obtain data from queue {self.empty_con_queue_count} times."
-        )
+from __future__ import annotations
+
+import os
+import queue
+import threading
+from typing import Any, Dict, List, Optional
+
+import h5py
+import numpy as np
+from loguru import logger
+
+
+class Consumer:
+    """A data consumer that runs in a separate thread.
+
+    This class retrieves data buffers from a queue, writes them to an HDF5 file,
+    and then returns the buffer index to an "empty" queue for reuse by the
+    producer. It handles file creation, data writing, and metadata storage.
+
+    Supports live-only mode when max_samples is specified, which limits the
+    HDF5 file size by resetting and overwriting data when the limit is reached.
+    """
+
+    def __init__(
+        self,
+        buffer_size: int,
+        data_queue: queue.Queue[int],
+        empty_queue: queue.Queue[int],
+        data_buffers: List[np.ndarray],
+        file_name: str,
+        shutdown_event: threading.Event,
+        metadata: Dict[str, Any],
+        max_samples: Optional[int] = None,
+    ):
+        """Initializes the Consumer.
+
+        Args:
+            buffer_size: The size of each individual data buffer.
+            data_queue: A queue for receiving indices of data-filled buffers.
+            empty_queue: A queue for returning indices of processed (empty) buffers.
+            data_buffers: A list of pre-allocated NumPy arrays for data.
+            file_name: The path to the output HDF5 file.
+            shutdown_event: A threading.Event to signal termination.
+            metadata: A dictionary of metadata to be saved as HDF5 attributes.
+            max_samples: Maximum number of samples to keep in file (for live-only mode).
+        """
+        self.buffer_size = buffer_size
+        self.data_queue = data_queue
+        self.empty_queue = empty_queue
+        self.data_buffers = data_buffers
+        self.file_name = file_name
+        self.shutdown_event = shutdown_event
+        self.metadata = metadata
+        self.max_samples = max_samples
+
+        self.values_written: int = 0
+        self.empty_con_queue_count: int = 0
+        self.buffer_resets: int = 0
+
+    def format_sample_count(self, count: int) -> str:
+        """Format a large integer count into a human-readable string.
+
+        Uses metric prefixes (K, M, G) for thousands, millions, and billions.
+
+        Args:
+            count: The integer number to format.
+
+        Returns:
+            A formatted string representation of the count.
+        """
+        if count >= 1_000_000_000:
+            return f"{count / 1_000_000_000:.2f}G"
+        if count >= 1_000_000:
+            return f"{count / 1_000_000:.2f}M"
+        if count >= 1_000:
+            return f"{count / 1_000:.2f}K"
+        else:
+            return str(count)
+
+    def _processing_loop(self, dset: h5py.Dataset) -> None:
+        """
+        Continuously processes data from the queue and writes to the HDF5 dataset.
+
+        Args:
+            dset: The HDF5 dataset to write to.
+        """
+        while not self.shutdown_event.is_set():
+            try:
+                # Wait for a buffer index from the producer.
+                # A timeout allows the loop to periodically check the shutdown event.
+                idx = self.data_queue.get(timeout=0.1)
+
+                # Append the new data to the HDF5 dataset.
+                buffer_len = len(self.data_buffers[idx])
+
+                # Check if we need to reset the buffer (live-only mode)
+                if self.max_samples and self.values_written + buffer_len > self.max_samples:
+                    # Reset the dataset to start overwriting from the beginning
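+                    # Note: the dataset is truncated rather than maintained as a
+                    # true ring buffer; the plotter detects the size drop and
+                    # counts it as a buffer reset.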
+                    dset.resize((buffer_len,))
+                    dset[:] = self.data_buffers[idx]
+                    self.values_written = buffer_len
+                    self.buffer_resets += 1
+                    logger.debug(f"Buffer reset #{self.buffer_resets} - file size limited to {self.max_samples:,} samples")
+                else:
+                    # Normal append
+                    dset.resize((self.values_written + buffer_len,))
+                    dset[self.values_written :] = self.data_buffers[idx]
+                    self.values_written += buffer_len
+
+                # Return the buffer index to the empty queue for reuse.
+                self.empty_queue.put(idx)
+
+            except queue.Empty:
+                # This occurs if the producer hasn't provided data within the timeout.
+                self.empty_con_queue_count += 1
+                # This is expected when acquisition stops, so no need to log as a warning
+                if not self.shutdown_event.is_set():
+                    logger.debug("Consumer queue was empty.")
+
+    def consume(self) -> None:
+        """The main loop for the consumer thread.
+
+        This method continuously checks for data from the producer, writes it to
+        the HDF5 file, and returns the buffer for reuse. It handles file setup,
+        the main processing loop, and graceful shutdown.
+        """
+        try:
+            # Ensure a clean slate by removing any pre-existing file.
+            if os.path.exists(self.file_name):
+                os.remove(self.file_name)
+                logger.info(f"Removed existing file: {self.file_name}")
+
+            with h5py.File(self.file_name, "w") as f:
+                # Write the collected metadata to the HDF5 file's attributes.
+                for key, value in self.metadata.items():
+                    if value is not None:
+                        f.attrs[key] = value
+
+                # Create a resizable dataset for the ADC data.
+                # Chunking is aligned with the buffer size for efficient writes.
+                dset = f.create_dataset(
+                    "adc_counts",
+                    (0,),
+                    maxshape=(None,),
+                    dtype="int16",
+                    chunks=(self.buffer_size,),
+                )
+
+                self._processing_loop(dset)
+        except (IOError, OSError) as e:
+            # A critical file error means we cannot continue.
+            logger.critical(f"Failed to create or write to HDF5 file: {e}")
+            self.shutdown_event.set()  # Signal other threads to shut down.
+            return
+
+        logger.info(
+            f"Consumer couldn't obtain data from queue {self.empty_con_queue_count} times."
+        )
+        if self.buffer_resets > 0:
+            logger.info(f"Buffer was reset {self.buffer_resets} times (live-only mode).")
diff --git a/picostream/dfplot.py b/picostream/dfplot.py
index cac3a38..aaf1ca7 100644
--- a/picostream/dfplot.py
+++ b/picostream/dfplot.py
@@ -28,6 +28,9 @@ class HDF5LivePlotter(QMainWindow):
    """
    Real-time oscilloscope-style plotter that reads from HDF5 files.
    Completely independent of acquisition system for zero-risk operation.
+
+    Supports both standard acquisition files and live-only mode files with
+    automatic buffer reset detection and graceful handling of file size changes.
    """

    def __init__(
@@ -66,6 +69,11 @@ class HDF5LivePlotter(QMainWindow):
        self.time_data: np.ndarray = np.array([])
        self.data_start_sample: int = 0

+        # --- Buffer Reset Detection ---
+        self.last_file_size: int = 0
+        self.buffer_reset_count: int = 0
+        self.total_samples_processed: int = 0
+
        # --- HDF5 Metadata ---
        self.sample_interval_ns: float = 16.0  # Default, will be read from file
        self.hardware_downsample_ratio: int = 1
@@ -262,6 +270,16 @@ class HDF5LivePlotter(QMainWindow):
        # Update the display with this complete window (only when data changes)
        self.update_display(data_window)

+    def _handle_buffer_reset(self, new_size: int) -> None:
+        """Handle detection of a buffer reset (file size decreased)."""
+        self.buffer_reset_count += 1
+        self.total_samples_processed += self.last_file_size
+        logger.debug(f"Buffer reset detected: size {self.last_file_size:,} → {new_size:,} (reset #{self.buffer_reset_count})")
+
+        # Reset rate calculation to avoid confusion
+        self.rate_check_start_time = None
+        self.rate_check_start_samples = 0
+
    def _handle_stale_data(self) -> None:
        """Handle a file check where no new data is found."""
        self.stale_update_count += 1
@@ -276,9 +294,14 @@ class HDF5LivePlotter(QMainWindow):
    def _update_status_labels(self, current_size: int) -> None:
        """Update the various status labels in the UI."""
-        # Update samples label
+        # Update samples label (include total if buffer resets occurred)
        samples_text = self.format_sample_count(current_size)
-        self.samples_label.setText(f"Samples: {samples_text}")
+        if self.buffer_reset_count > 0:
+            total_processed = self.total_samples_processed + current_size
+            total_text = self.format_sample_count(total_processed)
+            self.samples_label.setText(f"Samples: {samples_text} (total: {total_text})")
+        else:
+            self.samples_label.setText(f"Samples: {samples_text}")

        # Color-code latency
        latency_color = (
@@ -371,11 +394,17 @@ class HDF5LivePlotter(QMainWindow):
        start_index = max(0, current_size - display_window_points)
        self.data_start_sample = start_index

+        # Detect buffer resets (file size decreased)
+        if current_size < self.last_file_size:
+            self._handle_buffer_reset(current_size)
+
        # Track data freshness and changes
        if current_size > self.last_displayed_size:
            self._handle_new_data(dataset, start_index, current_size)
        else:
            self._handle_stale_data()
+
+        self.last_file_size = current_size

        self._update_status_labels(current_size)
        self._update_rate_label(current_size)
diff --git a/picostream/main.py b/picostream/main.py
index 44a0c0c..8fc7f78 100644
--- a/picostream/main.py
+++ b/picostream/main.py
@@ -26,6 +26,9 @@ class Streamer:
    This class initializes the Picoscope device (producer), the HDF5 writer
    (consumer), and the live plotter. It manages the threads, queues, and
    graceful shutdown of the entire application.
+
+    Supports both standard acquisition mode (saves all data) and live-only mode
+    (limits buffer size via the max_buffer_seconds parameter).
    """

    def __init__(
@@ -41,12 +44,14 @@ class Streamer:
        hardware_downsample: int = 1,
        downsample_mode: str = "average",
        offset_v: float = 0.0,
+        max_buffer_seconds: Optional[float] = None,
    ) -> None:
        # --- Configuration ---
        self.output_file = output_file
        self.debug = debug
        self.enable_live_plot = enable_live_plot
        self.plot_window_s = plot_window_s
+        self.max_buffer_seconds = max_buffer_seconds

        (
            sample_rate_msps,
@@ -167,6 +172,15 @@ class Streamer:
        # --- Consumer ---
        # Get metadata from configured device and pass to consumer
        metadata = self.pico_device.get_metadata()
+
+        # Calculate max samples for live-only mode
+        max_samples = None
+        if self.max_buffer_seconds:
+            max_samples = int(effective_rate_sps * self.max_buffer_seconds)
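+            # Aggregate mode yields a min/max pair per downsampled bucket,
+            # so roughly twice as many samples land in the file (hence the
+            # factor of 2 below).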
+            if downsample_mode == "aggregate":
+                max_samples *= 2
+            logger.info(f"Live-only mode: limiting buffer to {self.max_buffer_seconds}s ({max_samples:,} samples)")
+
        self.consumer: Consumer = Consumer(
            self.consumer_buffer_size,
            data_queue,
@@ -175,6 +189,7 @@ class Streamer:
            output_file,
            self.shutdown_event,
            metadata=metadata,
+            max_samples=max_samples,
        )

        # --- Threads ---
@@ -463,6 +478,11 @@ VOLTAGE_RANGE_MAP = {
    default=0.0,
    help="Analog offset in Volts (only for ranges < 5V). [default: 0.0]",
)
+@click.option(
+    "--max-buff-sec",
+    type=float,
+    help="Maximum buffer duration in seconds for live-only mode (limits file size).",
+)
def main(
    sample_rate: float,
    resolution: str,
@@ -475,6 +495,7 @@ def main(
    hardware_downsample: int,
    downsample_mode: str,
    offset: float,
+    max_buff_sec: Optional[float],
) -> None:
    """High-speed data acquisition tool for Picoscope 5000a series."""
    # --- Argument Validation and Processing ---
@@ -523,6 +544,7 @@ def main(
            hardware_downsample=hardware_downsample,
            downsample_mode=downsample_mode,
            offset_v=offset,
+            max_buffer_seconds=max_buff_sec,
        )
        streamer.run(app)
    except RuntimeError as e:
@@ -535,27 +557,31 @@ def main(
        sys.exit(1)

    # --- Verification Step ---
-    logger.info(f"Verifying output file: {output}")
-    try:
-        expected_samples = streamer.consumer.values_written
-        if expected_samples == 0:
-            logger.warning("Consumer processed no samples. Nothing to verify.")
-        else:
-            with h5py.File(output, "r") as f:
-                if "adc_counts" not in f:
-                    raise ValueError("Dataset 'adc_counts' not found in HDF5 file.")
-
-                actual_samples = len(f["adc_counts"])
-                if actual_samples == expected_samples:
-                    logger.success(
-                        f"Verification PASSED: File contains {actual_samples} samples, as expected."
-                    )
-                else:
-                    logger.error(
-                        f"Verification FAILED: Expected {expected_samples} samples, but file has {actual_samples}."
-                    )
-    except Exception as e:
-        logger.error(f"HDF5 file verification failed: {e}")
+    # Skip verification in live-only mode since file size is limited
+    if not streamer.max_buffer_seconds:
+        logger.info(f"Verifying output file: {output}")
+        try:
+            expected_samples = streamer.consumer.values_written
+            if expected_samples == 0:
+                logger.warning("Consumer processed no samples. Nothing to verify.")
+            else:
+                with h5py.File(output, "r") as f:
+                    if "adc_counts" not in f:
+                        raise ValueError("Dataset 'adc_counts' not found in HDF5 file.")
+
+                    actual_samples = len(f["adc_counts"])
+                    if actual_samples == expected_samples:
+                        logger.success(
+                            f"Verification PASSED: File contains {actual_samples} samples, as expected."
+                        )
+                    else:
+                        logger.error(
+                            f"Verification FAILED: Expected {expected_samples} samples, but file has {actual_samples}."
+                        )
+        except Exception as e:
+            logger.error(f"HDF5 file verification failed: {e}")
+    else:
+        logger.info("Skipping verification in live-only mode.")
if __name__ == "__main__":