From 7429f6433ca5c41175efb71a23733f59410b6db3 Mon Sep 17 00:00:00 2001 From: Kai Chappell Date: Wed, 20 Aug 2025 23:59:48 +0000 Subject: [PATCH] Add Parquet measurement storage --- src/py_dvt_ate/data/repository.py | 74 +++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 14 deletions(-) diff --git a/src/py_dvt_ate/data/repository.py b/src/py_dvt_ate/data/repository.py index 91e5dac..874e6bc 100644 --- a/src/py_dvt_ate/data/repository.py +++ b/src/py_dvt_ate/data/repository.py @@ -1,4 +1,4 @@ -"""Data repository implementation using SQLite. +"""Data repository implementation using SQLite and Parquet. This module provides SQLite-based storage for test run metadata and results. Time-series measurements are stored separately in Parquet files. @@ -12,6 +12,8 @@ from pathlib import Path from typing import Any from uuid import UUID, uuid4 +import pandas as pd + from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus @@ -65,25 +67,34 @@ class ITestRepository(ABC): """Retrieve all test results for a run.""" @abstractmethod - def get_measurements_dataframe(self, run_id: UUID) -> Any: - """Retrieve measurements as pandas DataFrame (implemented in Parquet extension).""" + def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None: + """Retrieve measurements as pandas DataFrame.""" class SQLiteRepository(ITestRepository): """SQLite-based repository for test data. Stores test run metadata and scalar results in SQLite. - Time-series measurements are delegated to Parquet files. + Time-series measurements are stored in Parquet files. """ - def __init__(self, db_path: str | Path): - """Initialise repository with database path. + def __init__(self, db_path: str | Path, measurements_dir: str | Path | None = None): + """Initialise repository with database and measurements paths. Args: db_path: Path to SQLite database file + measurements_dir: Directory for Parquet measurement files + (defaults to db_path parent / "measurements") """ self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) + + if measurements_dir is None: + self.measurements_dir = self.db_path.parent / "measurements" + else: + self.measurements_dir = Path(measurements_dir) + + self.measurements_dir.mkdir(parents=True, exist_ok=True) self._init_database() def _init_database(self) -> None: @@ -243,11 +254,38 @@ class SQLiteRepository(ITestRepository): run_id: UUID, measurements: list[Measurement], ) -> None: - """Save time-series measurements. + """Save time-series measurements to Parquet file. - This is a stub - actual implementation will be in Parquet extension (Task 13.3). + Measurements are stored in Parquet format for efficient time-series storage. + File path: {measurements_dir}/run_{run_id}/measurements.parquet """ - # Will be implemented in Task 13.3 with Parquet support + if not measurements: + return + + # Create run-specific directory + run_dir = self.measurements_dir / f"run_{run_id}" + run_dir.mkdir(parents=True, exist_ok=True) + + # Convert measurements to DataFrame + data = { + "timestamp": [m.timestamp for m in measurements], + "parameter": [m.parameter for m in measurements], + "value": [m.value for m in measurements], + "unit": [m.unit for m in measurements], + "temperature": [m.temperature for m in measurements], + "input_voltage": [m.input_voltage for m in measurements], + "load_current": [m.load_current for m in measurements], + } + df = pd.DataFrame(data) + + # Save to Parquet (append mode if file exists) + parquet_path = run_dir / "measurements.parquet" + if parquet_path.exists(): + # Read existing data and append + existing_df = pd.read_parquet(parquet_path) + df = pd.concat([existing_df, df], ignore_index=True) + + df.to_parquet(parquet_path, index=False, engine="pyarrow") def get_run(self, run_id: UUID) -> TestRun: """Retrieve test run metadata by ID.""" @@ -304,10 +342,18 @@ class SQLiteRepository(ITestRepository): for row in rows ] - def get_measurements_dataframe(self, run_id: UUID) -> Any: - """Retrieve measurements as pandas DataFrame. + def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None: + """Retrieve measurements as pandas DataFrame from Parquet file. - This is a stub - actual implementation will be in Parquet extension (Task 13.3). + Args: + run_id: Test run ID + + Returns: + DataFrame with measurement data, or None if no measurements exist """ - # Will be implemented in Task 13.3 with Parquet support - return None + parquet_path = self.measurements_dir / f"run_{run_id}" / "measurements.parquet" + + if not parquet_path.exists(): + return None + + return pd.read_parquet(parquet_path)