author     Sam Scholten  2025-10-23 15:06:25 +1000
committer  Sam Scholten  2025-10-23 15:22:54 +1000
commit     307bf648d8e3fe852d7daf2fa1567d1896e50f7e (patch)
tree       d15344eab2003fd0a12544cc1ed9fbfef3e871d9 /examples
parent     4a7026759e099e5c81cc9c77f19182a23d2f0275 (diff)
download   transivent-307bf648d8e3fe852d7daf2fa1567d1896e50f7e.tar.gz
           transivent-307bf648d8e3fe852d7daf2fa1567d1896e50f7e.zip
Release v2.0.0 (tag: v2.0.0)

Major API refactoring with simplified public interface.

- Added EventProcessor for high-level event processing workflow
- New utility functions for data preprocessing
- Additional example scripts for different use cases
- Comprehensive test suite
- Updated documentation with migration guide
Diffstat (limited to 'examples')
-rw-r--r--  examples/example.py              136
-rw-r--r--  examples/example_custom_data.py  136
-rw-r--r--  examples/example_diffusion.py    228
-rw-r--r--  examples/example_quick_start.py  131

4 files changed, 631 insertions(+), 0 deletions(-)
diff --git a/examples/example.py b/examples/example.py
new file mode 100644
index 0000000..06c33c5
--- /dev/null
+++ b/examples/example.py
@@ -0,0 +1,136 @@
+from warnings import warn
+
+import matplotlib as mpl
+
+from transivent import configure_logging, get_waveform_params, process_file
+
+# --- User configuration dictionary ---
+CONFIG = {
+    "SMOOTH_WIN_T": 10e-3,  # smoothing window in seconds (set to None to specify the window in Hz via SMOOTH_WIN_F)
+    "SMOOTH_WIN_F": None,  # smoothing window in Hz (set to None to specify the window in seconds via SMOOTH_WIN_T)
+    "DETECTION_SNR": 3,  # point-by-point detection threshold; must be below MIN_EVENT_KEEP_SNR
+ "MIN_EVENT_KEEP_SNR": 5, # min event (max-)amplitude in multiples of global noise
+ "MIN_EVENT_T": 0.75e-6, # minimum event duration (seconds)
+ "WIDEN_FRAC": 10, # fraction of event length to widen detected events
+ "SIGNAL_POLARITY": 1, # Signal polarity: -1 for negative events (below background), +1 for positive events (above background)
+ "LOG_LEVEL": "INFO", # logging level: DEBUG, INFO, WARNING, ERROR, CRITICAL
+ "MAX_PLOT_POINTS": 10000, # Downsample threshold for plotting
+ "ENVELOPE_MODE_LIMIT": 10e-3, # Use envelope when time span >10ms, show thresholds when <10ms
+ "YSCALE_MODE": "snr", # y-scale mode for event plotter: 'snr', 'percent' or 'raw'
+ "FILTER_TYPE": "median", # Filter type: "savgol", "gaussian", "moving_average", "median"
+ "FILTER_ORDER": 3, # Order of the savgol filter for smoothing
+ "CHUNK_SIZE": None, # Set to None to disable chunking
+ # ---
+ "DATA_PATH": "../hycav/data/2025-07-17_bsa/",
+ "MEASUREMENTS": [
+ {
+ "data": "RefCurve_2025-07-17_0_065114.Wfm.bin",
+ },
+ {
+ "data": "RefCurve_2025-07-17_1_065214.Wfm.bin",
+ },
+ {
+ "data": "RefCurve_2025-07-17_2_065510.Wfm.bin",
+ },
+ {
+ "data": "RefCurve_2025-07-17_3_065814.Wfm.bin",
+ },
+ {
+ "data": "RefCurve_2025-07-17_4_065850.Wfm.bin",
+ },
+ {
+ "data": "RefCurve_2025-07-17_5_070003.Wfm.bin",
+ },
+ {
+ "data": "RefCurve_2025-07-17_6_070045.Wfm.bin",
+ },
+ {
+ "data": "RefCurve_2025-07-17_7_070339.Wfm.bin",
+ },
+ ],
+}
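+# Each entry in MEASUREMENTS is merged over CONFIG, so any key can be
+# overridden per file. A hypothetical entry using the optional keys read
+# below ("sidecar" and "crop"; the filename and formats here are illustrative):
+#
+# {
+#     "data": "RefCurve_2025-07-17_0_065114.Wfm.bin",
+#     "sidecar": "RefCurve_2025-07-17_0_065114.Wfm.xml",  # optional XML sidecar
+#     "crop": (0.0, 10e-3),  # optional time window to analyse
+#     "DETECTION_SNR": 4,  # any global key can be overridden per measurement
+# },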
+
+
+def main() -> None:
+ """
+ Main function to process all measurements.
+ """
+ # Configure logging
+ configure_logging(CONFIG.get("LOG_LEVEL", "INFO"))
+
+ for measurement in CONFIG["MEASUREMENTS"]:
+ # Merge global config with measurement-specific overrides
+ merged_config = CONFIG.copy()
+ merged_config.update(measurement)
+
+ # Extract parameters for process_file
+ name = merged_config["data"]
+ sidecar = merged_config.get("sidecar")
+
+ params = get_waveform_params(
+ name, data_path=merged_config["DATA_PATH"], sidecar=sidecar
+ )
+ sampling_interval = params["sampling_interval"]
+
+ # Call with explicit parameters
+ process_file(
+ name=name,
+ sampling_interval=sampling_interval,
+ data_path=merged_config["DATA_PATH"],
+ smooth_win_t=merged_config.get("SMOOTH_WIN_T"),
+ smooth_win_f=merged_config.get("SMOOTH_WIN_F"),
+ detection_snr=merged_config.get("DETECTION_SNR", 3.0),
+ min_event_keep_snr=merged_config.get("MIN_EVENT_KEEP_SNR", 6.0),
+ min_event_t=merged_config.get("MIN_EVENT_T", 0.75e-6),
+ widen_frac=merged_config.get("WIDEN_FRAC", 10.0),
+ signal_polarity=merged_config.get("SIGNAL_POLARITY", -1),
+ max_plot_points=merged_config.get("MAX_PLOT_POINTS", 10000),
+ envelope_mode_limit=merged_config.get("ENVELOPE_MODE_LIMIT", 10e-3),
+ sidecar=sidecar,
+ crop=merged_config.get("crop"),
+ yscale_mode=merged_config.get("YSCALE_MODE", "snr"),
+ show_plots=True,
+ filter_type=merged_config.get("FILTER_TYPE", "gaussian"),
+ filter_order=merged_config.get("FILTER_ORDER", 2),
+ chunk_size=merged_config.get("CHUNK_SIZE"),
+ )
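+        # The .get() fallbacks above apply only when a key is removed from
+        # CONFIG; note that some fallbacks differ from the CONFIG values
+        # (e.g. SIGNAL_POLARITY falls back to -1, while CONFIG sets it to 1).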
+
+
+if __name__ == "__main__":
+ # Set Matplotlib rcParams directly here
+ for optn, val in {
+ "backend": "QtAgg",
+ "figure.constrained_layout.use": True,
+ "figure.dpi": 90,
+ # "figure.figsize": (8.5 / 2.55, 6 / 2.55),
+ "font.family": ("sans-serif",),
+ "font.size": 11,
+ "legend.fontsize": "x-small",
+ "legend.handlelength": 1.5,
+ "legend.handletextpad": 0.6,
+ "lines.markersize": 4.0,
+ "lines.markeredgewidth": 1.6,
+ "lines.linewidth": 1.8,
+ "xtick.labelsize": 10,
+ "xtick.major.size": 3,
+ "xtick.direction": "in",
+ "ytick.labelsize": 10,
+ "ytick.direction": "in",
+ "ytick.major.size": 3,
+ "axes.formatter.useoffset": False,
+ "axes.formatter.use_mathtext": True,
+ "errorbar.capsize": 3.0,
+ "axes.linewidth": 1.4,
+ "xtick.major.width": 1.4,
+ "xtick.minor.width": 1.1,
+ "ytick.major.width": 1.4,
+ "ytick.minor.width": 1.1,
+ "axes.labelsize": 11,
+ }.items():
+ if isinstance(val, (list, tuple)):
+ val = tuple(val)
+ try:
+ mpl.rcParams[optn] = val
+ except KeyError:
+ warn(f"mpl rcparams key '{optn}' not recognised as a valid rc parameter.")
+ main()
diff --git a/examples/example_custom_data.py b/examples/example_custom_data.py
new file mode 100644
index 0000000..5625d80
--- /dev/null
+++ b/examples/example_custom_data.py
@@ -0,0 +1,136 @@
+"""
+Example: Using transivent with custom data formats.
+
+This example demonstrates how to use transivent's building blocks
+directly with your own time-series data, without requiring any
+proprietary file formats.
+"""
+
+import numpy as np
+from transivent import (
+ calculate_initial_background,
+ calculate_clean_background,
+ detect_initial_events,
+ detect_final_events,
+ merge_overlapping_events,
+ estimate_noise,
+ analyze_thresholds,
+ create_oscilloscope_plot,
+ EventPlotter,
+)
+
+# Generate synthetic data (replace with your actual data loading)
+print("Generating synthetic data...")
+np.random.seed(42)
+
+# Time array (must be in seconds)
+duration = 0.05 # 50 ms
+sampling_rate = 2_000_000 # 2 MHz
+n_points = int(duration * sampling_rate)
+t = np.linspace(0, duration, n_points)
+
+# Signal with some spikes
+x = np.random.randn(n_points) * 0.1 # Background noise
+
+# Add some synthetic spikes
+spike_times = [0.01, 0.023, 0.037, 0.045]
+spike_amplitudes = [-1.5, -2.0, -0.8, -1.2] # Negative spikes
+spike_width = 50 # samples
+
+for spike_t, amp in zip(spike_times, spike_amplitudes):
+ spike_idx = int(spike_t * sampling_rate)
+ spike_start = max(0, spike_idx - spike_width // 2)
+ spike_end = min(n_points, spike_idx + spike_width // 2)
+ x[spike_start:spike_end] += amp * np.exp(-0.5 * ((np.arange(spike_start, spike_end) - spike_idx) / (spike_width / 4))**2)
+
+print(f"Data shape: {t.shape}, Sampling interval: {t[1] - t[0]:.2e} s")
+
+# Analysis parameters
+sampling_interval = t[1] - t[0]
+smooth_n = 101 # Smoothing window in samples
+detection_snr = 3.0
+min_event_keep_snr = 5.0
+signal_polarity = -1 # Negative spikes
+min_event_n = 10 # Minimum event length in samples
+widen_frac = 0.5
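+# At this 2 MHz rate the sample-based settings correspond to ~50 us of
+# smoothing (smooth_n = 101) and a 5 us minimum event length (min_event_n = 10).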
+
+# Step 1: Calculate initial background
+print("\nStep 1: Calculating initial background...")
+bg_initial = calculate_initial_background(t, x, smooth_n, filter_type="gaussian")
+
+# Step 2: Estimate noise
+print("Step 2: Estimating noise level...")
+global_noise = estimate_noise(x, bg_initial)
+print(f"Estimated noise: {global_noise:.3f}")
+
+# Step 3: Initial event detection
+print("\nStep 3: Initial event detection...")
+events_initial = detect_initial_events(
+ t, x, bg_initial, global_noise, detection_snr,
+ min_event_keep_snr, widen_frac=widen_frac, signal_polarity=signal_polarity,
+ min_event_n=min_event_n
+)
+print(f"Found {len(events_initial)} initial events")
+
+# Step 4: Calculate clean background (masking events)
+print("\nStep 4: Calculating clean background...")
+bg_clean = calculate_clean_background(
+ t, x, events_initial, smooth_n, bg_initial, filter_type="gaussian"
+)
+
+# Step 5: Final event detection with clean background
+print("\nStep 5: Final event detection...")
+events = detect_final_events(
+ t, x, bg_clean, global_noise, detection_snr,
+ min_event_keep_snr, widen_frac=widen_frac, signal_polarity=signal_polarity,
+ min_event_n=min_event_n
+)
+
+# Step 6: Merge any overlapping events
+events = merge_overlapping_events(events)
+print(f"Final event count: {len(events)}")
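+
+# Events are (t_start, t_end) pairs in seconds; to recover sample indices,
+# divide by the sampling interval, e.g.:
+#   event_slices = [slice(int(s / sampling_interval), int(e / sampling_interval))
+#                   for s, e in events]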
+
+# Print event details
+print("\nDetected events:")
+for i, (start, end) in enumerate(events):
+ duration_us = (end - start) * 1e6
+ print(f" Event {i+1}: {start:.6f}s to {end:.6f}s (duration: {duration_us:.2f} µs)")
+
+# Step 7: Visualization
+print("\nStep 7: Creating visualizations...")
+
+# Analyze thresholds for plotting
+detection_threshold, keep_threshold = analyze_thresholds(
+ x, bg_clean, global_noise, detection_snr, min_event_keep_snr, signal_polarity
+)
+
+# Create main plot
+plot = create_oscilloscope_plot(
+ t, x, bg_initial, bg_clean, events,
+ detection_threshold, keep_threshold,
+ name="Custom Data Example", detection_snr=detection_snr,
+ min_event_keep_snr=min_event_keep_snr,
+ max_plot_points=10000, envelope_mode_limit=10e-3, smooth_n=smooth_n,
+ global_noise=global_noise
+)
+
+# Create event plots
+if len(events) > 0:
+ event_plotter = EventPlotter(
+ plot, events, bg_clean=bg_clean, global_noise=global_noise
+ )
+ event_plotter.plot_events_grid(max_events=16)
+ event_plotter.save("custom_data_events.png")
+
+# Save plots
+plot.save("custom_data_trace.png")
+print("\nPlots saved:")
+print(" - custom_data_trace.png: Full trace with events")
+if len(events) > 0:
+ print(" - custom_data_events.png: Individual event plots")
+
+# Show plots (uncomment to display)
+# import matplotlib.pyplot as plt
+# plt.show()
+
+print("\nDone! The transivent building blocks work with any time-series data.")
\ No newline at end of file
diff --git a/examples/example_diffusion.py b/examples/example_diffusion.py
new file mode 100644
index 0000000..dc2d05e
--- /dev/null
+++ b/examples/example_diffusion.py
@@ -0,0 +1,228 @@
+#!/usr/bin/env python3
+"""
+Example demonstrating diffusion processing of transient events.
+
+This script shows how to use the new event_processor module to analyze
+diffusion characteristics of detected events, similar to the approach
+in heehun_diffusion_processing.py.
+"""
+
+import numpy as np
+import matplotlib.pyplot as plt
+
+# Import the transivent functions used in this example
+from transivent import (
+    configure_logging,
+    process_events_for_diffusion,
+    detect_events,
+    calculate_initial_background,
+    estimate_noise,
+    plot_diffusion_comparison,
+)
+
+# Configure logging
+configure_logging("INFO")
+
+
+def create_synthetic_dataset(
+ n_events: int = 50,
+ event_duration: float = 100e-6, # 100 microseconds
+ sampling_interval: float = 1e-6, # 1 microsecond
+ total_duration: float = 0.1, # 100 milliseconds
+ diffusion_coeff: float = 1e-12, # m²/s
+ noise_level: float = 0.01, # Lower noise for better detection
+ signal_amplitude: float = 2.0, # Higher amplitude for better detection
+):
+ """
+ Create synthetic data with diffusion-like events.
+
+ Parameters
+ ----------
+ n_events : int
+ Number of events to generate
+ event_duration : float
+ Duration of each event in seconds
+ sampling_interval : float
+ Sampling interval in seconds
+ total_duration : float
+ Total duration of the signal in seconds
+ diffusion_coeff : float
+ Diffusion coefficient for event dynamics
+ noise_level : float
+ Noise level in the signal
+ signal_amplitude : float
+ Amplitude of the events
+
+ Returns
+ -------
+ t : np.ndarray
+ Time array
+ x : np.ndarray
+ Signal array
+ """
+ n_points = int(total_duration / sampling_interval)
+ t = np.linspace(0, total_duration, n_points)
+ x = np.random.normal(0, noise_level, n_points)
+
+ # Generate random event times
+ event_starts = np.random.uniform(0, total_duration - event_duration, n_events)
+
+ for start_time in event_starts:
+ start_idx = int(start_time / sampling_interval)
+ end_idx = int((start_time + event_duration) / sampling_interval)
+
+ if end_idx >= n_points:
+ end_idx = n_points - 1
+
+        # Create a diffusion-like signal for this event
+        event_length = end_idx - start_idx
+
+ # Simulate Brownian motion
+ np.random.seed(int(start_time * 1e6)) # Seed for reproducibility
+ steps = np.random.normal(0, np.sqrt(2 * diffusion_coeff * sampling_interval), event_length)
+ event_signal = signal_amplitude * (1 + np.cumsum(steps))
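+        # With step std sqrt(2 * D * dt), the cumulative sum is a 1-D Brownian
+        # trajectory whose mean squared displacement grows as 2 * D * lag,
+        # which is what a downstream MSD fit should recover.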
+
+ # Add to main signal
+ x[start_idx:end_idx] += event_signal
+
+ return t, x
+
+
+def analyze_single_dataset(name: str, t: np.ndarray, x: np.ndarray, sampling_interval: float):
+ """
+ Analyze a single dataset for diffusion characteristics.
+
+ Parameters
+ ----------
+ name : str
+ Name of the dataset
+ t : np.ndarray
+ Time array
+ x : np.ndarray
+ Signal array
+ sampling_interval : float
+ Sampling interval in seconds
+
+ Returns
+ -------
+ dict
+ Results containing events and diffusion analysis
+ """
+ print(f"\n=== Analyzing {name} ===")
+
+ # Step 1: Detect events using transivent
+ # First calculate background
+ smooth_n = int(10e-6 / sampling_interval) # 10 microsecond smoothing window
+ if smooth_n % 2 == 0:
+ smooth_n += 1
+
+ bg_initial = calculate_initial_background(t, x, smooth_n)
+ global_noise = estimate_noise(x, bg_initial)
+
+ # Detect events
+ events, _ = detect_events(
+ t, x, bg_initial,
+ snr_threshold=3.0,
+ min_event_len=int(10e-6 / sampling_interval),
+ min_event_amp=5.0 * global_noise,
+ widen_frac=0.5,
+ global_noise=global_noise,
+ signal_polarity=1, # Positive events
+ )
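+    # events is assumed to be a sequence of (t_start, t_end) pairs, matching
+    # the other examples; the second return value is not needed here.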
+
+ print(f"Detected {len(events)} events")
+
+ # Step 2: Process events for diffusion analysis
+ results = process_events_for_diffusion(
+ name=name,
+ sampling_interval=sampling_interval,
+ data_path="",
+ t=t,
+ x=x,
+ events=events,
+ max_lag=500,
+ n_jobs=-1,
+ )
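+    # Keys consumed from this results dict: "event_count" and "statistics"
+    # ("mean_diffusion" / "std_diffusion") below, plus "diffusion_coeffs" in main().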
+
+ print(f"Processed {results['event_count']} events for diffusion")
+ if results['event_count'] > 0:
+ print(f"Mean diffusion coefficient: {results['statistics']['mean_diffusion']:.3e} ± "
+ f"{results['statistics']['std_diffusion']:.3e} m²/s")
+
+ return results
+
+
+def main():
+ """Main function demonstrating diffusion analysis."""
+ print("Transivent Diffusion Processing Example")
+ print("=" * 40)
+
+ # Create synthetic datasets with different diffusion characteristics
+ datasets = {
+ "Ferritin (slow diffusion)": {
+ "diffusion_coeff": 5e-13, # Slower diffusion
+ "signal_amplitude": 1.0,
+ "n_events": 40,
+ },
+ "Catalase (medium diffusion)": {
+ "diffusion_coeff": 1e-12, # Medium diffusion
+ "signal_amplitude": 0.8,
+ "n_events": 35,
+ },
+ "BSA (fast diffusion)": {
+ "diffusion_coeff": 2e-12, # Faster diffusion
+ "signal_amplitude": 0.6,
+ "n_events": 45,
+ },
+ }
+
+ sampling_interval = 1e-6 # 1 microsecond
+
+ all_results = {}
+
+ # Analyze each dataset
+ for name, params in datasets.items():
+ # Generate synthetic data
+ t, x = create_synthetic_dataset(
+ n_events=params["n_events"],
+ diffusion_coeff=params["diffusion_coeff"],
+ signal_amplitude=params["signal_amplitude"],
+ sampling_interval=sampling_interval,
+ )
+
+ # Analyze for diffusion
+ results = analyze_single_dataset(name, t, x, sampling_interval)
+ all_results[name] = results
+
+ # Create comparison plot
+ if any(len(results["diffusion_coeffs"]) > 0 for results in all_results.values()):
+ print("\n=== Creating comparison plot ===")
+ fig = plot_diffusion_comparison(
+ all_results,
+ show_ellipses=True,
+ colors=["#1f77b4", "gold", "green"],
+ markers=["o", "^", "s"],
+ )
+
+ # Save the plot
+ import os
+ os.makedirs("analysis_output", exist_ok=True)
+ fig.savefig("analysis_output/diffusion_comparison.png", dpi=150, bbox_inches="tight")
+ print("Saved plot to: analysis_output/diffusion_comparison.png")
+
+ # Show plot
+ plt.show()
+ else:
+ print("\nNo events detected in any dataset. Check detection parameters.")
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/examples/example_quick_start.py b/examples/example_quick_start.py
new file mode 100644
index 0000000..d5c45e4
--- /dev/null
+++ b/examples/example_quick_start.py
@@ -0,0 +1,131 @@
+"""
+Quick start example demonstrating the simplified transivent API.
+
+This shows the two main entry points:
+1. detect() - for custom time-series data
+2. detect_from_wfm() - for Wfm files with XML sidecars
+"""
+
+import numpy as np
+from transivent import detect, detect_from_wfm, EventPlotter
+
+print("=" * 60)
+print("TRANSIVENT QUICK START")
+print("=" * 60)
+
+# ============================================================================
+# EXAMPLE 1: Detect events in custom time-series data
+# ============================================================================
+print("\n[Example 1] Custom time-series data")
+print("-" * 60)
+
+# Generate synthetic data
+np.random.seed(42)
+duration = 0.05 # 50 ms
+sampling_rate = 2_000_000 # 2 MHz
+n_points = int(duration * sampling_rate)
+t = np.linspace(0, duration, n_points)
+
+# Background noise
+x = np.random.randn(n_points) * 0.1
+
+# Add synthetic negative spikes
+spike_times = [0.01, 0.023, 0.037, 0.045]
+spike_amplitudes = [-1.5, -2.0, -0.8, -1.2]
+spike_width = 50
+
+for spike_t, amp in zip(spike_times, spike_amplitudes):
+ spike_idx = int(spike_t * sampling_rate)
+ spike_start = max(0, spike_idx - spike_width // 2)
+ spike_end = min(n_points, spike_idx + spike_width // 2)
+ x[spike_start:spike_end] += amp * np.exp(
+ -0.5 * ((np.arange(spike_start, spike_end) - spike_idx) / (spike_width / 4))**2
+ )
+
+print(f"Generated synthetic data: {n_points} points at {sampling_rate/1e6:.1f} MHz")
+
+# Detect events
+results = detect(
+ t, x,
+ name="Synthetic Data",
+ detection_snr=3.0,
+ min_event_keep_snr=6.0,
+ signal_polarity=-1, # Negative spikes
+ save_plots=False,
+)
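+# results is a dict; this script uses its "events", "global_noise", "plot",
+# and "bg_clean" entries (the full key list is printed below).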
+
+events = results["events"]
+print(f"✓ Found {len(events)} events")
+print(f"✓ Noise level: {results['global_noise']:.3e}")
+
+for i, (start, end) in enumerate(events):
+ duration_us = (end - start) * 1e6
+ print(f" Event {i+1}: {start:.6f}s to {end:.6f}s ({duration_us:.2f} µs)")
+
+# Access other results
+print(f"✓ Results contain: {list(results.keys())}")
+
+# ============================================================================
+# EXAMPLE 2: Advanced - Build custom pipeline with building blocks
+# ============================================================================
+print("\n[Example 2] Custom pipeline with building blocks")
+print("-" * 60)
+
+from transivent.analysis import (
+ calculate_initial_background,
+ estimate_noise,
+ detect_initial_events,
+)
+
+# Manual pipeline for advanced control
+smooth_n = 101
+bg_initial = calculate_initial_background(t, x, smooth_n, filter_type="gaussian")
+global_noise = estimate_noise(x, bg_initial)
+print(f"✓ Calculated background (smooth_n={smooth_n})")
+print(f"✓ Estimated noise: {global_noise:.3e}")
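+
+# A possible next step, sketched as comments (argument order mirrors
+# example_custom_data.py and may differ in your transivent version):
+# events_initial = detect_initial_events(
+#     t, x, bg_initial, global_noise, 3.0, 6.0,
+#     widen_frac=0.5, signal_polarity=-1, min_event_n=10,
+# )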
+
+# ============================================================================
+# EXAMPLE 3: Visualize events
+# ============================================================================
+print("\n[Example 3] Visualizing events")
+print("-" * 60)
+
+if results["plot"] is not None and len(events) > 0:
+ event_plotter = EventPlotter(
+ results["plot"],
+ events,
+ bg_clean=results["bg_clean"],
+ global_noise=results["global_noise"],
+ )
+ print(f"✓ Created event plotter for {len(events)} events")
+    print("✓ Can call event_plotter.plot_events_grid(max_events=16)")
+    print("✓ Can call event_plotter.save('path.png')")
+
+# ============================================================================
+# EXAMPLE 4: Using Wfm files (if you have them)
+# ============================================================================
+print("\n[Example 4] Wfm file format (for reference)")
+print("-" * 60)
+print("""
+To analyze Wfm files with XML sidecars:
+
+ results = detect_from_wfm(
+ name="data.Wfm.bin",
+ sampling_interval=5e-7,
+ data_path="/path/to/data/",
+ detection_snr=3.0,
+ )
+
+ events = results["events"]
+ print(f"Found {len(events)} events")
+""")
+
+print("\n" + "=" * 60)
+print("QUICK START COMPLETE")
+print("=" * 60)
+print("\nKey takeaways:")
+print(" 1. Use detect() for any time-series data")
+print(" 2. Use detect_from_wfm() for proprietary Wfm files")
+print(" 3. All results are returned as a dict")
+print(" 4. Access building blocks via transivent.analysis for custom pipelines")
+print("\nFor more info, see the README or run the other examples.")