diff options
| author | Sam Scholten | 2025-10-23 15:06:25 +1000 |
|---|---|---|
| committer | Sam Scholten | 2025-10-23 15:22:54 +1000 |
| commit | 307bf648d8e3fe852d7daf2fa1567d1896e50f7e (patch) | |
| tree | d15344eab2003fd0a12544cc1ed9fbfef3e871d9 /examples | |
| parent | 4a7026759e099e5c81cc9c77f19182a23d2f0275 (diff) | |
| download | transivent-307bf648d8e3fe852d7daf2fa1567d1896e50f7e.tar.gz transivent-307bf648d8e3fe852d7daf2fa1567d1896e50f7e.zip | |
Release v2.0.0v2.0.0
Major API refactoring with simplified public interface.
- Added EventProcessor for high-level event processing workflow
- New utility functions for data preprocessing
- Additional example scripts for different use cases
- Comprehensive test suite
- Updated documentation with migration guide
Diffstat (limited to 'examples')
| -rw-r--r-- | examples/example.py | 136 | ||||
| -rw-r--r-- | examples/example_custom_data.py | 136 | ||||
| -rw-r--r-- | examples/example_diffusion.py | 228 | ||||
| -rw-r--r-- | examples/example_quick_start.py | 131 |
4 files changed, 631 insertions, 0 deletions
diff --git a/examples/example.py b/examples/example.py new file mode 100644 index 0000000..06c33c5 --- /dev/null +++ b/examples/example.py @@ -0,0 +1,136 @@ +from warnings import warn + +import matplotlib as mpl + +from transivent import configure_logging, get_waveform_params, process_file + +# --- User configuration dictionary --- +CONFIG = { + "SMOOTH_WIN_T": 10e-3, # smoothing window in seconds (set to None to use frequency) + "SMOOTH_WIN_F": None, # smoothing window in Hz (set to None to use time) + "DETECTION_SNR": 3, # point-by-point detection threshold, <MIN_EVENT_KEEP_SNR + "MIN_EVENT_KEEP_SNR": 5, # min event (max-)amplitude in multiples of global noise + "MIN_EVENT_T": 0.75e-6, # minimum event duration (seconds) + "WIDEN_FRAC": 10, # fraction of event length to widen detected events + "SIGNAL_POLARITY": 1, # Signal polarity: -1 for negative events (below background), +1 for positive events (above background) + "LOG_LEVEL": "INFO", # logging level: DEBUG, INFO, WARNING, ERROR, CRITICAL + "MAX_PLOT_POINTS": 10000, # Downsample threshold for plotting + "ENVELOPE_MODE_LIMIT": 10e-3, # Use envelope when time span >10ms, show thresholds when <10ms + "YSCALE_MODE": "snr", # y-scale mode for event plotter: 'snr', 'percent' or 'raw' + "FILTER_TYPE": "median", # Filter type: "savgol", "gaussian", "moving_average", "median" + "FILTER_ORDER": 3, # Order of the savgol filter for smoothing + "CHUNK_SIZE": None, # Set to None to disable chunking + # --- + "DATA_PATH": "../hycav/data/2025-07-17_bsa/", + "MEASUREMENTS": [ + { + "data": "RefCurve_2025-07-17_0_065114.Wfm.bin", + }, + { + "data": "RefCurve_2025-07-17_1_065214.Wfm.bin", + }, + { + "data": "RefCurve_2025-07-17_2_065510.Wfm.bin", + }, + { + "data": "RefCurve_2025-07-17_3_065814.Wfm.bin", + }, + { + "data": "RefCurve_2025-07-17_4_065850.Wfm.bin", + }, + { + "data": "RefCurve_2025-07-17_5_070003.Wfm.bin", + }, + { + "data": "RefCurve_2025-07-17_6_070045.Wfm.bin", + }, + { + "data": "RefCurve_2025-07-17_7_070339.Wfm.bin", + }, + ], +} + + +def main() -> None: + """ + Main function to process all measurements. + """ + # Configure logging + configure_logging(CONFIG.get("LOG_LEVEL", "INFO")) + + for measurement in CONFIG["MEASUREMENTS"]: + # Merge global config with measurement-specific overrides + merged_config = CONFIG.copy() + merged_config.update(measurement) + + # Extract parameters for process_file + name = merged_config["data"] + sidecar = merged_config.get("sidecar") + + params = get_waveform_params( + name, data_path=merged_config["DATA_PATH"], sidecar=sidecar + ) + sampling_interval = params["sampling_interval"] + + # Call with explicit parameters + process_file( + name=name, + sampling_interval=sampling_interval, + data_path=merged_config["DATA_PATH"], + smooth_win_t=merged_config.get("SMOOTH_WIN_T"), + smooth_win_f=merged_config.get("SMOOTH_WIN_F"), + detection_snr=merged_config.get("DETECTION_SNR", 3.0), + min_event_keep_snr=merged_config.get("MIN_EVENT_KEEP_SNR", 6.0), + min_event_t=merged_config.get("MIN_EVENT_T", 0.75e-6), + widen_frac=merged_config.get("WIDEN_FRAC", 10.0), + signal_polarity=merged_config.get("SIGNAL_POLARITY", -1), + max_plot_points=merged_config.get("MAX_PLOT_POINTS", 10000), + envelope_mode_limit=merged_config.get("ENVELOPE_MODE_LIMIT", 10e-3), + sidecar=sidecar, + crop=merged_config.get("crop"), + yscale_mode=merged_config.get("YSCALE_MODE", "snr"), + show_plots=True, + filter_type=merged_config.get("FILTER_TYPE", "gaussian"), + filter_order=merged_config.get("FILTER_ORDER", 2), + chunk_size=merged_config.get("CHUNK_SIZE"), + ) + + +if __name__ == "__main__": + # Set Matplotlib rcParams directly here + for optn, val in { + "backend": "QtAgg", + "figure.constrained_layout.use": True, + "figure.dpi": 90, + # "figure.figsize": (8.5 / 2.55, 6 / 2.55), + "font.family": ("sans-serif",), + "font.size": 11, + "legend.fontsize": "x-small", + "legend.handlelength": 1.5, + "legend.handletextpad": 0.6, + "lines.markersize": 4.0, + "lines.markeredgewidth": 1.6, + "lines.linewidth": 1.8, + "xtick.labelsize": 10, + "xtick.major.size": 3, + "xtick.direction": "in", + "ytick.labelsize": 10, + "ytick.direction": "in", + "ytick.major.size": 3, + "axes.formatter.useoffset": False, + "axes.formatter.use_mathtext": True, + "errorbar.capsize": 3.0, + "axes.linewidth": 1.4, + "xtick.major.width": 1.4, + "xtick.minor.width": 1.1, + "ytick.major.width": 1.4, + "ytick.minor.width": 1.1, + "axes.labelsize": 11, + }.items(): + if isinstance(val, (list, tuple)): + val = tuple(val) + try: + mpl.rcParams[optn] = val + except KeyError: + warn(f"mpl rcparams key '{optn}' not recognised as a valid rc parameter.") + main() diff --git a/examples/example_custom_data.py b/examples/example_custom_data.py new file mode 100644 index 0000000..5625d80 --- /dev/null +++ b/examples/example_custom_data.py @@ -0,0 +1,136 @@ +""" +Example: Using transivent with custom data formats. + +This example demonstrates how to use transivent's building blocks +directly with your own time-series data, without requiring any +proprietary file formats. +""" + +import numpy as np +from transivent import ( + calculate_initial_background, + calculate_clean_background, + detect_initial_events, + detect_final_events, + merge_overlapping_events, + estimate_noise, + analyze_thresholds, + create_oscilloscope_plot, + EventPlotter, +) + +# Generate synthetic data (replace with your actual data loading) +print("Generating synthetic data...") +np.random.seed(42) + +# Time array (must be in seconds) +duration = 0.05 # 50 ms +sampling_rate = 2_000_000 # 2 MHz +n_points = int(duration * sampling_rate) +t = np.linspace(0, duration, n_points) + +# Signal with some spikes +x = np.random.randn(n_points) * 0.1 # Background noise + +# Add some synthetic spikes +spike_times = [0.01, 0.023, 0.037, 0.045] +spike_amplitudes = [-1.5, -2.0, -0.8, -1.2] # Negative spikes +spike_width = 50 # samples + +for spike_t, amp in zip(spike_times, spike_amplitudes): + spike_idx = int(spike_t * sampling_rate) + spike_start = max(0, spike_idx - spike_width // 2) + spike_end = min(n_points, spike_idx + spike_width // 2) + x[spike_start:spike_end] += amp * np.exp(-0.5 * ((np.arange(spike_start, spike_end) - spike_idx) / (spike_width / 4))**2) + +print(f"Data shape: {t.shape}, Sampling interval: {t[1] - t[0]:.2e} s") + +# Analysis parameters +sampling_interval = t[1] - t[0] +smooth_n = 101 # Smoothing window in samples +detection_snr = 3.0 +min_event_keep_snr = 5.0 +signal_polarity = -1 # Negative spikes +min_event_n = 10 # Minimum event length in samples +widen_frac = 0.5 + +# Step 1: Calculate initial background +print("\nStep 1: Calculating initial background...") +bg_initial = calculate_initial_background(t, x, smooth_n, filter_type="gaussian") + +# Step 2: Estimate noise +print("Step 2: Estimating noise level...") +global_noise = estimate_noise(x, bg_initial) +print(f"Estimated noise: {global_noise:.3f}") + +# Step 3: Initial event detection +print("\nStep 3: Initial event detection...") +events_initial = detect_initial_events( + t, x, bg_initial, global_noise, detection_snr, + min_event_keep_snr, widen_frac=widen_frac, signal_polarity=signal_polarity, + min_event_n=min_event_n +) +print(f"Found {len(events_initial)} initial events") + +# Step 4: Calculate clean background (masking events) +print("\nStep 4: Calculating clean background...") +bg_clean = calculate_clean_background( + t, x, events_initial, smooth_n, bg_initial, filter_type="gaussian" +) + +# Step 5: Final event detection with clean background +print("\nStep 5: Final event detection...") +events = detect_final_events( + t, x, bg_clean, global_noise, detection_snr, + min_event_keep_snr, widen_frac=widen_frac, signal_polarity=signal_polarity, + min_event_n=min_event_n +) + +# Step 6: Merge any overlapping events +events = merge_overlapping_events(events) +print(f"Final event count: {len(events)}") + +# Print event details +print("\nDetected events:") +for i, (start, end) in enumerate(events): + duration_us = (end - start) * 1e6 + print(f" Event {i+1}: {start:.6f}s to {end:.6f}s (duration: {duration_us:.2f} µs)") + +# Step 7: Visualization +print("\nStep 7: Creating visualizations...") + +# Analyze thresholds for plotting +detection_threshold, keep_threshold = analyze_thresholds( + x, bg_clean, global_noise, detection_snr, min_event_keep_snr, signal_polarity +) + +# Create main plot +plot = create_oscilloscope_plot( + t, x, bg_initial, bg_clean, events, + detection_threshold, keep_threshold, + name="Custom Data Example", detection_snr=detection_snr, + min_event_keep_snr=min_event_keep_snr, + max_plot_points=10000, envelope_mode_limit=10e-3, smooth_n=smooth_n, + global_noise=global_noise +) + +# Create event plots +if len(events) > 0: + event_plotter = EventPlotter( + plot, events, bg_clean=bg_clean, global_noise=global_noise + ) + event_plotter.plot_events_grid(max_events=16) + event_plotter.save("custom_data_events.png") + +# Save plots +plot.save("custom_data_trace.png") +print("\nPlots saved:") +print(" - custom_data_trace.png: Full trace with events") +if len(events) > 0: + print(" - custom_data_events.png: Individual event plots") + +# Show plots (uncomment to display) +# import matplotlib.pyplot as plt +# plt.show() + +print("\nDone! The transivent building blocks work with any time-series data.")
\ No newline at end of file diff --git a/examples/example_diffusion.py b/examples/example_diffusion.py new file mode 100644 index 0000000..dc2d05e --- /dev/null +++ b/examples/example_diffusion.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +""" +Example demonstrating diffusion processing of transient events. + +This script shows how to use the new event_processor module to analyze +diffusion characteristics of detected events, similar to the approach +in heehun_diffusion_processing.py. +""" + +import numpy as np +import matplotlib.pyplot as plt + +# Import transivent functions +from transivent import ( + process_file, + get_final_events, + process_events_for_diffusion, + extract_event_waveforms, + calculate_msd_parallel, + calculate_acf, + fit_diffusion_linear, + plot_diffusion_comparison, + detect_events, + calculate_initial_background, + estimate_noise, + analyze_thresholds, +) + +# Configure logging +from transivent import configure_logging +configure_logging("INFO") + + +def create_synthetic_dataset( + n_events: int = 50, + event_duration: float = 100e-6, # 100 microseconds + sampling_interval: float = 1e-6, # 1 microsecond + total_duration: float = 0.1, # 100 milliseconds + diffusion_coeff: float = 1e-12, # m²/s + noise_level: float = 0.01, # Lower noise for better detection + signal_amplitude: float = 2.0, # Higher amplitude for better detection +): + """ + Create synthetic data with diffusion-like events. + + Parameters + ---------- + n_events : int + Number of events to generate + event_duration : float + Duration of each event in seconds + sampling_interval : float + Sampling interval in seconds + total_duration : float + Total duration of the signal in seconds + diffusion_coeff : float + Diffusion coefficient for event dynamics + noise_level : float + Noise level in the signal + signal_amplitude : float + Amplitude of the events + + Returns + ------- + t : np.ndarray + Time array + x : np.ndarray + Signal array + """ + n_points = int(total_duration / sampling_interval) + t = np.linspace(0, total_duration, n_points) + x = np.random.normal(0, noise_level, n_points) + + # Generate random event times + event_starts = np.random.uniform(0, total_duration - event_duration, n_events) + + for start_time in event_starts: + start_idx = int(start_time / sampling_interval) + end_idx = int((start_time + event_duration) / sampling_interval) + + if end_idx >= n_points: + end_idx = n_points - 1 + + # Create a diffusion-like signal for this event + event_length = end_idx - start_idx + event_t = np.linspace(0, event_duration, event_length) + + # Simulate Brownian motion + np.random.seed(int(start_time * 1e6)) # Seed for reproducibility + steps = np.random.normal(0, np.sqrt(2 * diffusion_coeff * sampling_interval), event_length) + event_signal = signal_amplitude * (1 + np.cumsum(steps)) + + # Add to main signal + x[start_idx:end_idx] += event_signal + + return t, x + + +def analyze_single_dataset(name: str, t: np.ndarray, x: np.ndarray, sampling_interval: float): + """ + Analyze a single dataset for diffusion characteristics. + + Parameters + ---------- + name : str + Name of the dataset + t : np.ndarray + Time array + x : np.ndarray + Signal array + sampling_interval : float + Sampling interval in seconds + + Returns + ------- + dict + Results containing events and diffusion analysis + """ + print(f"\n=== Analyzing {name} ===") + + # Step 1: Detect events using transivent + # First calculate background + smooth_n = int(10e-6 / sampling_interval) # 10 microsecond smoothing window + if smooth_n % 2 == 0: + smooth_n += 1 + + bg_initial = calculate_initial_background(t, x, smooth_n) + global_noise = estimate_noise(x, bg_initial) + + # Detect events + events, _ = detect_events( + t, x, bg_initial, + snr_threshold=3.0, + min_event_len=int(10e-6 / sampling_interval), + min_event_amp=5.0 * global_noise, + widen_frac=0.5, + global_noise=global_noise, + signal_polarity=1, # Positive events + ) + + print(f"Detected {len(events)} events") + + # Step 2: Process events for diffusion analysis + results = process_events_for_diffusion( + name=name, + sampling_interval=sampling_interval, + data_path="", + t=t, + x=x, + events=events, + max_lag=500, + n_jobs=-1, + ) + + print(f"Processed {results['event_count']} events for diffusion") + if results['event_count'] > 0: + print(f"Mean diffusion coefficient: {results['statistics']['mean_diffusion']:.3e} ± " + f"{results['statistics']['std_diffusion']:.3e} m²/s") + + return results + + +def main(): + """Main function demonstrating diffusion analysis.""" + print("Transivent Diffusion Processing Example") + print("=" * 40) + + # Create synthetic datasets with different diffusion characteristics + datasets = { + "Ferritin (slow diffusion)": { + "diffusion_coeff": 5e-13, # Slower diffusion + "signal_amplitude": 1.0, + "n_events": 40, + }, + "Catalase (medium diffusion)": { + "diffusion_coeff": 1e-12, # Medium diffusion + "signal_amplitude": 0.8, + "n_events": 35, + }, + "BSA (fast diffusion)": { + "diffusion_coeff": 2e-12, # Faster diffusion + "signal_amplitude": 0.6, + "n_events": 45, + }, + } + + sampling_interval = 1e-6 # 1 microsecond + + all_results = {} + + # Analyze each dataset + for name, params in datasets.items(): + # Generate synthetic data + t, x = create_synthetic_dataset( + n_events=params["n_events"], + diffusion_coeff=params["diffusion_coeff"], + signal_amplitude=params["signal_amplitude"], + sampling_interval=sampling_interval, + ) + + # Analyze for diffusion + results = analyze_single_dataset(name, t, x, sampling_interval) + all_results[name] = results + + # Create comparison plot + if any(len(results["diffusion_coeffs"]) > 0 for results in all_results.values()): + print("\n=== Creating comparison plot ===") + fig = plot_diffusion_comparison( + all_results, + show_ellipses=True, + colors=["#1f77b4", "gold", "green"], + markers=["o", "^", "s"], + ) + + # Save the plot + import os + os.makedirs("analysis_output", exist_ok=True) + fig.savefig("analysis_output/diffusion_comparison.png", dpi=150, bbox_inches="tight") + print("Saved plot to: analysis_output/diffusion_comparison.png") + + # Show plot + plt.show() + else: + print("\nNo events detected in any dataset. Check detection parameters.") + + +if __name__ == "__main__": + main()
\ No newline at end of file diff --git a/examples/example_quick_start.py b/examples/example_quick_start.py new file mode 100644 index 0000000..d5c45e4 --- /dev/null +++ b/examples/example_quick_start.py @@ -0,0 +1,131 @@ +""" +Quick start example demonstrating the simplified transivent API. + +This shows the two main entry points: +1. detect() - for custom time-series data +2. detect_from_wfm() - for Wfm files with XML sidecars +""" + +import numpy as np +from transivent import detect, detect_from_wfm, EventPlotter + +print("=" * 60) +print("TRANSIVENT QUICK START") +print("=" * 60) + +# ============================================================================ +# EXAMPLE 1: Detect events in custom time-series data +# ============================================================================ +print("\n[Example 1] Custom time-series data") +print("-" * 60) + +# Generate synthetic data +np.random.seed(42) +duration = 0.05 # 50 ms +sampling_rate = 2_000_000 # 2 MHz +n_points = int(duration * sampling_rate) +t = np.linspace(0, duration, n_points) + +# Background noise +x = np.random.randn(n_points) * 0.1 + +# Add synthetic negative spikes +spike_times = [0.01, 0.023, 0.037, 0.045] +spike_amplitudes = [-1.5, -2.0, -0.8, -1.2] +spike_width = 50 + +for spike_t, amp in zip(spike_times, spike_amplitudes): + spike_idx = int(spike_t * sampling_rate) + spike_start = max(0, spike_idx - spike_width // 2) + spike_end = min(n_points, spike_idx + spike_width // 2) + x[spike_start:spike_end] += amp * np.exp( + -0.5 * ((np.arange(spike_start, spike_end) - spike_idx) / (spike_width / 4))**2 + ) + +print(f"Generated synthetic data: {n_points} points at {sampling_rate/1e6:.1f} MHz") + +# Detect events +results = detect( + t, x, + name="Synthetic Data", + detection_snr=3.0, + min_event_keep_snr=6.0, + signal_polarity=-1, # Negative spikes + save_plots=False, +) + +events = results["events"] +print(f"✓ Found {len(events)} events") +print(f"✓ Noise level: {results['global_noise']:.3e}") + +for i, (start, end) in enumerate(events): + duration_us = (end - start) * 1e6 + print(f" Event {i+1}: {start:.6f}s to {end:.6f}s ({duration_us:.2f} µs)") + +# Access other results +print(f"✓ Results contain: {list(results.keys())}") + +# ============================================================================ +# EXAMPLE 2: Advanced - Build custom pipeline with building blocks +# ============================================================================ +print("\n[Example 2] Custom pipeline with building blocks") +print("-" * 60) + +from transivent.analysis import ( + calculate_initial_background, + estimate_noise, + detect_initial_events, +) + +# Manual pipeline for advanced control +smooth_n = 101 +bg_initial = calculate_initial_background(t, x, smooth_n, filter_type="gaussian") +global_noise = estimate_noise(x, bg_initial) +print(f"✓ Calculated background (smooth_n={smooth_n})") +print(f"✓ Estimated noise: {global_noise:.3e}") + +# ============================================================================ +# EXAMPLE 3: Visualize events +# ============================================================================ +print("\n[Example 3] Visualizing events") +print("-" * 60) + +if results["plot"] is not None and len(events) > 0: + event_plotter = EventPlotter( + results["plot"], + events, + bg_clean=results["bg_clean"], + global_noise=results["global_noise"], + ) + print(f"✓ Created event plotter for {len(events)} events") + print(f"✓ Can call event_plotter.plot_events_grid(max_events=16)") + print(f"✓ Can call event_plotter.save('path.png')") + +# ============================================================================ +# EXAMPLE 4: Using Wfm files (if you have them) +# ============================================================================ +print("\n[Example 4] Wfm file format (for reference)") +print("-" * 60) +print(""" +To analyze Wfm files with XML sidecars: + + results = detect_from_wfm( + name="data.Wfm.bin", + sampling_interval=5e-7, + data_path="/path/to/data/", + detection_snr=3.0, + ) + + events = results["events"] + print(f"Found {len(events)} events") +""") + +print("\n" + "=" * 60) +print("QUICK START COMPLETE") +print("=" * 60) +print("\nKey takeaways:") +print(" 1. Use detect() for any time-series data") +print(" 2. Use detect_from_wfm() for proprietary Wfm files") +print(" 3. All results are returned as a dict") +print(" 4. Access building blocks via transivent.analysis for custom pipelines") +print("\nFor more info, see the README or run the other examples.") |
