diff options
Diffstat (limited to 'picostream/acquisition_rate.py')
| -rw-r--r-- | picostream/acquisition_rate.py | 287 |
1 files changed, 287 insertions, 0 deletions
diff --git a/picostream/acquisition_rate.py b/picostream/acquisition_rate.py new file mode 100644 index 0000000..f9c3e18 --- /dev/null +++ b/picostream/acquisition_rate.py @@ -0,0 +1,287 @@ +"""Acquisition rate abstraction for consistent rate semantics. + +This module provides a single source of truth for all rate calculations +throughout the picostream pipeline. Rates are represented as immutable +objects that know the relationships between hardware rate, downsampling, +channels, and effective rates at different stages. + +All time/sample conversions should go through the AcquisitionRate object +to ensure consistency and avoid double-counting of downsampling ratios. +""" + +from dataclasses import dataclass +from enum import Enum +from typing import TypeAlias + + +class DownsampleMode(Enum): + """Downsampling modes supported by Picoscope hardware.""" + + NONE = "NONE" + AVERAGE = "AVERAGE" + AGGREGATE = "AGGREGATE" + DECIMATE = "DECIMATE" + + +StorageSampleCount: TypeAlias = int +"""Number of samples in ring buffer (post-hardware-downsampling). + +In AGGREGATE mode, these are interleaved min/max pairs. +""" + +DisplayPointCount: TypeAlias = int +"""Number of points to display on screen (post-software-decimation). + +Always includes min/max pairs for envelope preservation. +""" + + +@dataclass(frozen=True) +class AcquisitionRate: + """Encapsulates all rate information for an acquisition. + + This is the single source of truth for rate semantics throughout + the pipeline. All rate calculations derive from these four core facts. + The object is frozen to prevent accidental misuse or reinterpretation + of rates at different stages. + + Attributes + ---------- + hardware_rate_hz : float + Raw ADC rate from hardware. For multi-channel systems, this is the + total rate across all channels. + Example: 125 MHz for 2 channels at 62.5 MS/s per channel. + + num_channels : int + Number of active acquisition channels. + + downsample_ratio : int + Hardware downsampling factor. 1 means no downsampling. + Example: 10 means every 10th sample is kept. + + downsample_mode : DownsampleMode + Type of downsampling applied: NONE, AVERAGE, AGGREGATE, or DECIMATE. + AGGREGATE produces min/max pairs (2 samples per time point). + """ + + hardware_rate_hz: float + num_channels: int + downsample_ratio: int + downsample_mode: DownsampleMode + + def __post_init__(self) -> None: + """Validate rate parameters.""" + if self.hardware_rate_hz <= 0: + raise ValueError( + f"hardware_rate_hz must be positive, got {self.hardware_rate_hz}" + ) + if self.num_channels <= 0: + raise ValueError(f"num_channels must be positive, got {self.num_channels}") + if self.downsample_ratio < 1: + raise ValueError( + f"downsample_ratio must be >= 1, got {self.downsample_ratio}" + ) + if not isinstance(self.downsample_mode, DownsampleMode): + raise ValueError( + f"downsample_mode must be DownsampleMode, got {type(self.downsample_mode)}" + ) + + @property + def per_channel_rate_hz(self) -> float: + """Per-channel rate after hardware downsampling. + + This is the rate at which individual channels produce time points + after downsampling is applied. + + Returns + ------- + float + Samples per second per channel. + + Example + ------- + hardware_rate_hz=125e6, num_channels=2, downsample_ratio=10 + → per_channel_rate_hz = 125e6 / (2 * 10) = 6.25e6 MS/s + """ + return self.hardware_rate_hz / (self.num_channels * self.downsample_ratio) + + @property + def storage_rate_hz(self) -> float: + """Total rate at which samples arrive at ring buffer. + + This is the rate to use for ring buffer capacity calculations and + writing to storage. It accounts for AGGREGATE mode producing 2 samples + per downsampled point (min/max pairs interleaved). + + For non-AGGREGATE modes: storage_rate = hardware_rate / downsample_ratio + For AGGREGATE mode: storage_rate = (hardware_rate / downsample_ratio) * 2 + + Returns + ------- + float + Total samples per second arriving at the buffer. + + Example + ------- + hardware_rate_hz=125e6, downsample_ratio=10, mode=DECIMATE + → storage_rate_hz = 125e6 / 10 = 12.5 MS/s + + hardware_rate_hz=125e6, downsample_ratio=10, mode=AGGREGATE + → storage_rate_hz = (125e6 / 10) * 2 = 25 MS/s + """ + base_rate = self.hardware_rate_hz / self.downsample_ratio + if ( + self.downsample_mode == DownsampleMode.AGGREGATE + and self.downsample_ratio > 1 + ): + return base_rate * 2 + return base_rate + + @property + def display_rate_per_channel_hz(self) -> float: + """Per-channel rate for time axis generation in display. + + Used by the plotter for converting sample counts to time durations. + This is the "semantic" time rate for the user - how fast time flows + in the plot regardless of how samples are packed in the buffer. + + Returns + ------- + float + Samples per second per channel as perceived in display time. + """ + return self.per_channel_rate_hz + + def samples_to_seconds(self, n_samples: StorageSampleCount) -> float: + """Convert sample count to time duration. + + Operates on per-channel sample counts (time-points), properly handling + AGGREGATE mode where storage includes interleaved min/max pairs. + + The conversion uses per_channel_rate_hz, not storage_rate_hz, because + this method is designed for time-point counts (one value per channel + per time step), not total storage sample counts. + + Parameters + ---------- + n_samples : StorageSampleCount + Number of time-points (samples per channel). For AGGREGATE mode, + this counts each min/max pair as 2 samples but represents 1 time-point. + + Returns + ------- + float + Duration in seconds. + + Example + ------- + If per_channel_rate = 6.25 MS/s and n_samples = 6.25M: + - NONE mode: 6.25e6 / 6.25e6 = 1.0 second + - AGGREGATE mode: (6.25e6 // 2) / 6.25e6 = 0.5 second + (because 6.25M samples = 3.125M pairs = 0.5s of data) + """ + if ( + self.downsample_mode == DownsampleMode.AGGREGATE + and self.downsample_ratio > 1 + ): + # Each min/max pair represents one time point + n_time_points = n_samples // 2 + return n_time_points / self.display_rate_per_channel_hz + return n_samples / self.display_rate_per_channel_hz + + def seconds_to_samples(self, duration_s: float) -> StorageSampleCount: + """Convert time duration to sample count. + + Returns the number of storage samples (including min/max pairs for + AGGREGATE mode) needed to represent the given duration. + + Note: This operates on per-channel rates. The returned count is for + storage capacity calculations. For time-point counts (one per channel), + divide the result by num_channels. + + Parameters + ---------- + duration_s : float + Duration in seconds. + + Returns + ------- + StorageSampleCount + Number of storage samples needed. For AGGREGATE mode with + downsampling, this is 2× the number of time-points. + + Example + ------- + If per_channel_rate = 6.25 MS/s and duration = 1.0s: + - NONE mode: 1.0 * 6.25e6 = 6.25M samples + - AGGREGATE mode: (1.0 * 6.25e6) * 2 = 12.5M samples + (because we store both min and max for each time point) + """ + n_time_points = int(duration_s * self.display_rate_per_channel_hz) + if ( + self.downsample_mode == DownsampleMode.AGGREGATE + and self.downsample_ratio > 1 + ): + return n_time_points * 2 + return n_time_points + + def get_display_duration( + self, n_display_samples: DisplayPointCount, decimation: int + ) -> float: + """Calculate time duration represented by display samples. + + DEPRECATED: This method mixes concerns by trying to reverse-engineer + duration from display points. Use samples_to_seconds() with storage + samples instead. + + Accounts for both software min-max decimation (which produces pairs) + and the hardware downsampling that's already baked into the rate. + + Parameters + ---------- + n_display_samples : DisplayPointCount + Number of display points (after software decimation). + decimation : int + Software decimation factor used for display (minimum 1). + + Returns + ------- + float + Duration in seconds that the display data represents. + + Notes + ----- + This is used by the plotter to know how much time is represented + by the currently displayed points, independent of how many points + are actually being drawn on screen. + """ + if decimation <= 0 or n_display_samples <= 0: + return 0.0 + + # Software min-max decimation produces pairs + n_time_points = n_display_samples // 2 + original_samples = n_time_points * decimation + + return original_samples / self.display_rate_per_channel_hz + + def __str__(self) -> str: + """Human-readable description of the acquisition rate.""" + return ( + f"AcquisitionRate(" + f"hw={self.hardware_rate_hz / 1e6:.1f}MS/s, " + f"ch={self.num_channels}, " + f"ds={self.downsample_ratio}x {self.downsample_mode.value}, " + f"→ {self.per_channel_rate_hz / 1e6:.2f}MS/s/ch, " + f"{self.storage_rate_hz / 1e6:.2f}MS/s storage" + f")" + ) + + def __repr__(self) -> str: + """Development-friendly representation.""" + return ( + f"AcquisitionRate(" + f"hardware_rate_hz={self.hardware_rate_hz}, " + f"num_channels={self.num_channels}, " + f"downsample_ratio={self.downsample_ratio}, " + f"downsample_mode={self.downsample_mode})" + ) |
