aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--picostream/main.py21
-rw-r--r--picostream/pico.py6
2 files changed, 25 insertions, 2 deletions
diff --git a/picostream/main.py b/picostream/main.py
index fb56d87..fa72518 100644
--- a/picostream/main.py
+++ b/picostream/main.py
@@ -16,6 +16,7 @@ import h5py
import numpy as np
from loguru import logger
+from . import __version__
from .consumer import Consumer
from .pico import PicoDevice
@@ -170,8 +171,17 @@ class Streamer:
self.pico_device.run_streaming()
# --- Consumer ---
+ # Prepare metadata for the consumer
+ acquisition_start_time_utc = datetime.utcnow().isoformat() + "Z"
+ was_live_mode = self.max_buffer_seconds is not None
+
# Get metadata from configured device and pass to consumer
- metadata = self.pico_device.get_metadata()
+ metadata = self.pico_device.get_metadata(
+ acquisition_start_time_utc=acquisition_start_time_utc,
+ picostream_version=__version__,
+ acquisition_command="", # Will be set later in main()
+ was_live_mode=was_live_mode
+ )
# Calculate max samples for live-only mode
max_samples = None
@@ -209,6 +219,10 @@ class Streamer:
# --- Live Plotting (optional) ---
self.start_time: Optional[float] = None
+ def update_acquisition_command(self, command: str) -> None:
+ """Update the acquisition command in the consumer's metadata."""
+ self.consumer.metadata["acquisition_command"] = command
+
def _validate_config(
self,
resolution_bits: int,
@@ -586,6 +600,11 @@ def main(
offset_v=offset,
max_buffer_seconds=max_buff_sec,
)
+
+ # Update the acquisition command in metadata
+ acquisition_command = " ".join(sys.argv)
+ streamer.update_acquisition_command(acquisition_command)
+
streamer.run(app)
except RuntimeError as e:
if "PICO_NOT_FOUND" in str(e):
diff --git a/picostream/pico.py b/picostream/pico.py
index 4ec0f2c..eca2877 100644
--- a/picostream/pico.py
+++ b/picostream/pico.py
@@ -255,7 +255,7 @@ class PicoDevice:
self.auto_stop = auto_stop
self.auto_stop_stream = auto_stop_stream
- def get_metadata(self) -> Dict[str, Any]:
+ def get_metadata(self, acquisition_start_time_utc: str, picostream_version: str, acquisition_command: str, was_live_mode: bool) -> Dict[str, Any]:
"""Return comprehensive acquisition metadata."""
metadata = {
"resolution": self.resolution,
@@ -268,6 +268,10 @@ class PicoDevice:
"hardware_downsample_ratio": self.down_sample_ratio,
"max_adc": self.max_adc.value,
"data_format_version": "2.0",
+ "acquisition_start_time_utc": acquisition_start_time_utc,
+ "picostream_version": picostream_version,
+ "acquisition_command": acquisition_command,
+ "was_live_mode": was_live_mode,
}
if self.downsample_mode == "aggregate":