summaryrefslogtreecommitdiff
path: root/picostream/acquisition_rate.py
diff options
context:
space:
mode:
Diffstat (limited to 'picostream/acquisition_rate.py')
-rw-r--r--picostream/acquisition_rate.py287
1 files changed, 287 insertions, 0 deletions
diff --git a/picostream/acquisition_rate.py b/picostream/acquisition_rate.py
new file mode 100644
index 0000000..f9c3e18
--- /dev/null
+++ b/picostream/acquisition_rate.py
@@ -0,0 +1,287 @@
+"""Acquisition rate abstraction for consistent rate semantics.
+
+This module provides a single source of truth for all rate calculations
+throughout the picostream pipeline. Rates are represented as immutable
+objects that know the relationships between hardware rate, downsampling,
+channels, and effective rates at different stages.
+
+All time/sample conversions should go through the AcquisitionRate object
+to ensure consistency and avoid double-counting of downsampling ratios.
+"""
+
+from dataclasses import dataclass
+from enum import Enum
+from typing import TypeAlias
+
+
+class DownsampleMode(Enum):
+ """Downsampling modes supported by Picoscope hardware."""
+
+ NONE = "NONE"
+ AVERAGE = "AVERAGE"
+ AGGREGATE = "AGGREGATE"
+ DECIMATE = "DECIMATE"
+
+
+StorageSampleCount: TypeAlias = int
+"""Number of samples in ring buffer (post-hardware-downsampling).
+
+In AGGREGATE mode, these are interleaved min/max pairs.
+"""
+
+DisplayPointCount: TypeAlias = int
+"""Number of points to display on screen (post-software-decimation).
+
+Always includes min/max pairs for envelope preservation.
+"""
+
+
+@dataclass(frozen=True)
+class AcquisitionRate:
+ """Encapsulates all rate information for an acquisition.
+
+ This is the single source of truth for rate semantics throughout
+ the pipeline. All rate calculations derive from these four core facts.
+ The object is frozen to prevent accidental misuse or reinterpretation
+ of rates at different stages.
+
+ Attributes
+ ----------
+ hardware_rate_hz : float
+ Raw ADC rate from hardware. For multi-channel systems, this is the
+ total rate across all channels.
+ Example: 125 MHz for 2 channels at 62.5 MS/s per channel.
+
+ num_channels : int
+ Number of active acquisition channels.
+
+ downsample_ratio : int
+ Hardware downsampling factor. 1 means no downsampling.
+ Example: 10 means every 10th sample is kept.
+
+ downsample_mode : DownsampleMode
+ Type of downsampling applied: NONE, AVERAGE, AGGREGATE, or DECIMATE.
+ AGGREGATE produces min/max pairs (2 samples per time point).
+ """
+
+ hardware_rate_hz: float
+ num_channels: int
+ downsample_ratio: int
+ downsample_mode: DownsampleMode
+
+ def __post_init__(self) -> None:
+ """Validate rate parameters."""
+ if self.hardware_rate_hz <= 0:
+ raise ValueError(
+ f"hardware_rate_hz must be positive, got {self.hardware_rate_hz}"
+ )
+ if self.num_channels <= 0:
+ raise ValueError(f"num_channels must be positive, got {self.num_channels}")
+ if self.downsample_ratio < 1:
+ raise ValueError(
+ f"downsample_ratio must be >= 1, got {self.downsample_ratio}"
+ )
+ if not isinstance(self.downsample_mode, DownsampleMode):
+ raise ValueError(
+ f"downsample_mode must be DownsampleMode, got {type(self.downsample_mode)}"
+ )
+
+ @property
+ def per_channel_rate_hz(self) -> float:
+ """Per-channel rate after hardware downsampling.
+
+ This is the rate at which individual channels produce time points
+ after downsampling is applied.
+
+ Returns
+ -------
+ float
+ Samples per second per channel.
+
+ Example
+ -------
+ hardware_rate_hz=125e6, num_channels=2, downsample_ratio=10
+ → per_channel_rate_hz = 125e6 / (2 * 10) = 6.25e6 MS/s
+ """
+ return self.hardware_rate_hz / (self.num_channels * self.downsample_ratio)
+
+ @property
+ def storage_rate_hz(self) -> float:
+ """Total rate at which samples arrive at ring buffer.
+
+ This is the rate to use for ring buffer capacity calculations and
+ writing to storage. It accounts for AGGREGATE mode producing 2 samples
+ per downsampled point (min/max pairs interleaved).
+
+ For non-AGGREGATE modes: storage_rate = hardware_rate / downsample_ratio
+ For AGGREGATE mode: storage_rate = (hardware_rate / downsample_ratio) * 2
+
+ Returns
+ -------
+ float
+ Total samples per second arriving at the buffer.
+
+ Example
+ -------
+ hardware_rate_hz=125e6, downsample_ratio=10, mode=DECIMATE
+ → storage_rate_hz = 125e6 / 10 = 12.5 MS/s
+
+ hardware_rate_hz=125e6, downsample_ratio=10, mode=AGGREGATE
+ → storage_rate_hz = (125e6 / 10) * 2 = 25 MS/s
+ """
+ base_rate = self.hardware_rate_hz / self.downsample_ratio
+ if (
+ self.downsample_mode == DownsampleMode.AGGREGATE
+ and self.downsample_ratio > 1
+ ):
+ return base_rate * 2
+ return base_rate
+
+ @property
+ def display_rate_per_channel_hz(self) -> float:
+ """Per-channel rate for time axis generation in display.
+
+ Used by the plotter for converting sample counts to time durations.
+ This is the "semantic" time rate for the user - how fast time flows
+ in the plot regardless of how samples are packed in the buffer.
+
+ Returns
+ -------
+ float
+ Samples per second per channel as perceived in display time.
+ """
+ return self.per_channel_rate_hz
+
+ def samples_to_seconds(self, n_samples: StorageSampleCount) -> float:
+ """Convert sample count to time duration.
+
+ Operates on per-channel sample counts (time-points), properly handling
+ AGGREGATE mode where storage includes interleaved min/max pairs.
+
+ The conversion uses per_channel_rate_hz, not storage_rate_hz, because
+ this method is designed for time-point counts (one value per channel
+ per time step), not total storage sample counts.
+
+ Parameters
+ ----------
+ n_samples : StorageSampleCount
+ Number of time-points (samples per channel). For AGGREGATE mode,
+ this counts each min/max pair as 2 samples but represents 1 time-point.
+
+ Returns
+ -------
+ float
+ Duration in seconds.
+
+ Example
+ -------
+ If per_channel_rate = 6.25 MS/s and n_samples = 6.25M:
+ - NONE mode: 6.25e6 / 6.25e6 = 1.0 second
+ - AGGREGATE mode: (6.25e6 // 2) / 6.25e6 = 0.5 second
+ (because 6.25M samples = 3.125M pairs = 0.5s of data)
+ """
+ if (
+ self.downsample_mode == DownsampleMode.AGGREGATE
+ and self.downsample_ratio > 1
+ ):
+ # Each min/max pair represents one time point
+ n_time_points = n_samples // 2
+ return n_time_points / self.display_rate_per_channel_hz
+ return n_samples / self.display_rate_per_channel_hz
+
+ def seconds_to_samples(self, duration_s: float) -> StorageSampleCount:
+ """Convert time duration to sample count.
+
+ Returns the number of storage samples (including min/max pairs for
+ AGGREGATE mode) needed to represent the given duration.
+
+ Note: This operates on per-channel rates. The returned count is for
+ storage capacity calculations. For time-point counts (one per channel),
+ divide the result by num_channels.
+
+ Parameters
+ ----------
+ duration_s : float
+ Duration in seconds.
+
+ Returns
+ -------
+ StorageSampleCount
+ Number of storage samples needed. For AGGREGATE mode with
+ downsampling, this is 2× the number of time-points.
+
+ Example
+ -------
+ If per_channel_rate = 6.25 MS/s and duration = 1.0s:
+ - NONE mode: 1.0 * 6.25e6 = 6.25M samples
+ - AGGREGATE mode: (1.0 * 6.25e6) * 2 = 12.5M samples
+ (because we store both min and max for each time point)
+ """
+ n_time_points = int(duration_s * self.display_rate_per_channel_hz)
+ if (
+ self.downsample_mode == DownsampleMode.AGGREGATE
+ and self.downsample_ratio > 1
+ ):
+ return n_time_points * 2
+ return n_time_points
+
+ def get_display_duration(
+ self, n_display_samples: DisplayPointCount, decimation: int
+ ) -> float:
+ """Calculate time duration represented by display samples.
+
+ DEPRECATED: This method mixes concerns by trying to reverse-engineer
+ duration from display points. Use samples_to_seconds() with storage
+ samples instead.
+
+ Accounts for both software min-max decimation (which produces pairs)
+ and the hardware downsampling that's already baked into the rate.
+
+ Parameters
+ ----------
+ n_display_samples : DisplayPointCount
+ Number of display points (after software decimation).
+ decimation : int
+ Software decimation factor used for display (minimum 1).
+
+ Returns
+ -------
+ float
+ Duration in seconds that the display data represents.
+
+ Notes
+ -----
+ This is used by the plotter to know how much time is represented
+ by the currently displayed points, independent of how many points
+ are actually being drawn on screen.
+ """
+ if decimation <= 0 or n_display_samples <= 0:
+ return 0.0
+
+ # Software min-max decimation produces pairs
+ n_time_points = n_display_samples // 2
+ original_samples = n_time_points * decimation
+
+ return original_samples / self.display_rate_per_channel_hz
+
+ def __str__(self) -> str:
+ """Human-readable description of the acquisition rate."""
+ return (
+ f"AcquisitionRate("
+ f"hw={self.hardware_rate_hz / 1e6:.1f}MS/s, "
+ f"ch={self.num_channels}, "
+ f"ds={self.downsample_ratio}x {self.downsample_mode.value}, "
+ f"→ {self.per_channel_rate_hz / 1e6:.2f}MS/s/ch, "
+ f"{self.storage_rate_hz / 1e6:.2f}MS/s storage"
+ f")"
+ )
+
+ def __repr__(self) -> str:
+ """Development-friendly representation."""
+ return (
+ f"AcquisitionRate("
+ f"hardware_rate_hz={self.hardware_rate_hz}, "
+ f"num_channels={self.num_channels}, "
+ f"downsample_ratio={self.downsample_ratio}, "
+ f"downsample_mode={self.downsample_mode})"
+ )