aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Scholten2025-10-24 15:05:53 +1000
committerSam Scholten2025-10-24 15:05:53 +1000
commitbb9a461f9933401599c05f87328f835f65faaee0 (patch)
tree262ca4ca327bc8749bc4194f8892b4f9a3afcb7d
parent0925d0645256cdb0c95b1aa85f5b65d3ebcd84a3 (diff)
downloadtransivent-bb9a461f9933401599c05f87328f835f65faaee0.tar.gz
transivent-bb9a461f9933401599c05f87328f835f65faaee0.zip
Add resample subpackage with downsampling support
- Add transivent.resample subpackage for signal preprocessing - Implement average_downsample() for block averaging downsampling - Add downsample_to_interval() for interval-based downsampling - Add target_sampling_interval parameter to detect_from_wfm() - Preserves signal amplitude and improves SNR through averaging - Creates uniform time arrays to avoid validation warnings
-rw-r--r--src/transivent/__init__.py3
-rw-r--r--src/transivent/analysis.py49
-rw-r--r--src/transivent/resample/__init__.py20
-rw-r--r--src/transivent/resample/average.py123
-rw-r--r--src/transivent/resample/utils.py65
-rw-r--r--tests/test_resample.py98
-rw-r--r--uv.lock2
7 files changed, 359 insertions, 1 deletions
diff --git a/src/transivent/__init__.py b/src/transivent/__init__.py
index 9cec6f6..b802f30 100644
--- a/src/transivent/__init__.py
+++ b/src/transivent/__init__.py
@@ -18,6 +18,7 @@ from .event_detector import detect_events, merge_overlapping_events
from .event_plotter import EventPlotter
from .event_processor import extract_event_waveforms
from .io import get_waveform_params, rd
+from . import resample
__all__ = [
# Main entry points
@@ -34,4 +35,6 @@ __all__ = [
# I/O utilities
"get_waveform_params",
"rd",
+ # Signal processing
+ "resample",
]
diff --git a/src/transivent/analysis.py b/src/transivent/analysis.py
index caed827..6ef34d4 100644
--- a/src/transivent/analysis.py
+++ b/src/transivent/analysis.py
@@ -98,6 +98,40 @@ def plot_preview_image(image_path: str, title: str = "Preview Image") -> None:
logger.warning(f"Failed to display preview image {image_path}: {e}")
+def _average_downsample(
+ t: np.ndarray, x: np.ndarray, q: int
+) -> Tuple[np.ndarray, np.ndarray]:
+ """
+ Private helper to downsample by averaging blocks of samples.
+
+ Parameters
+ ----------
+ t : np.ndarray
+ Time array.
+ x : np.ndarray
+ Signal array.
+ q : int
+ Downsample factor (must be integer > 1).
+
+ Returns
+ -------
+ Tuple[np.ndarray, np.ndarray]
+ Downsampled time and signal arrays.
+ """
+ if q <= 1:
+ return t, x
+
+ n_keep = (len(x) // q) * q
+ x_reshaped = x[:n_keep].reshape(-1, q)
+ x_avg = x_reshaped.mean(axis=1)
+
+ # Generate uniform time array
+ dt = t[1] - t[0] if len(t) > 1 else 1.0
+ t_down = np.arange(len(x_avg)) * dt * q + t[0]
+
+ return t_down, x_avg
+
+
def configure_logging(log_level: str = "INFO") -> None:
"""
Configure loguru logging with specified level.
@@ -1398,6 +1432,7 @@ def detect_from_wfm(
save_plots: bool = True,
plot_dir: Optional[str] = None,
chunk_size: Optional[int] = None,
+ target_sampling_interval: Optional[float] = None,
) -> Dict[str, Any]:
"""
Detect transient events in a Wfm binary file with XML sidecar.
@@ -1444,6 +1479,11 @@ def detect_from_wfm(
Directory to save plots. If None, uses data_path_analysis/.
chunk_size : Optional[int], default=None
Chunk size for processing large files. If None, loads entire file.
+ target_sampling_interval : Optional[float], default=None
+ Target sampling interval in seconds for downsampling. If provided,
+ the data will be downsampled by block averaging to this interval
+ before event detection. This improves SNR and reduces processing time
+ for oversampled data.
Returns
-------
@@ -1500,6 +1540,15 @@ def detect_from_wfm(
# Load data
logger.info("Loading Wfm file...")
t, x = load_data(name, sampling_interval, data_path, sidecar, crop)
+
+ # Apply resampling if requested
+ if target_sampling_interval is not None:
+ from .resample import downsample_to_interval
+ current_interval = t[1] - t[0] if len(t) > 1 else sampling_interval
+ if target_sampling_interval != current_interval:
+ logger.info(f"Resampling from {current_interval:.3e}s to {target_sampling_interval:.3e}s")
+ t, x = average_downsample_to_interval(t, x, target_sampling_interval)
+ sampling_interval = target_sampling_interval
# Run analysis pipeline
logger.info("Running analysis pipeline...")
diff --git a/src/transivent/resample/__init__.py b/src/transivent/resample/__init__.py
new file mode 100644
index 0000000..b259be4
--- /dev/null
+++ b/src/transivent/resample/__init__.py
@@ -0,0 +1,20 @@
+"""
+Signal resampling utilities for transient event detection.
+
+This module provides functions for downsampling high-frequency time series data
+while preserving signal integrity and improving signal-to-noise ratio.
+
+Typical use cases:
+- Reduce processing time for oversampled data
+- Match sampling rates across multiple datasets
+- Improve SNR through averaging
+"""
+
+from .average import average_downsample, downsample_to_interval
+from .utils import calculate_downsample_factor
+
+__all__ = [
+ "average_downsample",
+ "downsample_to_interval",
+ "calculate_downsample_factor",
+] \ No newline at end of file
diff --git a/src/transivent/resample/average.py b/src/transivent/resample/average.py
new file mode 100644
index 0000000..cb4f3b0
--- /dev/null
+++ b/src/transivent/resample/average.py
@@ -0,0 +1,123 @@
+"""
+Block averaging downsampling for time series data.
+"""
+
+import numpy as np
+from typing import Tuple
+
+from .utils import validate_inputs
+
+
+def average_downsample(
+ t: np.ndarray, x: np.ndarray, q: int
+) -> Tuple[np.ndarray, np.ndarray]:
+ """
+ Downsample by averaging blocks of samples.
+
+ This method blocks the signal into groups of q samples and averages each group.
+ The time array is replaced with a uniform array at the new sampling rate.
+
+ Parameters
+ ----------
+ t : np.ndarray
+ Time array in seconds.
+ x : np.ndarray
+ Signal array.
+ q : int
+ Downsample factor (must be integer >= 1).
+
+ Returns
+ -------
+ Tuple[np.ndarray, np.ndarray]
+ Downsampled time and signal arrays.
+
+ Raises
+ ------
+ ValueError
+ If q < 1 or if input arrays are invalid.
+
+ Notes
+ -----
+ - Improves signal-to-noise ratio by sqrt(q) for white noise
+ - Preserves signal amplitude (mean is preserved)
+ - Creates uniform time array (no validation warnings)
+ - Last incomplete block is discarded
+
+ Examples
+ --------
+ >>> import numpy as np
+ >>> t = np.linspace(0, 1, 10000)
+ >>> x = np.random.randn(10000)
+ >>> t_down, x_down = average_downsample(t, x, q=10)
+ >>> print(len(t_down), len(x_down))
+ 1000 1000
+ """
+ validate_inputs(t, x)
+
+ if q < 1:
+ raise ValueError(f"Downsample factor q must be >= 1, got {q}")
+
+ if q == 1:
+ # No downsampling needed
+ return t.copy(), x.copy()
+
+ # Calculate how many complete blocks we have
+ n_keep = (len(x) // q) * q
+
+ if n_keep == 0:
+ raise ValueError(
+ f"Input length ({len(x)}) is less than downsample factor ({q}). "
+ "No complete blocks available for averaging."
+ )
+
+ # Reshape and average
+ x_reshaped = x[:n_keep].reshape(-1, q)
+ x_avg = x_reshaped.mean(axis=1)
+
+ # Generate uniform time array
+ dt = t[1] - t[0] if len(t) > 1 else 1.0
+ t_down = np.arange(len(x_avg)) * dt * q + t[0]
+
+ return t_down, x_avg
+
+
+def downsample_to_interval(
+ t: np.ndarray, x: np.ndarray, target_interval: float
+) -> Tuple[np.ndarray, np.ndarray]:
+ """
+ Downsample by averaging to achieve a target sampling interval.
+
+ This is a convenience wrapper that calculates the downsample factor
+ from the current and target sampling intervals.
+
+ Parameters
+ ----------
+ t : np.ndarray
+ Time array in seconds.
+ x : np.ndarray
+ Signal array.
+ target_interval : float
+ Target sampling interval in seconds.
+
+ Returns
+ -------
+ Tuple[np.ndarray, np.ndarray]
+ Downsampled time and signal arrays.
+
+ Examples
+ --------
+ >>> # Downsample 20 MHz data to 2 MHz
+ >>> t = np.arange(100000) * 5e-8 # 20 MHz
+ >>> x = np.random.randn(100000)
+ >>> t_down, x_down = downsample_to_interval(t, x, 5e-7) # 2 MHz
+ >>> print(f"Original dt: {t[1]-t[0]:.2e}, Downsampled dt: {t_down[1]-t_down[0]:.2e}")
+ Original dt: 5.00e-08, Downsampled dt: 5.00e-07
+ """
+ validate_inputs(t, x)
+
+ current_interval = t[1] - t[0] if len(t) > 1 else 1.0
+
+ from .utils import calculate_downsample_factor
+ q = calculate_downsample_factor(current_interval, target_interval)
+
+ return average_downsample(t, x, q) \ No newline at end of file
diff --git a/src/transivent/resample/utils.py b/src/transivent/resample/utils.py
new file mode 100644
index 0000000..2d49c14
--- /dev/null
+++ b/src/transivent/resample/utils.py
@@ -0,0 +1,65 @@
+"""
+Utility functions for signal resampling.
+"""
+
+import numpy as np
+from typing import Tuple
+
+
+def calculate_downsample_factor(
+ current_interval: float, target_interval: float
+) -> int:
+ """
+ Calculate the integer downsample factor needed to reach target sampling interval.
+
+ Parameters
+ ----------
+ current_interval : float
+ Current sampling interval in seconds.
+ target_interval : float
+ Target sampling interval in seconds.
+
+ Returns
+ -------
+ int
+ Integer downsample factor (must be >= 1).
+
+ Raises
+ ------
+ ValueError
+ If target_interval < current_interval (upsampling not supported).
+ """
+ if target_interval < current_interval:
+ raise ValueError(
+ f"Target interval ({target_interval:e}s) < current interval "
+ f"({current_interval:e}s). Upsampling not supported."
+ )
+
+ factor = int(round(target_interval / current_interval))
+ return max(1, factor)
+
+
+def validate_inputs(t: np.ndarray, x: np.ndarray) -> None:
+ """
+ Validate time and signal arrays.
+
+ Parameters
+ ----------
+ t : np.ndarray
+ Time array.
+ x : np.ndarray
+ Signal array.
+
+ Raises
+ ------
+ ValueError
+ If arrays have different lengths or are empty.
+ """
+ if len(t) != len(x):
+ raise ValueError(f"Time array length ({len(t)}) != signal array length ({len(x)})")
+
+ if len(t) == 0:
+ raise ValueError("Input arrays cannot be empty")
+
+ if len(t) < 2:
+ raise ValueError("Input arrays must have at least 2 points") \ No newline at end of file
diff --git a/tests/test_resample.py b/tests/test_resample.py
new file mode 100644
index 0000000..6b85878
--- /dev/null
+++ b/tests/test_resample.py
@@ -0,0 +1,98 @@
+"""Test the resample module."""
+
+import numpy as np
+import pytest
+
+from transivent.resample import average_downsample, downsample_to_interval
+
+
+def test_average_downsample_basic():
+ """Test basic downsampling functionality."""
+ # Create test data
+ t = np.linspace(0, 1, 1000)
+ x = np.ones(1000)
+
+ # Downsample by factor of 10
+ t_down, x_down = average_downsample(t, x, q=10)
+
+ assert len(t_down) == 100
+ assert len(x_down) == 100
+ assert np.allclose(x_down, 1.0) # Mean should be preserved
+ assert np.allclose(t_down[1] - t_down[0], t[10] - t[0]) # Check time step
+
+
+def test_average_downsample_q1():
+ """Test that q=1 returns unchanged data."""
+ t = np.linspace(0, 1, 100)
+ x = np.random.randn(100)
+
+ t_out, x_out = average_downsample(t, x, q=1)
+
+ np.testing.assert_array_equal(t_out, t)
+ np.testing.assert_array_equal(x_out, x)
+
+
+def test_average_downsample_invalid_q():
+ """Test error handling for invalid downsample factor."""
+ t = np.linspace(0, 1, 100)
+ x = np.random.randn(100)
+
+ with pytest.raises(ValueError, match="q must be >= 1"):
+ average_downsample(t, x, q=0)
+
+ with pytest.raises(ValueError, match="q must be >= 1"):
+ average_downsample(t, x, q=-1)
+
+
+def test_average_downsample_short_array():
+ """Test downsampling when array is shorter than factor."""
+ t = np.linspace(0, 1, 5)
+ x = np.ones(5)
+
+ with pytest.raises(ValueError, match="Input length .* is less than downsample factor"):
+ average_downsample(t, x, q=10)
+
+
+def test_downsample_to_interval():
+ """Test interval-based downsampling."""
+ # 10 kHz data (exact interval)
+ t = np.arange(10000) * 1e-4
+ x = np.random.randn(10000)
+
+ # Downsample to 1 kHz
+ t_down, x_down = downsample_to_interval(t, x, target_interval=1e-3)
+
+ # Should downsample by factor of 10
+ assert len(t_down) == 1000
+ assert len(x_down) == 1000
+ # Original dt was 1e-4, new dt should be 1e-3
+ assert np.allclose(t_down[1] - t_down[0], 1e-3)
+
+
+def test_downsample_to_interval_upsampling():
+ """Test error for upsampling (not supported)."""
+ t = np.linspace(0, 1, 1000) # dt = 0.001
+ x = np.random.randn(1000)
+
+ with pytest.raises(ValueError, match="Upsampling not supported"):
+ downsample_to_interval(t, x, target_interval=5e-4) # Smaller interval
+
+
+def test_average_downsample_preserves_amplitude():
+ """Test that amplitude is preserved through averaging."""
+ t = np.linspace(0, 1, 1000)
+ # Create a step function
+ x = np.concatenate([np.ones(500), 2 * np.ones(500)])
+
+ # Downsample by factor of 10
+ t_down, x_down = average_downsample(t, x, q=10)
+
+ # Check the transition is at the right place
+ transition_idx = 50 # 500 / 10
+ # First part should be ~1, second part should be ~2
+ assert np.allclose(x_down[:transition_idx], 1.0)
+ assert np.allclose(x_down[transition_idx:], 2.0)
+
+
+if __name__ == "__main__":
+ pytest.main([__file__, "-v"]) \ No newline at end of file
diff --git a/uv.lock b/uv.lock
index f0e8195..817106a 100644
--- a/uv.lock
+++ b/uv.lock
@@ -2335,7 +2335,7 @@ wheels = [
[[package]]
name = "transivent"
-version = "2.0.0"
+version = "2.0.3"
source = { editable = "." }
dependencies = [
{ name = "joblib", version = "1.4.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" },