diff options
Diffstat (limited to 'picostream/cli.py')
| -rw-r--r-- | picostream/cli.py | 699 |
1 files changed, 0 insertions, 699 deletions
diff --git a/picostream/cli.py b/picostream/cli.py deleted file mode 100644 index d8e7ffb..0000000 --- a/picostream/cli.py +++ /dev/null @@ -1,699 +0,0 @@ -from __future__ import annotations - -import queue -import signal -import sys -import threading -import time -from datetime import datetime -from typing import TYPE_CHECKING, List, Optional - -if TYPE_CHECKING: - from PyQt5.QtWidgets import QApplication - -import click -import h5py -import numpy as np -from loguru import logger - -from . import __version__ -from .consumer import Consumer -from .pico import PicoDevice - - -class Streamer: - """Orchestrates the Picoscope data acquisition process. - - This class initializes the Picoscope device (producer), the HDF5 writer - (consumer), and the live plotter. It manages the threads, queues, and - graceful shutdown of the entire application. - - Supports both standard acquisition mode (saves all data) and live-only mode - (limits buffer size using max_buffer_seconds parameter). - """ - - def __init__( - self, - sample_rate_msps: float = 62.5, - resolution_bits: int = 12, - channel_range_str: str = "PS5000A_20V", - enable_live_plot: bool = False, - output_file: str = "./output.hdf5", - debug: bool = False, - plot_window_s: float = 0.5, - plot_points: int = 4000, - hardware_downsample: int = 1, - downsample_mode: str = "average", - offset_v: float = 0.0, - max_buffer_seconds: Optional[float] = None, - is_gui_mode: bool = False, - y_min: Optional[float] = None, - y_max: Optional[float] = None, - bandwidth_limiter: str = "full", - ) -> None: - # --- Configuration --- - self.output_file = output_file - self.debug = debug - self.enable_live_plot = enable_live_plot - self.is_gui_mode = is_gui_mode - self.plot_window_s = plot_window_s - self.max_buffer_seconds = max_buffer_seconds - self.y_min = y_min - self.y_max = y_max - - ( - sample_rate_msps, - pico_downsample_ratio, - pico_ratio_mode, - offset_v, - ) = self._validate_config( - resolution_bits, - sample_rate_msps, - channel_range_str, - hardware_downsample, - downsample_mode, - offset_v, - ) - # Dynamically size buffers to hold a specific duration of data. This makes - # memory usage proportional to the data rate, providing a consistent - # time-based buffer to handle processing latencies. - effective_rate_sps = (sample_rate_msps * 1e6) / pico_downsample_ratio - - # Consumer buffers (for writing to HDF5) are sized to hold 1 second of data. - # This is a good balance, as larger buffers lead to more efficient disk writes - # but use more RAM. - consumer_buffer_duration_s = 1.0 - self.consumer_buffer_size = int( - effective_rate_sps * consumer_buffer_duration_s - ) - if downsample_mode == "aggregate": - self.consumer_buffer_size *= 2 - self.consumer_num_buffers = 5 # A pool of 5 buffers - - # The Picoscope driver buffer is sized to hold 0.5 seconds of data. This - # buffer receives data directly from the hardware. A smaller size ensures - # that the application receives data in timely chunks, reducing latency. - driver_buffer_duration_s = 0.5 - self.pico_driver_buffer_size = int( - effective_rate_sps * driver_buffer_duration_s - ) - self.pico_driver_num_buffers = ( - 1 # A single large buffer is efficient for the driver - ) - - logger.info( - f"Consumer buffer sized to {self.consumer_buffer_size:,} samples " - f"({consumer_buffer_duration_s}s at effective rate)" - ) - logger.info( - f"Pico driver buffer sized to {self.pico_driver_buffer_size:,} samples " - f"({driver_buffer_duration_s}s at effective rate)" - ) - - # --- Plotting Decimation --- - # Calculate the decimation factor needed to achieve the target number of plot points. - points_per_timestep = 2 if downsample_mode == "aggregate" else 1 - samples_in_window = effective_rate_sps * plot_window_s * points_per_timestep - self.decimation_factor = max(1, int(samples_in_window / plot_points)) - logger.info( - f"Plotting with target of {plot_points} points. " - f"Calculated decimation factor: {self.decimation_factor}" - ) - - # Picoscope hardware settings - self.pico_resolution = f"PS5000A_DR_{resolution_bits}BIT" - self.pico_channel_range = channel_range_str - self.pico_sample_interval_ns = int(1000 / sample_rate_msps) - self.pico_sample_unit = "PS5000A_NS" - - # Streaming settings - self.pico_auto_stop = 0 # Don't auto stop - self.pico_auto_stop_stream = False - # --- End Configuration --- - - # --- System Components --- - self.shutdown_event: threading.Event = threading.Event() - data_queue: queue.Queue[int] = queue.Queue() - empty_queue: queue.Queue[int] = queue.Queue() - data_buffers: List[np.ndarray] = [] - - # Pre-allocate a pool of numpy arrays for data transfer and populate the - # empty_queue with their indices. - for idx in range(self.consumer_num_buffers): - data_buffers.append(np.empty((self.consumer_buffer_size,), dtype="int16")) - empty_queue.put(idx) - - # --- Producer --- - self.pico_device: PicoDevice = PicoDevice( - 0, # handle - self.pico_resolution, - self.pico_driver_buffer_size, - self.pico_driver_num_buffers, - self.consumer_buffer_size, - data_queue, - empty_queue, - data_buffers, - self.shutdown_event, - downsample_mode=downsample_mode, - ) - - # Store bandwidth limiter setting - self.bandwidth_limiter = bandwidth_limiter - - self.pico_device.set_channel( - "PS5000A_CHANNEL_A", 1, "PS5000A_DC", self.pico_channel_range, offset_v - ) - self.pico_device.set_channel( - "PS5000A_CHANNEL_B", 0, "PS5000A_DC", self.pico_channel_range, 0.0 - ) - - # Set bandwidth filter for Channel A if specified - if self.bandwidth_limiter == "20MHz": - self.pico_device.set_bandwidth_filter("PS5000A_CHANNEL_A", "PS5000A_BW_20MHZ") - else: - self.pico_device.set_bandwidth_filter("PS5000A_CHANNEL_A", "PS5000A_BW_FULL") - self.pico_device.set_data_buffer("PS5000A_CHANNEL_A", 0, pico_ratio_mode) - self.pico_device.configure_streaming_var( - self.pico_sample_interval_ns, - self.pico_sample_unit, - 0, # pre-trigger samples - pico_downsample_ratio, - pico_ratio_mode, - self.pico_auto_stop, - self.pico_auto_stop_stream, - ) - - # Run streaming once to get the actual sample interval from the driver - self.pico_device.run_streaming() - - # --- Consumer --- - # Prepare metadata for the consumer - acquisition_start_time_utc = datetime.utcnow().isoformat() + "Z" - was_live_mode = self.max_buffer_seconds is not None - - # Get metadata from configured device and pass to consumer - metadata = self.pico_device.get_metadata( - acquisition_start_time_utc=acquisition_start_time_utc, - picostream_version=__version__, - acquisition_command="", # Will be set later in main() - was_live_mode=was_live_mode - ) - - # Calculate max samples for live-only mode - max_samples = None - if self.max_buffer_seconds: - max_samples = int(effective_rate_sps * self.max_buffer_seconds) - if downsample_mode == "aggregate": - max_samples *= 2 - logger.info(f"Live-only mode: limiting buffer to {self.max_buffer_seconds}s ({max_samples:,} samples)") - - self.consumer: Consumer = Consumer( - self.consumer_buffer_size, - data_queue, - empty_queue, - data_buffers, - output_file, - self.shutdown_event, - metadata=metadata, - max_samples=max_samples, - ) - - # --- Threads --- - self.consumer_thread: threading.Thread = threading.Thread( - target=self.consumer.consume - ) - self.pico_thread: threading.Thread = threading.Thread( - target=self.pico_device.run_capture - ) - - # --- Signal Handling --- - # Only set the signal handler if not in GUI mode. - # In GUI mode, the main window handles shutdown signals. - # In CLI plot mode, the main() function handles SIGINT to quit the Qt app. - if not self.is_gui_mode and not self.enable_live_plot: - signal.signal(signal.SIGINT, self.signal_handler) - - # --- Live Plotting (optional) --- - self.start_time: Optional[float] = None - - def update_acquisition_command(self, command: str) -> None: - """Update the acquisition command in the consumer's metadata.""" - self.consumer.metadata["acquisition_command"] = command - - def _validate_config( - self, - resolution_bits: int, - sample_rate_msps: float, - channel_range_str: str, - hardware_downsample: int, - downsample_mode: str, - offset_v: float, - ) -> tuple[float, int, str, float]: - """Validates user-provided settings and returns derived configuration.""" - if resolution_bits == 8: - max_rate_msps = 125.0 - elif resolution_bits in [12, 14, 15, 16]: - max_rate_msps = 62.5 - else: - raise ValueError( - f"Unsupported resolution: {resolution_bits} bits. Must be one of 8, 12, 14, 15, 16." - ) - - if sample_rate_msps <= 0: - sample_rate_msps = max_rate_msps - logger.info(f"Max sample rate requested. Setting to {max_rate_msps} MS/s.") - - if sample_rate_msps > max_rate_msps: - raise ValueError( - f"Sample rate {sample_rate_msps} MS/s exceeds maximum of {max_rate_msps} MS/s for {resolution_bits}-bit resolution." - ) - - # Check if sample rate is excessive for the analog bandwidth. - # Bandwidth is dependent on both resolution and voltage range. - # (Based on PicoScope 5000A/B Series datasheet) - inv_voltage_map = {v: k for k, v in VOLTAGE_RANGE_MAP.items()} - voltage_v = inv_voltage_map.get(channel_range_str, 0) - - if resolution_bits == 16: - bandwidth_mhz = 20 # 20 MHz for all ranges - elif resolution_bits == 15: - # Bandwidth is 70MHz for < ±5V, 60MHz for >= ±5V - bandwidth_mhz = 70 if voltage_v < 5.0 else 60 - else: # 8-14 bits - # Bandwidth is 100MHz for < ±5V, 60MHz for >= ±5V - bandwidth_mhz = 100 if voltage_v < 5.0 else 60 - - # Nyquist rate is 2x bandwidth. A common rule of thumb is 3-5x. - # Warn if sampling faster than 5x the analog bandwidth. - if sample_rate_msps > 5 * bandwidth_mhz: - logger.warning( - f"Sample rate ({sample_rate_msps} MS/s) may be unnecessarily high " - f"for the selected voltage range ({channel_range_str}), which has an " - f"analog bandwidth of {bandwidth_mhz} MHz." - ) - - if downsample_mode == "aggregate" and hardware_downsample <= 1: - raise ValueError( - "Hardware downsample ratio must be > 1 for 'aggregate' mode." - ) - - if hardware_downsample > 1: - pico_downsample_ratio = hardware_downsample - pico_ratio_mode = f"PS5000A_RATIO_MODE_{downsample_mode.upper()}" - logger.info( - f"Hardware down-sampling ({downsample_mode}) enabled " - + f"with ratio {pico_downsample_ratio}." - ) - else: - pico_downsample_ratio = 1 - pico_ratio_mode = "PS5000A_RATIO_MODE_NONE" - - # Validate analog offset - if offset_v != 0.0: - if voltage_v >= 5.0: - raise ValueError( - f"Analog offset is not supported for voltage ranges >= 5V (selected: {channel_range_str})." - ) - if abs(offset_v) > voltage_v: - raise ValueError( - f"Analog offset ({offset_v}V) exceeds the selected voltage range (±{voltage_v}V)." - ) - logger.info(f"Analog offset set to {offset_v:.3f}V.") - - return sample_rate_msps, pico_downsample_ratio, pico_ratio_mode, offset_v - - def signal_handler(self, _sig: int, frame: Optional[object]) -> None: - """Handles Ctrl+C interrupts to initiate a graceful shutdown.""" - logger.warning("Ctrl+C detected. Shutting down.") - self.shutdown() - - def shutdown(self) -> None: - """Performs a graceful shutdown of all components. - - This method calculates final statistics, stops all threads, closes the - plotter, and ensures the Picoscope device is properly closed. - """ - if self.shutdown_event.is_set(): - return - - self._log_acquisition_summary() - self.shutdown_event.set() - - logger.info("Stopping data acquisition and saving...") - - self.pico_device.close_device() - self._join_threads() - - logger.success("Shutdown complete.") - - def _log_acquisition_summary(self) -> None: - """Calculates and logs final acquisition statistics.""" - if not self.start_time: - return - - end_time = time.time() - duration = end_time - self.start_time - total_samples = self.consumer.values_written - effective_rate_msps = (total_samples / duration) / 1e6 if duration > 0 else 0 - configured_rate_msps = 1e3 / self.pico_device.sample_int.value - - logger.info("--- Acquisition Summary ---") - logger.info(f"Total acquisition time: {duration:.2f} s") - logger.info( - "Total samples written: " - + f"{self.consumer.format_sample_count(total_samples)}" - ) - logger.info(f"Configured sample rate: {configured_rate_msps:.2f} MS/s") - logger.info(f"Effective average rate: {effective_rate_msps:.2f} MS/s") - - rate_ratio = ( - effective_rate_msps / configured_rate_msps - if configured_rate_msps > 0 - else 0 - ) - if rate_ratio < 0.95: - logger.warning( - f"Effective rate was only {rate_ratio:.1%} " + "of the configured rate." - ) - else: - logger.success("Effective rate matches configured rate.") - logger.info("--------------------------") - - def _join_threads(self) -> None: - """Waits for the producer and consumer threads to terminate.""" - for thread_name in ["pico_thread", "consumer_thread"]: - thread = getattr(self, thread_name, None) - if thread and thread.is_alive(): - logger.info(f"Waiting for {thread_name} to terminate...") - thread.join(timeout=2.0) - if thread.is_alive(): - logger.critical(f"{thread_name} failed to terminate.") - - def run(self, app: Optional[QApplication] = None) -> None: - """Starts the acquisition threads and optionally the Qt event loop.""" - # Start acquisition threads - self.start_time = time.time() - self.consumer_thread.start() - self.pico_thread.start() - - # Handle Qt event loop if plotting is enabled - if self.enable_live_plot and app: - from .dfplot import HDF5LivePlotter - - plotter = HDF5LivePlotter( - hdf5_path=self.output_file, - display_window_seconds=self.plot_window_s, - decimation_factor=self.decimation_factor, - y_min=self.y_min, - y_max=self.y_max, - ) - plotter.show() - - # Run the Qt event loop. This will block until the plot window is closed. - app.exec_() - - # Once the window is closed, the shutdown event should have been set. - # We call shutdown() to ensure threads are joined and cleanup happens. - self.shutdown() - else: - # In GUI mode, run() returns immediately, allowing the worker's event - # loop to process signals. In CLI mode, we block until completion. - if not self.is_gui_mode: - self.consumer_thread.join() - self.pico_thread.join() - logger.success("Acquisition complete!") - - -# --- Argument Parsing --- -VOLTAGE_RANGE_MAP = { - 0.01: "PS5000A_10MV", - 0.02: "PS5000A_20MV", - 0.05: "PS5000A_50MV", - 0.1: "PS5000A_100MV", - 0.2: "PS5000A_200MV", - 0.5: "PS5000A_500MV", - 1: "PS5000A_1V", - 2: "PS5000A_2V", - 5: "PS5000A_5V", - 10: "PS5000A_10V", - 20: "PS5000A_20V", -} - - -def generate_unique_filename(base_path: str) -> str: - """Generate a unique filename by appending timestamp if file exists.""" - import os - from pathlib import Path - - if not os.path.exists(base_path): - return base_path - - # Split the path into parts - path = Path(base_path) - stem = path.stem - suffix = path.suffix - parent = path.parent - - # Generate timestamp - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - - # Create new filename with timestamp - new_filename = f"{stem}_{timestamp}{suffix}" - new_path = parent / new_filename - - return str(new_path) - - -@click.command() -@click.option( - "--sample-rate", - "-s", - type=float, - default=20, - help="Sample rate in MS/s (e.g., 62.5). Use 0 for max rate. [default: 20]", -) -@click.option( - "--resolution", - "-b", - type=click.Choice(["8", "12", "16"]), - default="12", - help="Resolution in bits. [default: 12]", -) -@click.option( - "--rangev", - "-r", - type=click.Choice([str(k) for k in sorted(VOLTAGE_RANGE_MAP.keys())]), - default="20", - help=f"Voltage range in Volts. [default: 20]", -) -@click.option( - "--plot/--no-plot", - "-p", - is_flag=True, - default=True, - help="Enable/disable live plotting. [default: --plot]", -) -@click.option( - "--output", - "-o", - type=click.Path(dir_okay=False, writable=True), - help="Output HDF5 file (default: auto-timestamped).", -) -@click.option( - "--plot-window", - "-w", - type=float, - default=0.5, - help="Live plot display window duration in seconds. [default: 0.5]", -) -@click.option( - "--verbose", "-v", is_flag=True, default=False, help="Enable debug logging." -) -@click.option( - "--plot-npts", - "-n", - type=int, - default=4000, - help="Target number of points for the plot window. [default: 4000]", -) -@click.option( - "--hardware-downsample", - "-h", - type=int, - default=1, - help="Hardware down-sampling ratio (power of 2 for 'average' mode). [default: 1]", -) -@click.option( - "--downsample-mode", - "-m", - type=click.Choice(["average", "aggregate"]), - default="average", - help="Hardware down-sampling mode. [default: average]", -) -@click.option( - "--offset", - type=float, - default=0.0, - help="Analog offset in Volts (only for ranges < 5V). [default: 0.0]", -) -@click.option( - "--bandwidth", - type=click.Choice(["full", "20MHz"]), - default="full", - help="Bandwidth limiter to reduce noise. [default: full]", -) -@click.option( - "--max-buff-sec", - type=float, - help="Maximum buffer duration in seconds for live-only mode (limits file size).", -) -@click.option( - "--force", - "-f", - is_flag=True, - default=False, - help="Overwrite existing output file.", -) -@click.option( - "--y-min", - type=float, - help="Minimum Y-axis limit in mV for live plot.", -) -@click.option( - "--y-max", - type=float, - help="Maximum Y-axis limit in mV for live plot.", -) -def main( - sample_rate: float, - resolution: str, - rangev: str, - plot: bool, - output: Optional[str], - plot_window: float, - verbose: bool, - plot_npts: int, - hardware_downsample: int, - downsample_mode: str, - offset: float, - bandwidth: str, - max_buff_sec: Optional[float], - force: bool, - y_min: Optional[float], - y_max: Optional[float], -) -> None: - """High-speed data acquisition tool for Picoscope 5000a series.""" - # --- Argument Validation and Processing --- - - # Validate Y-axis limits - if (y_min is None) != (y_max is None): - logger.error("Both --y-min and --y-max must be provided together, or neither.") - sys.exit(1) - - if y_min is not None and y_max is not None and y_min >= y_max: - logger.error(f"Invalid Y-axis range: y_min ({y_min}) must be less than y_max ({y_max}).") - sys.exit(1) - - channel_range_str = VOLTAGE_RANGE_MAP[float(rangev)] - resolution_bits = int(resolution) - - app: Optional[QApplication] = None - if plot: - from PyQt5.QtWidgets import QApplication - - app = QApplication(sys.argv) - - # When plotting, SIGINT should gracefully close the Qt application. - # The main loop will then handle the shutdown. - def sigint_handler(_sig: int, _frame: Optional[object]) -> None: - logger.warning("Ctrl+C detected. Closing application.") - QApplication.quit() - - signal.signal(signal.SIGINT, sigint_handler) - - # Configure logging - logger.remove() - log_level = "DEBUG" if verbose else "INFO" - logger.add(sys.stderr, level=log_level) - logger.info(f"Logging configured at level: {log_level}") - - # Auto-generate filename if not specified - if not output: - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - output = f"./output_{timestamp}.hdf5" - else: - # Check if file exists and handle accordingly - if not force: - original_output = output - output = generate_unique_filename(output) - if output != original_output: - logger.info(f"File '{original_output}' exists. Using '{output}' instead.") - logger.info("Use --force/-f to overwrite existing files.") - - logger.info(f"Output file: {output}") - logger.info(f"Selected voltage range: {rangev}V -> {channel_range_str}") - - try: - # Create and run the streamer - streamer = Streamer( - sample_rate_msps=sample_rate, - resolution_bits=resolution_bits, - channel_range_str=channel_range_str, - enable_live_plot=plot, - output_file=output, - debug=verbose, - plot_window_s=plot_window, - plot_points=plot_npts, - hardware_downsample=hardware_downsample, - downsample_mode=downsample_mode, - offset_v=offset, - max_buffer_seconds=max_buff_sec, - y_min=y_min, - y_max=y_max, - bandwidth_limiter=bandwidth, - ) - - # Update the acquisition command in metadata - acquisition_command = " ".join(sys.argv) - streamer.update_acquisition_command(acquisition_command) - - streamer.run(app) - except RuntimeError as e: - if "PICO_NOT_FOUND" in str(e): - logger.critical( - "Picoscope device not found. Please check connection and ensure no other software is using it." - ) - else: - logger.critical(f"Failed to initialize Picoscope: {e}") - sys.exit(1) - - # --- Verification Step --- - # Skip verification in live-only mode since file size is limited - if not streamer.max_buffer_seconds: - logger.info(f"Verifying output file: {output}") - try: - expected_samples = streamer.consumer.values_written - if expected_samples == 0: - logger.warning("Consumer processed no samples. Nothing to verify.") - else: - with h5py.File(output, "r") as f: - if "adc_counts" not in f: - raise ValueError("Dataset 'adc_counts' not found in HDF5 file.") - - actual_samples = len(f["adc_counts"]) - if actual_samples == expected_samples: - logger.success( - f"Verification PASSED: File contains {actual_samples} samples, as expected." - ) - else: - logger.error( - f"Verification FAILED: Expected {expected_samples} samples, but file has {actual_samples}." - ) - except Exception as e: - logger.error(f"HDF5 file verification failed: {e}") - else: - logger.info("Skipping verification in live-only mode.") - - -if __name__ == "__main__": - main() |
