diff options
Diffstat (limited to 'picostream/test_rate_contract.py')
| -rw-r--r-- | picostream/test_rate_contract.py | 263 |
1 files changed, 263 insertions, 0 deletions
diff --git a/picostream/test_rate_contract.py b/picostream/test_rate_contract.py new file mode 100644 index 0000000..2e65f39 --- /dev/null +++ b/picostream/test_rate_contract.py @@ -0,0 +1,263 @@ +"""Integration tests for rate handling across components. + +These tests verify that rate information flows correctly from the device +through the ring buffer to the data pipeline, ensuring time calculations +are accurate. +""" + +import numpy as np + +from picostream.acquisition_rate import AcquisitionRate, DownsampleMode +from picostream.data_pipeline import DataPipeline, PipelineConfig +from picostream.ring_buffer import RingBuffer + + +class TestRateContractDeviceToPipeline: + """Tests for rate contract between device, ring buffer, and pipeline. + + These tests catch issues like double-counting channels or misapplying + downsampling ratios across component boundaries. + """ + + def _simulate_device_setting_storage_rate( + self, + ring_buffer: RingBuffer, + hardware_rate_hz: float, + downsample_ratio: int, + downsample_mode: str, + n_channels: int, + ) -> AcquisitionRate: + """Simulate how the device sets AcquisitionRate in start_streaming(). + + This replicates the logic in PicoscopeBufferedStream.start_streaming() + to ensure tests reflect actual device behavior. + + Returns + ------- + AcquisitionRate + The acquisition rate object that gets set on the ring buffer. + """ + rate = AcquisitionRate( + hardware_rate_hz=hardware_rate_hz, + num_channels=n_channels, + downsample_ratio=downsample_ratio, + downsample_mode=DownsampleMode(downsample_mode), + ) + ring_buffer.set_acquisition_rate(rate) + return rate + + def test_single_channel_no_downsample_time_calculation(self): + """1 channel, no downsampling: 1 second of data = 1 second displayed.""" + # Setup ring buffer as device would + rb = RingBuffer(duration_s=10.0, sample_rate=62.5e6, num_channels=1) + + # Device configures the actual storage rate + rate = self._simulate_device_setting_storage_rate( + rb, + hardware_rate_hz=62.5e6, + downsample_ratio=1, + downsample_mode="NONE", + n_channels=1, + ) + + # Pipeline config and acquisition rate + config = PipelineConfig( + resolution=16, + voltage_ranges={0: 20.0}, + offsets_v={0: 0.0}, + max_adc_value=32767, + target_plot_points=20000, + ) + pipeline = DataPipeline(config, rate) + + # Write 1 second of data to ring buffer + one_second_samples = int(rate.storage_rate_hz) # 62.5M samples + data = np.random.randint( + -1000, 1000, size=(one_second_samples, 1), dtype=np.int16 + ) + rb.write(data) + + # Read it back and verify time calculation + read_data = rb.read_last(one_second_samples) + duration = pipeline.samples_to_time(len(read_data)) + + # Should be exactly 1 second (within floating point tolerance) + assert abs(duration - 1.0) < 0.001, f"Expected ~1.0s, got {duration}s" + + def test_two_channel_no_downsample_time_calculation(self): + """2 channels, no downsampling: 1 second of data = 1 second displayed.""" + rb = RingBuffer(duration_s=10.0, sample_rate=125e6, num_channels=2) + + # Device: 2 channels at 62.5 MS/s each = 125 MS/s total + rate = self._simulate_device_setting_storage_rate( + rb, + hardware_rate_hz=125e6, # 62.5e6 * 2 + downsample_ratio=1, + downsample_mode="NONE", + n_channels=2, + ) + + config = PipelineConfig( + resolution=16, + voltage_ranges={0: 20.0, 1: 20.0}, + offsets_v={0: 0.0, 1: 0.0}, + max_adc_value=32767, + target_plot_points=20000, + ) + pipeline = DataPipeline(config, rate) + + # Write 1 second of data + one_second_samples = int(rate.storage_rate_hz) # 125M samples total + data = np.random.randint( + -1000, 1000, size=(one_second_samples, 2), dtype=np.int16 + ) + rb.write(data) + + # Verify: per_channel_rate should be 62.5 MS/s + assert pipeline.acquisition_rate.per_channel_rate_hz == 62.5e6 + + # Time for 62.5M samples on one channel = 1 second + duration = pipeline.samples_to_time(62500000) + assert abs(duration - 1.0) < 0.001, f"Expected ~1.0s, got {duration}s" + + def test_ten_x_downsample_time_calculation(self): + """10x downsampling: 1 second of real time = 1 second displayed.""" + rb = RingBuffer(duration_s=10.0, sample_rate=12.5e6, num_channels=2) + + # Device: 125 MS/s hardware / 10x downsample = 12.5 MS/s storage + rate = self._simulate_device_setting_storage_rate( + rb, + hardware_rate_hz=125e6, + downsample_ratio=10, + downsample_mode="DECIMATE", + n_channels=2, + ) + + config = PipelineConfig( + resolution=16, + voltage_ranges={0: 20.0, 1: 20.0}, + offsets_v={0: 0.0, 1: 0.0}, + max_adc_value=32767, + target_plot_points=20000, + ) + pipeline = DataPipeline(config, rate) + + # Write 1 second of downsampled data + one_second_samples = int(rate.storage_rate_hz) # 12.5M samples + data = np.random.randint( + -1000, 1000, size=(one_second_samples, 2), dtype=np.int16 + ) + rb.write(data) + + # Per-channel rate should be 6.25 MS/s + assert pipeline.acquisition_rate.per_channel_rate_hz == 6.25e6 + + # Time for 6.25M samples (1 channel) = 1 second + duration = pipeline.samples_to_time(6250000) + assert abs(duration - 1.0) < 0.001, f"Expected ~1.0s, got {duration}s" + + def test_aggregate_mode_time_calculation(self): + """AGGREGATE mode: 1 second of real time = 1 second displayed.""" + rb = RingBuffer(duration_s=10.0, sample_rate=25e6, num_channels=2) + + # Device: 125 MS/s hardware / 10x * 2 (min/max) = 25 MS/s storage + rate = self._simulate_device_setting_storage_rate( + rb, + hardware_rate_hz=125e6, + downsample_ratio=10, + downsample_mode="AGGREGATE", + n_channels=2, + ) + + config = PipelineConfig( + resolution=16, + voltage_ranges={0: 20.0, 1: 20.0}, + offsets_v={0: 0.0, 1: 0.0}, + max_adc_value=32767, + target_plot_points=20000, + ) + pipeline = DataPipeline(config, rate) + + # Write 1 second of AGGREGATE data + one_second_samples = int( + rate.storage_rate_hz + ) # 25M samples (12.5M min/max pairs) + data = np.random.randint( + -1000, 1000, size=(one_second_samples, 2), dtype=np.int16 + ) + rb.write(data) + + # Storage rate is 25 MS/s (12.5M pairs/sec * 2 values/pair) + # Per-channel rate is 6.25 MS/s (125M / 2 channels / 10 downsample) + assert rate.storage_rate_hz == 25e6 + assert pipeline.acquisition_rate.per_channel_rate_hz == 6.25e6 + + # In AGGREGATE mode: + # - 25e6 samples = 12.5e6 min/max pairs + # - samples_to_time: n_time_points = 25e6 // 2 = 12.5e6 pairs + # - duration = 12.5e6 pairs / 6.25e6 per_channel_rate = 2 seconds + duration = pipeline.samples_to_time(25000000) + assert abs(duration - 2.0) < 0.001, f"Expected ~2.0s, got {duration}s" + + +class TestRateContractInvariants: + """Invariants that must hold regardless of configuration.""" + + def test_storage_rate_is_total_not_per_channel(self): + """storage_rate_hz must always represent total rate, not per-channel.""" + # Single channel + rate1 = AcquisitionRate( + hardware_rate_hz=10e6, + num_channels=1, + downsample_ratio=1, + downsample_mode=DownsampleMode.NONE, + ) + assert rate1.storage_rate_hz == 10e6 + + # Two channels - rate doubles (more samples per unit time) + rate2 = AcquisitionRate( + hardware_rate_hz=20e6, + num_channels=2, + downsample_ratio=1, + downsample_mode=DownsampleMode.NONE, + ) + assert rate2.storage_rate_hz == 20e6 # 10 MS/s per channel * 2 channels + + def test_pipeline_divides_by_channel_count(self): + """Pipeline must divide total rate by channel count for per-channel rate.""" + # Test 2-channel setup + rate2 = AcquisitionRate( + hardware_rate_hz=100e6, + num_channels=2, + downsample_ratio=1, + downsample_mode=DownsampleMode.NONE, + ) + config = PipelineConfig( + resolution=16, + voltage_ranges={0: 20.0, 1: 20.0}, + offsets_v={0: 0.0, 1: 0.0}, + max_adc_value=32767, + target_plot_points=20000, + ) + pipeline = DataPipeline(config, rate2) + + # Per-channel rate should be 50 MS/s (100 / 2 channels) + assert pipeline.acquisition_rate.per_channel_rate_hz == 50e6 + + # Test 1-channel setup + rate1 = AcquisitionRate( + hardware_rate_hz=50e6, + num_channels=1, + downsample_ratio=1, + downsample_mode=DownsampleMode.NONE, + ) + config1 = PipelineConfig( + resolution=16, + voltage_ranges={0: 20.0}, + offsets_v={0: 0.0}, + max_adc_value=32767, + target_plot_points=20000, + ) + pipeline1 = DataPipeline(config1, rate1) + + assert pipeline1.acquisition_rate.per_channel_rate_hz == 50e6 |
