aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSam Scholten2025-10-24 15:05:53 +1000
committerSam Scholten2025-10-24 15:05:53 +1000
commitbb9a461f9933401599c05f87328f835f65faaee0 (patch)
tree262ca4ca327bc8749bc4194f8892b4f9a3afcb7d /src
parent0925d0645256cdb0c95b1aa85f5b65d3ebcd84a3 (diff)
downloadtransivent-bb9a461f9933401599c05f87328f835f65faaee0.tar.gz
transivent-bb9a461f9933401599c05f87328f835f65faaee0.zip
Add resample subpackage with downsampling support
- Add transivent.resample subpackage for signal preprocessing - Implement average_downsample() for block averaging downsampling - Add downsample_to_interval() for interval-based downsampling - Add target_sampling_interval parameter to detect_from_wfm() - Preserves signal amplitude and improves SNR through averaging - Creates uniform time arrays to avoid validation warnings
Diffstat (limited to 'src')
-rw-r--r--src/transivent/__init__.py3
-rw-r--r--src/transivent/analysis.py49
-rw-r--r--src/transivent/resample/__init__.py20
-rw-r--r--src/transivent/resample/average.py123
-rw-r--r--src/transivent/resample/utils.py65
5 files changed, 260 insertions, 0 deletions
diff --git a/src/transivent/__init__.py b/src/transivent/__init__.py
index 9cec6f6..b802f30 100644
--- a/src/transivent/__init__.py
+++ b/src/transivent/__init__.py
@@ -18,6 +18,7 @@ from .event_detector import detect_events, merge_overlapping_events
from .event_plotter import EventPlotter
from .event_processor import extract_event_waveforms
from .io import get_waveform_params, rd
+from . import resample
__all__ = [
# Main entry points
@@ -34,4 +35,6 @@ __all__ = [
# I/O utilities
"get_waveform_params",
"rd",
+ # Signal processing
+ "resample",
]
diff --git a/src/transivent/analysis.py b/src/transivent/analysis.py
index caed827..6ef34d4 100644
--- a/src/transivent/analysis.py
+++ b/src/transivent/analysis.py
@@ -98,6 +98,40 @@ def plot_preview_image(image_path: str, title: str = "Preview Image") -> None:
logger.warning(f"Failed to display preview image {image_path}: {e}")
+def _average_downsample(
+ t: np.ndarray, x: np.ndarray, q: int
+) -> Tuple[np.ndarray, np.ndarray]:
+ """
+ Private helper to downsample by averaging blocks of samples.
+
+ Parameters
+ ----------
+ t : np.ndarray
+ Time array.
+ x : np.ndarray
+ Signal array.
+ q : int
+ Downsample factor (must be integer > 1).
+
+ Returns
+ -------
+ Tuple[np.ndarray, np.ndarray]
+ Downsampled time and signal arrays.
+ """
+ if q <= 1:
+ return t, x
+
+ n_keep = (len(x) // q) * q
+ x_reshaped = x[:n_keep].reshape(-1, q)
+ x_avg = x_reshaped.mean(axis=1)
+
+ # Generate uniform time array
+ dt = t[1] - t[0] if len(t) > 1 else 1.0
+ t_down = np.arange(len(x_avg)) * dt * q + t[0]
+
+ return t_down, x_avg
+
+
def configure_logging(log_level: str = "INFO") -> None:
"""
Configure loguru logging with specified level.
@@ -1398,6 +1432,7 @@ def detect_from_wfm(
save_plots: bool = True,
plot_dir: Optional[str] = None,
chunk_size: Optional[int] = None,
+ target_sampling_interval: Optional[float] = None,
) -> Dict[str, Any]:
"""
Detect transient events in a Wfm binary file with XML sidecar.
@@ -1444,6 +1479,11 @@ def detect_from_wfm(
Directory to save plots. If None, uses data_path_analysis/.
chunk_size : Optional[int], default=None
Chunk size for processing large files. If None, loads entire file.
+ target_sampling_interval : Optional[float], default=None
+ Target sampling interval in seconds for downsampling. If provided,
+ the data will be downsampled by block averaging to this interval
+ before event detection. This improves SNR and reduces processing time
+ for oversampled data.
Returns
-------
@@ -1500,6 +1540,15 @@ def detect_from_wfm(
# Load data
logger.info("Loading Wfm file...")
t, x = load_data(name, sampling_interval, data_path, sidecar, crop)
+
+ # Apply resampling if requested
+ if target_sampling_interval is not None:
+ from .resample import downsample_to_interval
+ current_interval = t[1] - t[0] if len(t) > 1 else sampling_interval
+ if target_sampling_interval != current_interval:
+ logger.info(f"Resampling from {current_interval:.3e}s to {target_sampling_interval:.3e}s")
+ t, x = average_downsample_to_interval(t, x, target_sampling_interval)
+ sampling_interval = target_sampling_interval
# Run analysis pipeline
logger.info("Running analysis pipeline...")
diff --git a/src/transivent/resample/__init__.py b/src/transivent/resample/__init__.py
new file mode 100644
index 0000000..b259be4
--- /dev/null
+++ b/src/transivent/resample/__init__.py
@@ -0,0 +1,20 @@
+"""
+Signal resampling utilities for transient event detection.
+
+This module provides functions for downsampling high-frequency time series data
+while preserving signal integrity and improving signal-to-noise ratio.
+
+Typical use cases:
+- Reduce processing time for oversampled data
+- Match sampling rates across multiple datasets
+- Improve SNR through averaging
+"""
+
+from .average import average_downsample, downsample_to_interval
+from .utils import calculate_downsample_factor
+
+__all__ = [
+ "average_downsample",
+ "downsample_to_interval",
+ "calculate_downsample_factor",
+] \ No newline at end of file
diff --git a/src/transivent/resample/average.py b/src/transivent/resample/average.py
new file mode 100644
index 0000000..cb4f3b0
--- /dev/null
+++ b/src/transivent/resample/average.py
@@ -0,0 +1,123 @@
+"""
+Block averaging downsampling for time series data.
+"""
+
+import numpy as np
+from typing import Tuple
+
+from .utils import validate_inputs
+
+
+def average_downsample(
+ t: np.ndarray, x: np.ndarray, q: int
+) -> Tuple[np.ndarray, np.ndarray]:
+ """
+ Downsample by averaging blocks of samples.
+
+ This method blocks the signal into groups of q samples and averages each group.
+ The time array is replaced with a uniform array at the new sampling rate.
+
+ Parameters
+ ----------
+ t : np.ndarray
+ Time array in seconds.
+ x : np.ndarray
+ Signal array.
+ q : int
+ Downsample factor (must be integer >= 1).
+
+ Returns
+ -------
+ Tuple[np.ndarray, np.ndarray]
+ Downsampled time and signal arrays.
+
+ Raises
+ ------
+ ValueError
+ If q < 1 or if input arrays are invalid.
+
+ Notes
+ -----
+ - Improves signal-to-noise ratio by sqrt(q) for white noise
+ - Preserves signal amplitude (mean is preserved)
+ - Creates uniform time array (no validation warnings)
+ - Last incomplete block is discarded
+
+ Examples
+ --------
+ >>> import numpy as np
+ >>> t = np.linspace(0, 1, 10000)
+ >>> x = np.random.randn(10000)
+ >>> t_down, x_down = average_downsample(t, x, q=10)
+ >>> print(len(t_down), len(x_down))
+ 1000 1000
+ """
+ validate_inputs(t, x)
+
+ if q < 1:
+ raise ValueError(f"Downsample factor q must be >= 1, got {q}")
+
+ if q == 1:
+ # No downsampling needed
+ return t.copy(), x.copy()
+
+ # Calculate how many complete blocks we have
+ n_keep = (len(x) // q) * q
+
+ if n_keep == 0:
+ raise ValueError(
+ f"Input length ({len(x)}) is less than downsample factor ({q}). "
+ "No complete blocks available for averaging."
+ )
+
+ # Reshape and average
+ x_reshaped = x[:n_keep].reshape(-1, q)
+ x_avg = x_reshaped.mean(axis=1)
+
+ # Generate uniform time array
+ dt = t[1] - t[0] if len(t) > 1 else 1.0
+ t_down = np.arange(len(x_avg)) * dt * q + t[0]
+
+ return t_down, x_avg
+
+
+def downsample_to_interval(
+ t: np.ndarray, x: np.ndarray, target_interval: float
+) -> Tuple[np.ndarray, np.ndarray]:
+ """
+ Downsample by averaging to achieve a target sampling interval.
+
+ This is a convenience wrapper that calculates the downsample factor
+ from the current and target sampling intervals.
+
+ Parameters
+ ----------
+ t : np.ndarray
+ Time array in seconds.
+ x : np.ndarray
+ Signal array.
+ target_interval : float
+ Target sampling interval in seconds.
+
+ Returns
+ -------
+ Tuple[np.ndarray, np.ndarray]
+ Downsampled time and signal arrays.
+
+ Examples
+ --------
+ >>> # Downsample 20 MHz data to 2 MHz
+ >>> t = np.arange(100000) * 5e-8 # 20 MHz
+ >>> x = np.random.randn(100000)
+ >>> t_down, x_down = downsample_to_interval(t, x, 5e-7) # 2 MHz
+ >>> print(f"Original dt: {t[1]-t[0]:.2e}, Downsampled dt: {t_down[1]-t_down[0]:.2e}")
+ Original dt: 5.00e-08, Downsampled dt: 5.00e-07
+ """
+ validate_inputs(t, x)
+
+ current_interval = t[1] - t[0] if len(t) > 1 else 1.0
+
+ from .utils import calculate_downsample_factor
+ q = calculate_downsample_factor(current_interval, target_interval)
+
+ return average_downsample(t, x, q) \ No newline at end of file
diff --git a/src/transivent/resample/utils.py b/src/transivent/resample/utils.py
new file mode 100644
index 0000000..2d49c14
--- /dev/null
+++ b/src/transivent/resample/utils.py
@@ -0,0 +1,65 @@
+"""
+Utility functions for signal resampling.
+"""
+
+import numpy as np
+from typing import Tuple
+
+
+def calculate_downsample_factor(
+ current_interval: float, target_interval: float
+) -> int:
+ """
+ Calculate the integer downsample factor needed to reach target sampling interval.
+
+ Parameters
+ ----------
+ current_interval : float
+ Current sampling interval in seconds.
+ target_interval : float
+ Target sampling interval in seconds.
+
+ Returns
+ -------
+ int
+ Integer downsample factor (must be >= 1).
+
+ Raises
+ ------
+ ValueError
+ If target_interval < current_interval (upsampling not supported).
+ """
+ if target_interval < current_interval:
+ raise ValueError(
+ f"Target interval ({target_interval:e}s) < current interval "
+ f"({current_interval:e}s). Upsampling not supported."
+ )
+
+ factor = int(round(target_interval / current_interval))
+ return max(1, factor)
+
+
+def validate_inputs(t: np.ndarray, x: np.ndarray) -> None:
+ """
+ Validate time and signal arrays.
+
+ Parameters
+ ----------
+ t : np.ndarray
+ Time array.
+ x : np.ndarray
+ Signal array.
+
+ Raises
+ ------
+ ValueError
+ If arrays have different lengths or are empty.
+ """
+ if len(t) != len(x):
+ raise ValueError(f"Time array length ({len(t)}) != signal array length ({len(x)})")
+
+ if len(t) == 0:
+ raise ValueError("Input arrays cannot be empty")
+
+ if len(t) < 2:
+ raise ValueError("Input arrays must have at least 2 points") \ No newline at end of file