diff options
| author | Sam Scholten | 2025-10-24 15:05:53 +1000 |
|---|---|---|
| committer | Sam Scholten | 2025-10-24 15:05:53 +1000 |
| commit | bb9a461f9933401599c05f87328f835f65faaee0 (patch) | |
| tree | 262ca4ca327bc8749bc4194f8892b4f9a3afcb7d | |
| parent | 0925d0645256cdb0c95b1aa85f5b65d3ebcd84a3 (diff) | |
| download | transivent-bb9a461f9933401599c05f87328f835f65faaee0.tar.gz transivent-bb9a461f9933401599c05f87328f835f65faaee0.zip | |
Add resample subpackage with downsampling support
- Add transivent.resample subpackage for signal preprocessing
- Implement average_downsample() for block averaging downsampling
- Add downsample_to_interval() for interval-based downsampling
- Add target_sampling_interval parameter to detect_from_wfm()
- Preserves signal amplitude and improves SNR through averaging
- Creates uniform time arrays to avoid validation warnings
| -rw-r--r-- | src/transivent/__init__.py | 3 | ||||
| -rw-r--r-- | src/transivent/analysis.py | 49 | ||||
| -rw-r--r-- | src/transivent/resample/__init__.py | 20 | ||||
| -rw-r--r-- | src/transivent/resample/average.py | 123 | ||||
| -rw-r--r-- | src/transivent/resample/utils.py | 65 | ||||
| -rw-r--r-- | tests/test_resample.py | 98 | ||||
| -rw-r--r-- | uv.lock | 2 |
7 files changed, 359 insertions, 1 deletions
diff --git a/src/transivent/__init__.py b/src/transivent/__init__.py index 9cec6f6..b802f30 100644 --- a/src/transivent/__init__.py +++ b/src/transivent/__init__.py @@ -18,6 +18,7 @@ from .event_detector import detect_events, merge_overlapping_events from .event_plotter import EventPlotter from .event_processor import extract_event_waveforms from .io import get_waveform_params, rd +from . import resample __all__ = [ # Main entry points @@ -34,4 +35,6 @@ __all__ = [ # I/O utilities "get_waveform_params", "rd", + # Signal processing + "resample", ] diff --git a/src/transivent/analysis.py b/src/transivent/analysis.py index caed827..6ef34d4 100644 --- a/src/transivent/analysis.py +++ b/src/transivent/analysis.py @@ -98,6 +98,40 @@ def plot_preview_image(image_path: str, title: str = "Preview Image") -> None: logger.warning(f"Failed to display preview image {image_path}: {e}") +def _average_downsample( + t: np.ndarray, x: np.ndarray, q: int +) -> Tuple[np.ndarray, np.ndarray]: + """ + Private helper to downsample by averaging blocks of samples. + + Parameters + ---------- + t : np.ndarray + Time array. + x : np.ndarray + Signal array. + q : int + Downsample factor (must be integer > 1). + + Returns + ------- + Tuple[np.ndarray, np.ndarray] + Downsampled time and signal arrays. + """ + if q <= 1: + return t, x + + n_keep = (len(x) // q) * q + x_reshaped = x[:n_keep].reshape(-1, q) + x_avg = x_reshaped.mean(axis=1) + + # Generate uniform time array + dt = t[1] - t[0] if len(t) > 1 else 1.0 + t_down = np.arange(len(x_avg)) * dt * q + t[0] + + return t_down, x_avg + + def configure_logging(log_level: str = "INFO") -> None: """ Configure loguru logging with specified level. @@ -1398,6 +1432,7 @@ def detect_from_wfm( save_plots: bool = True, plot_dir: Optional[str] = None, chunk_size: Optional[int] = None, + target_sampling_interval: Optional[float] = None, ) -> Dict[str, Any]: """ Detect transient events in a Wfm binary file with XML sidecar. 
@@ -1444,6 +1479,11 @@ def detect_from_wfm( Directory to save plots. If None, uses data_path_analysis/. chunk_size : Optional[int], default=None Chunk size for processing large files. If None, loads entire file. + target_sampling_interval : Optional[float], default=None + Target sampling interval in seconds for downsampling. If provided, + the data will be downsampled by block averaging (using the nearest + integer factor) before event detection. This improves SNR and reduces + processing time for oversampled data. Returns ------- @@ -1500,6 +1540,15 @@ def detect_from_wfm( # Load data logger.info("Loading Wfm file...") t, x = load_data(name, sampling_interval, data_path, sidecar, crop) + + # Apply resampling if requested + if target_sampling_interval is not None: + from .resample import downsample_to_interval + current_interval = t[1] - t[0] if len(t) > 1 else sampling_interval + if target_sampling_interval != current_interval: + logger.info(f"Resampling from {current_interval:.3e}s to {target_sampling_interval:.3e}s") + t, x = downsample_to_interval(t, x, target_sampling_interval) + sampling_interval = target_sampling_interval # Run analysis pipeline logger.info("Running analysis pipeline...") diff --git a/src/transivent/resample/__init__.py b/src/transivent/resample/__init__.py new file mode 100644 index 0000000..b259be4 --- /dev/null +++ b/src/transivent/resample/__init__.py @@ -0,0 +1,20 @@ +""" +Signal resampling utilities for transient event detection. + +This module provides functions for downsampling high-frequency time series data +while preserving signal integrity and improving signal-to-noise ratio. 
+ +Typical use cases: +- Reduce processing time for oversampled data +- Match sampling rates across multiple datasets +- Improve SNR through averaging +""" + +from .average import average_downsample, downsample_to_interval +from .utils import calculate_downsample_factor + +__all__ = [ + "average_downsample", + "downsample_to_interval", + "calculate_downsample_factor", +]
\ No newline at end of file diff --git a/src/transivent/resample/average.py b/src/transivent/resample/average.py new file mode 100644 index 0000000..cb4f3b0 --- /dev/null +++ b/src/transivent/resample/average.py @@ -0,0 +1,123 @@ +""" +Block averaging downsampling for time series data. +""" + +import numpy as np +from typing import Tuple + +from .utils import validate_inputs + + +def average_downsample( + t: np.ndarray, x: np.ndarray, q: int +) -> Tuple[np.ndarray, np.ndarray]: + """ + Downsample by averaging blocks of samples. + + This method blocks the signal into groups of q samples and averages each group. + The time array is replaced with a uniform array at the new sampling rate. + + Parameters + ---------- + t : np.ndarray + Time array in seconds. + x : np.ndarray + Signal array. + q : int + Downsample factor (must be integer >= 1). + + Returns + ------- + Tuple[np.ndarray, np.ndarray] + Downsampled time and signal arrays. + + Raises + ------ + ValueError + If q < 1 or if input arrays are invalid. + + Notes + ----- + - Improves signal-to-noise ratio by sqrt(q) for white noise + - Preserves signal amplitude (mean is preserved) + - Creates uniform time array (no validation warnings) + - Last incomplete block is discarded + + Examples + -------- + >>> import numpy as np + >>> t = np.linspace(0, 1, 10000) + >>> x = np.random.randn(10000) + >>> t_down, x_down = average_downsample(t, x, q=10) + >>> print(len(t_down), len(x_down)) + 1000 1000 + """ + validate_inputs(t, x) + + if q < 1: + raise ValueError(f"Downsample factor q must be >= 1, got {q}") + + if q == 1: + # No downsampling needed + return t.copy(), x.copy() + + # Calculate how many complete blocks we have + n_keep = (len(x) // q) * q + + if n_keep == 0: + raise ValueError( + f"Input length ({len(x)}) is less than downsample factor ({q}). " + "No complete blocks available for averaging." 
+ ) + + # Reshape and average + x_reshaped = x[:n_keep].reshape(-1, q) + x_avg = x_reshaped.mean(axis=1) + + # Generate uniform time array + dt = t[1] - t[0] if len(t) > 1 else 1.0 + t_down = np.arange(len(x_avg)) * dt * q + t[0] + + return t_down, x_avg + + +def downsample_to_interval( + t: np.ndarray, x: np.ndarray, target_interval: float +) -> Tuple[np.ndarray, np.ndarray]: + """ + Downsample by averaging to achieve a target sampling interval. + + This is a convenience wrapper that calculates the downsample factor + from the current and target sampling intervals. + + Parameters + ---------- + t : np.ndarray + Time array in seconds. + x : np.ndarray + Signal array. + target_interval : float + Target sampling interval in seconds. + + Returns + ------- + Tuple[np.ndarray, np.ndarray] + Downsampled time and signal arrays. + + Examples + -------- + >>> # Downsample 20 MHz data to 2 MHz + >>> t = np.arange(100000) * 5e-8 # 20 MHz + >>> x = np.random.randn(100000) + >>> t_down, x_down = downsample_to_interval(t, x, 5e-7) # 2 MHz + >>> print(f"Original dt: {t[1]-t[0]:.2e}, Downsampled dt: {t_down[1]-t_down[0]:.2e}") + Original dt: 5.00e-08, Downsampled dt: 5.00e-07 + """ + validate_inputs(t, x) + + current_interval = t[1] - t[0] if len(t) > 1 else 1.0 + + from .utils import calculate_downsample_factor + q = calculate_downsample_factor(current_interval, target_interval) + + return average_downsample(t, x, q)
\ No newline at end of file diff --git a/src/transivent/resample/utils.py b/src/transivent/resample/utils.py new file mode 100644 index 0000000..2d49c14 --- /dev/null +++ b/src/transivent/resample/utils.py @@ -0,0 +1,65 @@ +""" +Utility functions for signal resampling. +""" + +import numpy as np +from typing import Tuple + + +def calculate_downsample_factor( + current_interval: float, target_interval: float +) -> int: + """ + Calculate the integer downsample factor needed to reach target sampling interval. + + Parameters + ---------- + current_interval : float + Current sampling interval in seconds. + target_interval : float + Target sampling interval in seconds. + + Returns + ------- + int + Integer downsample factor (always >= 1), rounded to the nearest integer. + + Raises + ------ + ValueError + If current_interval is not positive, or target_interval < current_interval (no upsampling). + """ + if not current_interval > 0: + raise ValueError(f"Current interval must be positive, got {current_interval:e}s") + if target_interval < current_interval: + raise ValueError( + f"Target interval ({target_interval:e}s) < current interval " + f"({current_interval:e}s). Upsampling not supported." + ) + return max(1, int(round(target_interval / current_interval))) + + +def validate_inputs(t: np.ndarray, x: np.ndarray) -> None: + """ + Validate time and signal arrays. + + Parameters + ---------- + t : np.ndarray + Time array. + x : np.ndarray + Signal array. + + Raises + ------ + ValueError + If arrays have different lengths or contain fewer than 2 points. + """ + if len(t) != len(x): + raise ValueError(f"Time array length ({len(t)}) != signal array length ({len(x)})") + + if len(t) == 0: + raise ValueError("Input arrays cannot be empty") + + if len(t) < 2: + raise ValueError("Input arrays must have at least 2 points")
\ No newline at end of file diff --git a/tests/test_resample.py b/tests/test_resample.py new file mode 100644 index 0000000..6b85878 --- /dev/null +++ b/tests/test_resample.py @@ -0,0 +1,98 @@ +"""Test the resample module.""" + +import numpy as np +import pytest + +from transivent.resample import average_downsample, downsample_to_interval + + +def test_average_downsample_basic(): + """Test basic downsampling functionality.""" + # Create test data + t = np.linspace(0, 1, 1000) + x = np.ones(1000) + + # Downsample by factor of 10 + t_down, x_down = average_downsample(t, x, q=10) + + assert len(t_down) == 100 + assert len(x_down) == 100 + assert np.allclose(x_down, 1.0) # Mean should be preserved + assert np.allclose(t_down[1] - t_down[0], t[10] - t[0]) # Check time step + + +def test_average_downsample_q1(): + """Test that q=1 returns unchanged data.""" + t = np.linspace(0, 1, 100) + x = np.random.randn(100) + + t_out, x_out = average_downsample(t, x, q=1) + + np.testing.assert_array_equal(t_out, t) + np.testing.assert_array_equal(x_out, x) + + +def test_average_downsample_invalid_q(): + """Test error handling for invalid downsample factor.""" + t = np.linspace(0, 1, 100) + x = np.random.randn(100) + + with pytest.raises(ValueError, match="q must be >= 1"): + average_downsample(t, x, q=0) + + with pytest.raises(ValueError, match="q must be >= 1"): + average_downsample(t, x, q=-1) + + +def test_average_downsample_short_array(): + """Test downsampling when array is shorter than factor.""" + t = np.linspace(0, 1, 5) + x = np.ones(5) + + with pytest.raises(ValueError, match="Input length .* is less than downsample factor"): + average_downsample(t, x, q=10) + + +def test_downsample_to_interval(): + """Test interval-based downsampling.""" + # 10 kHz data (exact interval) + t = np.arange(10000) * 1e-4 + x = np.random.randn(10000) + + # Downsample to 1 kHz + t_down, x_down = downsample_to_interval(t, x, target_interval=1e-3) + + # Should downsample by factor of 10 + assert 
len(t_down) == 1000 + assert len(x_down) == 1000 + # Original dt was 1e-4, new dt should be 1e-3 + assert np.allclose(t_down[1] - t_down[0], 1e-3) + + +def test_downsample_to_interval_upsampling(): + """Test error for upsampling (not supported).""" + t = np.linspace(0, 1, 1000) # dt = 0.001 + x = np.random.randn(1000) + + with pytest.raises(ValueError, match="Upsampling not supported"): + downsample_to_interval(t, x, target_interval=5e-4) # Smaller interval + + +def test_average_downsample_preserves_amplitude(): + """Test that amplitude is preserved through averaging.""" + t = np.linspace(0, 1, 1000) + # Create a step function + x = np.concatenate([np.ones(500), 2 * np.ones(500)]) + + # Downsample by factor of 10 + t_down, x_down = average_downsample(t, x, q=10) + + # Check the transition is at the right place + transition_idx = 50 # 500 / 10 + # First part should be ~1, second part should be ~2 + assert np.allclose(x_down[:transition_idx], 1.0) + assert np.allclose(x_down[transition_idx:], 2.0) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])
\ No newline at end of file @@ -2335,7 +2335,7 @@ wheels = [ [[package]] name = "transivent" -version = "2.0.0" +version = "2.0.3" source = { editable = "." } dependencies = [ { name = "joblib", version = "1.4.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, |
