summaryrefslogtreecommitdiff
path: root/picostream/test_rate_contract.py
diff options
context:
space:
mode:
Diffstat (limited to 'picostream/test_rate_contract.py')
-rw-r--r--picostream/test_rate_contract.py263
1 files changed, 263 insertions, 0 deletions
diff --git a/picostream/test_rate_contract.py b/picostream/test_rate_contract.py
new file mode 100644
index 0000000..2e65f39
--- /dev/null
+++ b/picostream/test_rate_contract.py
@@ -0,0 +1,263 @@
+"""Integration tests for rate handling across components.
+
+These tests verify that rate information flows correctly from the device
+through the ring buffer to the data pipeline, ensuring time calculations
+are accurate.
+"""
+
+import numpy as np
+
+from picostream.acquisition_rate import AcquisitionRate, DownsampleMode
+from picostream.data_pipeline import DataPipeline, PipelineConfig
+from picostream.ring_buffer import RingBuffer
+
+
+class TestRateContractDeviceToPipeline:
+ """Tests for rate contract between device, ring buffer, and pipeline.
+
+ These tests catch issues like double-counting channels or misapplying
+ downsampling ratios across component boundaries.
+ """
+
+ def _simulate_device_setting_storage_rate(
+ self,
+ ring_buffer: RingBuffer,
+ hardware_rate_hz: float,
+ downsample_ratio: int,
+ downsample_mode: str,
+ n_channels: int,
+ ) -> AcquisitionRate:
+ """Simulate how the device sets AcquisitionRate in start_streaming().
+
+ This replicates the logic in PicoscopeBufferedStream.start_streaming()
+ to ensure tests reflect actual device behavior.
+
+ Returns
+ -------
+ AcquisitionRate
+ The acquisition rate object that gets set on the ring buffer.
+ """
+ rate = AcquisitionRate(
+ hardware_rate_hz=hardware_rate_hz,
+ num_channels=n_channels,
+ downsample_ratio=downsample_ratio,
+ downsample_mode=DownsampleMode(downsample_mode),
+ )
+ ring_buffer.set_acquisition_rate(rate)
+ return rate
+
+ def test_single_channel_no_downsample_time_calculation(self):
+ """1 channel, no downsampling: 1 second of data = 1 second displayed."""
+ # Setup ring buffer as device would
+ rb = RingBuffer(duration_s=10.0, sample_rate=62.5e6, num_channels=1)
+
+ # Device configures the actual storage rate
+ rate = self._simulate_device_setting_storage_rate(
+ rb,
+ hardware_rate_hz=62.5e6,
+ downsample_ratio=1,
+ downsample_mode="NONE",
+ n_channels=1,
+ )
+
+ # Pipeline config and acquisition rate
+ config = PipelineConfig(
+ resolution=16,
+ voltage_ranges={0: 20.0},
+ offsets_v={0: 0.0},
+ max_adc_value=32767,
+ target_plot_points=20000,
+ )
+ pipeline = DataPipeline(config, rate)
+
+ # Write 1 second of data to ring buffer
+ one_second_samples = int(rate.storage_rate_hz) # 62.5M samples
+ data = np.random.randint(
+ -1000, 1000, size=(one_second_samples, 1), dtype=np.int16
+ )
+ rb.write(data)
+
+ # Read it back and verify time calculation
+ read_data = rb.read_last(one_second_samples)
+ duration = pipeline.samples_to_time(len(read_data))
+
+ # Should be exactly 1 second (within floating point tolerance)
+ assert abs(duration - 1.0) < 0.001, f"Expected ~1.0s, got {duration}s"
+
+ def test_two_channel_no_downsample_time_calculation(self):
+ """2 channels, no downsampling: 1 second of data = 1 second displayed."""
+ rb = RingBuffer(duration_s=10.0, sample_rate=125e6, num_channels=2)
+
+ # Device: 2 channels at 62.5 MS/s each = 125 MS/s total
+ rate = self._simulate_device_setting_storage_rate(
+ rb,
+ hardware_rate_hz=125e6, # 62.5e6 * 2
+ downsample_ratio=1,
+ downsample_mode="NONE",
+ n_channels=2,
+ )
+
+ config = PipelineConfig(
+ resolution=16,
+ voltage_ranges={0: 20.0, 1: 20.0},
+ offsets_v={0: 0.0, 1: 0.0},
+ max_adc_value=32767,
+ target_plot_points=20000,
+ )
+ pipeline = DataPipeline(config, rate)
+
+ # Write 1 second of data
+ one_second_samples = int(rate.storage_rate_hz) # 125M samples total
+ data = np.random.randint(
+ -1000, 1000, size=(one_second_samples, 2), dtype=np.int16
+ )
+ rb.write(data)
+
+ # Verify: per_channel_rate should be 62.5 MS/s
+ assert pipeline.acquisition_rate.per_channel_rate_hz == 62.5e6
+
+ # Time for 62.5M samples on one channel = 1 second
+ duration = pipeline.samples_to_time(62500000)
+ assert abs(duration - 1.0) < 0.001, f"Expected ~1.0s, got {duration}s"
+
+ def test_ten_x_downsample_time_calculation(self):
+ """10x downsampling: 1 second of real time = 1 second displayed."""
+ rb = RingBuffer(duration_s=10.0, sample_rate=12.5e6, num_channels=2)
+
+ # Device: 125 MS/s hardware / 10x downsample = 12.5 MS/s storage
+ rate = self._simulate_device_setting_storage_rate(
+ rb,
+ hardware_rate_hz=125e6,
+ downsample_ratio=10,
+ downsample_mode="DECIMATE",
+ n_channels=2,
+ )
+
+ config = PipelineConfig(
+ resolution=16,
+ voltage_ranges={0: 20.0, 1: 20.0},
+ offsets_v={0: 0.0, 1: 0.0},
+ max_adc_value=32767,
+ target_plot_points=20000,
+ )
+ pipeline = DataPipeline(config, rate)
+
+ # Write 1 second of downsampled data
+ one_second_samples = int(rate.storage_rate_hz) # 12.5M samples
+ data = np.random.randint(
+ -1000, 1000, size=(one_second_samples, 2), dtype=np.int16
+ )
+ rb.write(data)
+
+ # Per-channel rate should be 6.25 MS/s
+ assert pipeline.acquisition_rate.per_channel_rate_hz == 6.25e6
+
+ # Time for 6.25M samples (1 channel) = 1 second
+ duration = pipeline.samples_to_time(6250000)
+ assert abs(duration - 1.0) < 0.001, f"Expected ~1.0s, got {duration}s"
+
+ def test_aggregate_mode_time_calculation(self):
+ """AGGREGATE mode: 1 second of real time = 1 second displayed."""
+ rb = RingBuffer(duration_s=10.0, sample_rate=25e6, num_channels=2)
+
+ # Device: 125 MS/s hardware / 10x * 2 (min/max) = 25 MS/s storage
+ rate = self._simulate_device_setting_storage_rate(
+ rb,
+ hardware_rate_hz=125e6,
+ downsample_ratio=10,
+ downsample_mode="AGGREGATE",
+ n_channels=2,
+ )
+
+ config = PipelineConfig(
+ resolution=16,
+ voltage_ranges={0: 20.0, 1: 20.0},
+ offsets_v={0: 0.0, 1: 0.0},
+ max_adc_value=32767,
+ target_plot_points=20000,
+ )
+ pipeline = DataPipeline(config, rate)
+
+ # Write 1 second of AGGREGATE data
+ one_second_samples = int(
+ rate.storage_rate_hz
+ ) # 25M samples (12.5M min/max pairs)
+ data = np.random.randint(
+ -1000, 1000, size=(one_second_samples, 2), dtype=np.int16
+ )
+ rb.write(data)
+
+ # Storage rate is 25 MS/s (12.5M pairs/sec * 2 values/pair)
+ # Per-channel rate is 6.25 MS/s (125M / 2 channels / 10 downsample)
+ assert rate.storage_rate_hz == 25e6
+ assert pipeline.acquisition_rate.per_channel_rate_hz == 6.25e6
+
+ # In AGGREGATE mode:
+ # - 25e6 samples = 12.5e6 min/max pairs
+ # - samples_to_time: n_time_points = 25e6 // 2 = 12.5e6 pairs
+ # - duration = 12.5e6 pairs / 6.25e6 per_channel_rate = 2 seconds
+ duration = pipeline.samples_to_time(25000000)
+ assert abs(duration - 2.0) < 0.001, f"Expected ~2.0s, got {duration}s"
+
+
+class TestRateContractInvariants:
+ """Invariants that must hold regardless of configuration."""
+
+ def test_storage_rate_is_total_not_per_channel(self):
+ """storage_rate_hz must always represent total rate, not per-channel."""
+ # Single channel
+ rate1 = AcquisitionRate(
+ hardware_rate_hz=10e6,
+ num_channels=1,
+ downsample_ratio=1,
+ downsample_mode=DownsampleMode.NONE,
+ )
+ assert rate1.storage_rate_hz == 10e6
+
+ # Two channels - rate doubles (more samples per unit time)
+ rate2 = AcquisitionRate(
+ hardware_rate_hz=20e6,
+ num_channels=2,
+ downsample_ratio=1,
+ downsample_mode=DownsampleMode.NONE,
+ )
+ assert rate2.storage_rate_hz == 20e6 # 10 MS/s per channel * 2 channels
+
+ def test_pipeline_divides_by_channel_count(self):
+ """Pipeline must divide total rate by channel count for per-channel rate."""
+ # Test 2-channel setup
+ rate2 = AcquisitionRate(
+ hardware_rate_hz=100e6,
+ num_channels=2,
+ downsample_ratio=1,
+ downsample_mode=DownsampleMode.NONE,
+ )
+ config = PipelineConfig(
+ resolution=16,
+ voltage_ranges={0: 20.0, 1: 20.0},
+ offsets_v={0: 0.0, 1: 0.0},
+ max_adc_value=32767,
+ target_plot_points=20000,
+ )
+ pipeline = DataPipeline(config, rate2)
+
+ # Per-channel rate should be 50 MS/s (100 / 2 channels)
+ assert pipeline.acquisition_rate.per_channel_rate_hz == 50e6
+
+ # Test 1-channel setup
+ rate1 = AcquisitionRate(
+ hardware_rate_hz=50e6,
+ num_channels=1,
+ downsample_ratio=1,
+ downsample_mode=DownsampleMode.NONE,
+ )
+ config1 = PipelineConfig(
+ resolution=16,
+ voltage_ranges={0: 20.0},
+ offsets_v={0: 0.0},
+ max_adc_value=32767,
+ target_plot_points=20000,
+ )
+ pipeline1 = DataPipeline(config1, rate1)
+
+ assert pipeline1.acquisition_rate.per_channel_rate_hz == 50e6