| author | Sam Scholten | 2025-09-11 17:09:27 +1000 |
|---|---|---|
| committer | Sam Scholten | 2025-09-11 17:16:46 +1000 |
| commit | 20faf71815922e6e79ef81fbee4e3c28c75fd891 (patch) | |
| tree | 5c1af585bcd4e7b4b2a3312d1df78ea0e503dcc8 | |
| parent | d5f84dbcc6822382b84e30edcdb3d28ac9326870 (diff) | |
| download | picostream-20faf71815922e6e79ef81fbee4e3c28c75fd891.tar.gz, picostream-20faf71815922e6e79ef81fbee4e3c28c75fd891.zip | |
add live-only mode with configurable buffer size limit
| -rw-r--r-- | README.md | 35 |
| -rw-r--r-- | picostream/__init__.py | 7 |
| -rw-r--r-- | picostream/consumer.py | 301 |
| -rw-r--r-- | picostream/dfplot.py | 33 |
| -rw-r--r-- | picostream/main.py | 68 |
5 files changed, 279 insertions, 165 deletions
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -7,6 +7,7 @@ PicoStream is a high-performance Python application for streaming data from a Pi
 - **Robust Architecture**: Separates data acquisition from disk I/O using a producer-consumer pattern with a large, shared memory buffer pool to prevent data loss.
 - **Zero-Risk Live Plotting**: The plotter reads from the HDF5 file, not the live data stream. This ensures that a slow or crashing GUI cannot interfere with data acquisition.
 - **Efficient Visualization**: Uses `pyqtgraph` and a Numba-accelerated min-max decimation algorithm to display large datasets with minimal CPU impact.
+- **Live-Only Mode**: Optional buffer size limiting for pure real-time monitoring without generating large files.
 - **Flexible Data Reading**: The included `PicoStreamReader` class allows for easy post-processing, including on-the-fly decimation ('mean' or 'min_max').
 
 ## Installation
@@ -21,14 +22,34 @@ The primary way to use the package is through the `picostream` command-line tool
 
 ### Acquisition
 
-The following command starts an acquisition at 62.5 MS/s and saves the data to `my_data.hdf5` with a live plot.
+The following commands show the different acquisition modes:
 
 ```bash
+# Standard acquisition (saves all data)
 picostream -s 62.5 -o my_data.hdf5 --plot
+
+# Live-only mode (limited buffer)
+picostream -s 62.5 --plot --max-buff-sec 60
 ```
 
 Run `picostream --help` for a full list of options.
 
+### Live-Only Mode
+
+For pure real-time monitoring without saving large amounts of data, use the `--max-buff-sec` option:
+
+```bash
+picostream -s 62.5 --plot --max-buff-sec 60
+```
+
+This limits the HDF5 file to the last 60 seconds of data, automatically overwriting older data. This suits continuous monitoring applications where only the most recent data matters.
+
+**Benefits:**
+- Prevents disk space issues during long acquisitions
+- Maintains full plotting functionality
+- File size stays constant after the initial buffer fill
+- No performance impact on acquisition
+
 ### Viewing an Existing File
 
 The plotter can be run as a standalone tool to view any compatible HDF5 file.
@@ -109,6 +130,18 @@ Retrieves a specific block of data from the file.
 
 Resets the internal counter for `get_next_block()` to the beginning of the file.
 
+## Changelog
+
+### Version 0.2.0
+- **New Feature**: Added live-only mode with the `--max-buff-sec` option for buffer size limiting
+- **Enhancement**: Improved the plotter to handle buffer resets gracefully
+- **Enhancement**: Added total sample count tracking across buffer resets
+- **Enhancement**: Skip the verification step in live-only mode for better performance
+- **Documentation**: Updated README with live-only mode usage examples
+
+### Version 0.1.0
+- Initial release with core streaming and plotting functionality
+
 ## Acknowledgements
 
 This package began as a fork of [JoshHarris2108/pico_streaming](https://github.com/JoshHarris2108/pico_streaming) (which is unlicensed). I acknowledge and appreciate Josh's original idea/architecture.
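To put the README's 60-second example in concrete terms: at 62.5 MS/s the live-only file settles at several gigabytes, not megabytes. A back-of-envelope sizing sketch (plain Python, not part of this commit; the 2-byte figure comes from the consumer's `int16` dtype):

```python
# Rough steady-state file size for --max-buff-sec (illustrative only).
sample_rate_sps = 62.5e6   # -s 62.5  =>  62.5 MS/s
max_buff_sec = 60          # --max-buff-sec 60
bytes_per_sample = 2       # adc_counts is stored as int16

max_samples = int(sample_rate_sps * max_buff_sec)
file_size_gb = max_samples * bytes_per_sample / 1e9
print(f"{max_samples:,} samples ~ {file_size_gb:.1f} GB")
# 3,750,000,000 samples ~ 7.5 GB
```

So "file size stays constant" means constant at roughly 7.5 GB for this example; choose `--max-buff-sec` with the sample rate in mind.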
diff --git a/picostream/__init__.py b/picostream/__init__.py
index e69de29..1fb1a8e 100644
--- a/picostream/__init__.py
+++ b/picostream/__init__.py
@@ -0,0 +1,7 @@
+"""PicoStream: High-performance PicoScope data acquisition and visualization."""
+
+from .reader import PicoStreamReader
+from .dfplot import HDF5LivePlotter
+
+__version__ = "0.2.0"
+__all__ = ["PicoStreamReader", "HDF5LivePlotter"]
diff --git a/picostream/consumer.py b/picostream/consumer.py
index b1b2a5f..f48ff30 100644
--- a/picostream/consumer.py
+++ b/picostream/consumer.py
@@ -1,141 +1,160 @@
-from __future__ import annotations
-
-import os
-import queue
-import threading
-from typing import Any, Dict, List
-
-import h5py
-import numpy as np
-from loguru import logger
-
-
-class Consumer:
-    """A data consumer that runs in a separate thread.
-
-    This class retrieves data buffers from a queue, writes them to an HDF5 file,
-    and then returns the buffer index to an "empty" queue for reuse by the
-    producer. It handles file creation, data writing, and metadata storage.
-    """
-
-    def __init__(
-        self,
-        buffer_size: int,
-        data_queue: queue.Queue[int],
-        empty_queue: queue.Queue[int],
-        data_buffers: List[np.ndarray],
-        file_name: str,
-        shutdown_event: threading.Event,
-        metadata: Dict[str, Any],
-    ):
-        """Initializes the Consumer.
-
-        Args:
-            buffer_size: The size of each individual data buffer.
-            data_queue: A queue for receiving indices of data-filled buffers.
-            empty_queue: A queue for returning indices of processed (empty) buffers.
-            data_buffers: A list of pre-allocated NumPy arrays for data.
-            file_name: The path to the output HDF5 file.
-            shutdown_event: A threading.Event to signal termination.
-            metadata: A dictionary of metadata to be saved as HDF5 attributes.
-        """
-        self.buffer_size = buffer_size
-        self.data_queue = data_queue
-        self.empty_queue = empty_queue
-        self.data_buffers = data_buffers
-        self.file_name = file_name
-        self.shutdown_event = shutdown_event
-        self.metadata = metadata
-
-        self.values_written: int = 0
-        self.empty_con_queue_count: int = 0
-
-    def format_sample_count(self, count: int) -> str:
-        """Format a large integer count into a human-readable string.
-
-        Uses metric prefixes (K, M, G) for thousands, millions, and billions.
-
-        Args:
-            count: The integer number to format.
-
-        Returns:
-            A formatted string representation of the count.
-        """
-        if count >= 1_000_000_000:
-            return f"{count / 1_000_000_000:.2f}G"
-        if count >= 1_000_000:
-            return f"{count / 1_000_000:.2f}M"
-        if count >= 1_000:
-            return f"{count / 1_000:.2f}K"
-        else:
-            return str(count)
-
-    def _processing_loop(self, dset: h5py.Dataset) -> None:
-        """
-        Continuously processes data from the queue and writes to the HDF5 dataset.
-
-        Args:
-            dset: The HDF5 dataset to write to.
-        """
-        while not self.shutdown_event.is_set():
-            try:
-                # Wait for a buffer index from the producer.
-                # A timeout allows the loop to periodically check the shutdown event.
-                idx = self.data_queue.get(timeout=0.1)
-
-                # Append the new data to the HDF5 dataset.
-                buffer_len = len(self.data_buffers[idx])
-                dset.resize((self.values_written + buffer_len,))
-                dset[self.values_written :] = self.data_buffers[idx]
-
-                # Return the buffer index to the empty queue for reuse.
-                self.empty_queue.put(idx)
-
-                self.values_written += buffer_len
-
-            except queue.Empty:
-                # This occurs if the producer hasn't provided data within the timeout.
-                self.empty_con_queue_count += 1
-                # This is expected when acquisition stops, so no need to log as a warning
-                if not self.shutdown_event.is_set():
-                    logger.debug("Consumer queue was empty.")
-
-    def consume(self) -> None:
-        """The main loop for the consumer thread.
-
-        This method continuously checks for data from the producer, writes it to
-        the HDF5 file, and returns the buffer for reuse. It handles file setup,
-        the main processing loop, and graceful shutdown.
-        """
-        try:
-            # Ensure a clean slate by removing any pre-existing file.
-            if os.path.exists(self.file_name):
-                os.remove(self.file_name)
-                logger.info(f"Removed existing file: {self.file_name}")
-
-            with h5py.File(self.file_name, "w") as f:
-                # Write the collected metadata to the HDF5 file's attributes.
-                for key, value in self.metadata.items():
-                    if value is not None:
-                        f.attrs[key] = value
-
-                # Create a resizable dataset for the ADC data.
-                # Chunking is aligned with the buffer size for efficient writes.
-                dset = f.create_dataset(
-                    "adc_counts",
-                    (0,),
-                    maxshape=(None,),
-                    dtype="int16",
-                    chunks=(self.buffer_size,),
-                )
-
-                self._processing_loop(dset)
-        except (IOError, OSError) as e:
-            # A critical file error means we cannot continue.
-            logger.critical(f"Failed to create or write to HDF5 file: {e}")
-            self.shutdown_event.set()  # Signal other threads to shut down.
-            return
-
-        logger.info(
-            f"Consumer couldn't obtain data from queue {self.empty_con_queue_count} times."
-        )
+from __future__ import annotations
+
+import os
+import queue
+import threading
+from typing import Any, Dict, List, Optional
+
+import h5py
+import numpy as np
+from loguru import logger
+
+
+class Consumer:
+    """A data consumer that runs in a separate thread.
+
+    This class retrieves data buffers from a queue, writes them to an HDF5 file,
+    and then returns the buffer index to an "empty" queue for reuse by the
+    producer. It handles file creation, data writing, and metadata storage.
+
+    Supports live-only mode when max_samples is specified, which limits the
+    HDF5 file size by resetting and overwriting data when the limit is reached.
+    """
+
+    def __init__(
+        self,
+        buffer_size: int,
+        data_queue: queue.Queue[int],
+        empty_queue: queue.Queue[int],
+        data_buffers: List[np.ndarray],
+        file_name: str,
+        shutdown_event: threading.Event,
+        metadata: Dict[str, Any],
+        max_samples: Optional[int] = None,
+    ):
+        """Initializes the Consumer.
+
+        Args:
+            buffer_size: The size of each individual data buffer.
+            data_queue: A queue for receiving indices of data-filled buffers.
+            empty_queue: A queue for returning indices of processed (empty) buffers.
+            data_buffers: A list of pre-allocated NumPy arrays for data.
+            file_name: The path to the output HDF5 file.
+            shutdown_event: A threading.Event to signal termination.
+            metadata: A dictionary of metadata to be saved as HDF5 attributes.
+            max_samples: Maximum number of samples to keep in the file (live-only mode).
+        """
+        self.buffer_size = buffer_size
+        self.data_queue = data_queue
+        self.empty_queue = empty_queue
+        self.data_buffers = data_buffers
+        self.file_name = file_name
+        self.shutdown_event = shutdown_event
+        self.metadata = metadata
+        self.max_samples = max_samples
+
+        self.values_written: int = 0
+        self.empty_con_queue_count: int = 0
+        self.buffer_resets: int = 0
+
+    def format_sample_count(self, count: int) -> str:
+        """Format a large integer count into a human-readable string.
+
+        Uses metric prefixes (K, M, G) for thousands, millions, and billions.
+
+        Args:
+            count: The integer number to format.
+
+        Returns:
+            A formatted string representation of the count.
+        """
+        if count >= 1_000_000_000:
+            return f"{count / 1_000_000_000:.2f}G"
+        if count >= 1_000_000:
+            return f"{count / 1_000_000:.2f}M"
+        if count >= 1_000:
+            return f"{count / 1_000:.2f}K"
+        else:
+            return str(count)
+
+    def _processing_loop(self, dset: h5py.Dataset) -> None:
+        """
+        Continuously processes data from the queue and writes to the HDF5 dataset.
+
+        Args:
+            dset: The HDF5 dataset to write to.
+        """
+        while not self.shutdown_event.is_set():
+            try:
+                # Wait for a buffer index from the producer.
+                # A timeout allows the loop to periodically check the shutdown event.
+                idx = self.data_queue.get(timeout=0.1)
+
+                # Append the new data to the HDF5 dataset.
+                buffer_len = len(self.data_buffers[idx])
+
+                # Check if we need to reset the buffer (live-only mode)
+                if self.max_samples and self.values_written + buffer_len > self.max_samples:
+                    # Reset the dataset to start overwriting from the beginning
+                    dset.resize((buffer_len,))
+                    dset[:] = self.data_buffers[idx]
+                    self.values_written = buffer_len
+                    self.buffer_resets += 1
+                    logger.debug(f"Buffer reset #{self.buffer_resets} - file size limited to {self.max_samples:,} samples")
+                else:
+                    # Normal append
+                    dset.resize((self.values_written + buffer_len,))
+                    dset[self.values_written :] = self.data_buffers[idx]
+                    self.values_written += buffer_len
+
+                # Return the buffer index to the empty queue for reuse.
+                self.empty_queue.put(idx)
+
+            except queue.Empty:
+                # This occurs if the producer hasn't provided data within the timeout.
+                self.empty_con_queue_count += 1
+                # This is expected when acquisition stops, so no need to log as a warning
+                if not self.shutdown_event.is_set():
+                    logger.debug("Consumer queue was empty.")
+
+    def consume(self) -> None:
+        """The main loop for the consumer thread.
+
+        This method continuously checks for data from the producer, writes it to
+        the HDF5 file, and returns the buffer for reuse. It handles file setup,
+        the main processing loop, and graceful shutdown.
+        """
+        try:
+            # Ensure a clean slate by removing any pre-existing file.
+            if os.path.exists(self.file_name):
+                os.remove(self.file_name)
+                logger.info(f"Removed existing file: {self.file_name}")
+
+            with h5py.File(self.file_name, "w") as f:
+                # Write the collected metadata to the HDF5 file's attributes.
+                for key, value in self.metadata.items():
+                    if value is not None:
+                        f.attrs[key] = value
+
+                # Create a resizable dataset for the ADC data.
+                # Chunking is aligned with the buffer size for efficient writes.
+                dset = f.create_dataset(
+                    "adc_counts",
+                    (0,),
+                    maxshape=(None,),
+                    dtype="int16",
+                    chunks=(self.buffer_size,),
+                )
+
+                self._processing_loop(dset)
+        except (IOError, OSError) as e:
+            # A critical file error means we cannot continue.
+            logger.critical(f"Failed to create or write to HDF5 file: {e}")
+            self.shutdown_event.set()  # Signal other threads to shut down.
+            return
+
+        logger.info(
+            f"Consumer couldn't obtain data from queue {self.empty_con_queue_count} times."
+        )
+        if self.buffer_resets > 0:
+            logger.info(f"Buffer was reset {self.buffer_resets} times (live-only mode).")
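The reset branch above is easiest to see in isolation. A minimal, self-contained sketch of the same resize-and-overwrite pattern (hypothetical file name and sizes; the `h5py` calls mirror the consumer's):

```python
# Toy version of the live-only reset in Consumer._processing_loop.
import h5py
import numpy as np

MAX_SAMPLES = 1000  # stand-in for max_samples
BUF_LEN = 300       # stand-in for one producer buffer

with h5py.File("demo.hdf5", "w") as f:
    dset = f.create_dataset(
        "adc_counts", (0,), maxshape=(None,), dtype="int16", chunks=(BUF_LEN,)
    )
    values_written = 0
    for i in range(10):
        buf = np.full(BUF_LEN, i, dtype="int16")
        if values_written + BUF_LEN > MAX_SAMPLES:
            # Limit reached: shrink the dataset and overwrite from the start.
            dset.resize((BUF_LEN,))
            dset[:] = buf
            values_written = BUF_LEN
        else:
            dset.resize((values_written + BUF_LEN,))
            dset[values_written:] = buf
            values_written += BUF_LEN
    print(len(dset))  # never exceeds MAX_SAMPLES; 300 here, just after a reset
```

Note the design choice: a reset truncates rather than rotating, so immediately after a reset the file briefly holds less than `max_buff_sec` of history. That keeps the write path a simple append and avoids ring-buffer index bookkeeping.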
""" def __init__( @@ -66,6 +69,11 @@ class HDF5LivePlotter(QMainWindow): self.time_data: np.ndarray = np.array([]) self.data_start_sample: int = 0 + # --- Buffer Reset Detection --- + self.last_file_size: int = 0 + self.buffer_reset_count: int = 0 + self.total_samples_processed: int = 0 + # --- HDF5 Metadata --- self.sample_interval_ns: float = 16.0 # Default, will be read from file self.hardware_downsample_ratio: int = 1 @@ -262,6 +270,16 @@ class HDF5LivePlotter(QMainWindow): # Update the display with this complete window (only when data changes) self.update_display(data_window) + def _handle_buffer_reset(self, new_size: int) -> None: + """Handle detection of a buffer reset (file size decreased).""" + self.buffer_reset_count += 1 + self.total_samples_processed += self.last_file_size + logger.debug(f"Buffer reset detected: size {self.last_file_size:,} → {new_size:,} (reset #{self.buffer_reset_count})") + + # Reset rate calculation to avoid confusion + self.rate_check_start_time = None + self.rate_check_start_samples = 0 + def _handle_stale_data(self) -> None: """Handle a file check where no new data is found.""" self.stale_update_count += 1 @@ -276,9 +294,14 @@ class HDF5LivePlotter(QMainWindow): def _update_status_labels(self, current_size: int) -> None: """Update the various status labels in the UI.""" - # Update samples label + # Update samples label (include total if buffer resets occurred) samples_text = self.format_sample_count(current_size) - self.samples_label.setText(f"Samples: {samples_text}") + if self.buffer_reset_count > 0: + total_processed = self.total_samples_processed + current_size + total_text = self.format_sample_count(total_processed) + self.samples_label.setText(f"Samples: {samples_text} (total: {total_text})") + else: + self.samples_label.setText(f"Samples: {samples_text}") # Color-code latency latency_color = ( @@ -371,11 +394,17 @@ class HDF5LivePlotter(QMainWindow): start_index = max(0, current_size - display_window_points) self.data_start_sample = start_index + # Detect buffer resets (file size decreased) + if current_size < self.last_file_size: + self._handle_buffer_reset(current_size) + # Track data freshness and changes if current_size > self.last_displayed_size: self._handle_new_data(dataset, start_index, current_size) else: self._handle_stale_data() + + self.last_file_size = current_size self._update_status_labels(current_size) self._update_rate_label(current_size) diff --git a/picostream/main.py b/picostream/main.py index 44a0c0c..8fc7f78 100644 --- a/picostream/main.py +++ b/picostream/main.py @@ -26,6 +26,9 @@ class Streamer: This class initializes the Picoscope device (producer), the HDF5 writer (consumer), and the live plotter. It manages the threads, queues, and graceful shutdown of the entire application. + + Supports both standard acquisition mode (saves all data) and live-only mode + (limits buffer size using max_buffer_seconds parameter). 
""" def __init__( @@ -41,12 +44,14 @@ class Streamer: hardware_downsample: int = 1, downsample_mode: str = "average", offset_v: float = 0.0, + max_buffer_seconds: Optional[float] = None, ) -> None: # --- Configuration --- self.output_file = output_file self.debug = debug self.enable_live_plot = enable_live_plot self.plot_window_s = plot_window_s + self.max_buffer_seconds = max_buffer_seconds ( sample_rate_msps, @@ -167,6 +172,15 @@ class Streamer: # --- Consumer --- # Get metadata from configured device and pass to consumer metadata = self.pico_device.get_metadata() + + # Calculate max samples for live-only mode + max_samples = None + if self.max_buffer_seconds: + max_samples = int(effective_rate_sps * self.max_buffer_seconds) + if downsample_mode == "aggregate": + max_samples *= 2 + logger.info(f"Live-only mode: limiting buffer to {self.max_buffer_seconds}s ({max_samples:,} samples)") + self.consumer: Consumer = Consumer( self.consumer_buffer_size, data_queue, @@ -175,6 +189,7 @@ class Streamer: output_file, self.shutdown_event, metadata=metadata, + max_samples=max_samples, ) # --- Threads --- @@ -463,6 +478,11 @@ VOLTAGE_RANGE_MAP = { default=0.0, help="Analog offset in Volts (only for ranges < 5V). [default: 0.0]", ) +@click.option( + "--max-buff-sec", + type=float, + help="Maximum buffer duration in seconds for live-only mode (limits file size).", +) def main( sample_rate: float, resolution: str, @@ -475,6 +495,7 @@ def main( hardware_downsample: int, downsample_mode: str, offset: float, + max_buff_sec: Optional[float], ) -> None: """High-speed data acquisition tool for Picoscope 5000a series.""" # --- Argument Validation and Processing --- @@ -523,6 +544,7 @@ def main( hardware_downsample=hardware_downsample, downsample_mode=downsample_mode, offset_v=offset, + max_buffer_seconds=max_buff_sec, ) streamer.run(app) except RuntimeError as e: @@ -535,27 +557,31 @@ def main( sys.exit(1) # --- Verification Step --- - logger.info(f"Verifying output file: {output}") - try: - expected_samples = streamer.consumer.values_written - if expected_samples == 0: - logger.warning("Consumer processed no samples. Nothing to verify.") - else: - with h5py.File(output, "r") as f: - if "adc_counts" not in f: - raise ValueError("Dataset 'adc_counts' not found in HDF5 file.") - - actual_samples = len(f["adc_counts"]) - if actual_samples == expected_samples: - logger.success( - f"Verification PASSED: File contains {actual_samples} samples, as expected." - ) - else: - logger.error( - f"Verification FAILED: Expected {expected_samples} samples, but file has {actual_samples}." - ) - except Exception as e: - logger.error(f"HDF5 file verification failed: {e}") + # Skip verification in live-only mode since file size is limited + if not streamer.max_buffer_seconds: + logger.info(f"Verifying output file: {output}") + try: + expected_samples = streamer.consumer.values_written + if expected_samples == 0: + logger.warning("Consumer processed no samples. Nothing to verify.") + else: + with h5py.File(output, "r") as f: + if "adc_counts" not in f: + raise ValueError("Dataset 'adc_counts' not found in HDF5 file.") + + actual_samples = len(f["adc_counts"]) + if actual_samples == expected_samples: + logger.success( + f"Verification PASSED: File contains {actual_samples} samples, as expected." + ) + else: + logger.error( + f"Verification FAILED: Expected {expected_samples} samples, but file has {actual_samples}." 
+ ) + except Exception as e: + logger.error(f"HDF5 file verification failed: {e}") + else: + logger.info("Skipping verification in live-only mode.") if __name__ == "__main__": |
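One detail in the main.py hunk worth restating: `max_samples` is derived from the effective sample rate and doubled in "aggregate" mode, presumably because aggregate hardware downsampling stores two values (a min/max pair) per output sample. A standalone restatement of that sizing logic (sketch only; the helper name is mine, not the project's):

```python
# Mirror of the live-only sizing logic in Streamer (hypothetical helper).
from typing import Optional

def compute_max_samples(
    effective_rate_sps: float,
    max_buffer_seconds: Optional[float],
    downsample_mode: str,
) -> Optional[int]:
    """None means 'no limit': keep every sample, as in standard mode."""
    if not max_buffer_seconds:
        return None
    max_samples = int(effective_rate_sps * max_buffer_seconds)
    if downsample_mode == "aggregate":
        max_samples *= 2  # two stored values per aggregated sample (assumed)
    return max_samples

assert compute_max_samples(62.5e6, 60, "average") == 3_750_000_000
assert compute_max_samples(62.5e6, 60, "aggregate") == 7_500_000_000
assert compute_max_samples(62.5e6, None, "average") is None
```

The verification skip pairs with this: as the code comment notes, the file size is deliberately limited in live-only mode, so the standard whole-file sample-count check no longer applies.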
