Add Parquet measurement storage

This commit is contained in:
2025-08-20 23:59:48 +00:00
parent 7cfd36f02b
commit 7429f6433c

View File

@@ -1,4 +1,4 @@
"""Data repository implementation using SQLite. """Data repository implementation using SQLite and Parquet.
This module provides SQLite-based storage for test run metadata and results. This module provides SQLite-based storage for test run metadata and results.
Time-series measurements are stored separately in Parquet files. Time-series measurements are stored separately in Parquet files.
@@ -12,6 +12,8 @@ from pathlib import Path
from typing import Any from typing import Any
from uuid import UUID, uuid4 from uuid import UUID, uuid4
import pandas as pd
from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus
@@ -65,25 +67,34 @@ class ITestRepository(ABC):
"""Retrieve all test results for a run.""" """Retrieve all test results for a run."""
@abstractmethod @abstractmethod
def get_measurements_dataframe(self, run_id: UUID) -> Any: def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None:
"""Retrieve measurements as pandas DataFrame (implemented in Parquet extension).""" """Retrieve measurements as pandas DataFrame."""
class SQLiteRepository(ITestRepository): class SQLiteRepository(ITestRepository):
"""SQLite-based repository for test data. """SQLite-based repository for test data.
Stores test run metadata and scalar results in SQLite. Stores test run metadata and scalar results in SQLite.
Time-series measurements are delegated to Parquet files. Time-series measurements are stored in Parquet files.
""" """
def __init__(self, db_path: str | Path): def __init__(self, db_path: str | Path, measurements_dir: str | Path | None = None):
"""Initialise repository with database path. """Initialise repository with database and measurements paths.
Args: Args:
db_path: Path to SQLite database file db_path: Path to SQLite database file
measurements_dir: Directory for Parquet measurement files
(defaults to db_path parent / "measurements")
""" """
self.db_path = Path(db_path) self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True) self.db_path.parent.mkdir(parents=True, exist_ok=True)
if measurements_dir is None:
self.measurements_dir = self.db_path.parent / "measurements"
else:
self.measurements_dir = Path(measurements_dir)
self.measurements_dir.mkdir(parents=True, exist_ok=True)
self._init_database() self._init_database()
def _init_database(self) -> None: def _init_database(self) -> None:
@@ -243,11 +254,38 @@ class SQLiteRepository(ITestRepository):
run_id: UUID, run_id: UUID,
measurements: list[Measurement], measurements: list[Measurement],
) -> None: ) -> None:
"""Save time-series measurements. """Save time-series measurements to Parquet file.
This is a stub - actual implementation will be in Parquet extension (Task 13.3). Measurements are stored in Parquet format for efficient time-series storage.
File path: {measurements_dir}/run_{run_id}/measurements.parquet
""" """
# Will be implemented in Task 13.3 with Parquet support if not measurements:
return
# Create run-specific directory
run_dir = self.measurements_dir / f"run_{run_id}"
run_dir.mkdir(parents=True, exist_ok=True)
# Convert measurements to DataFrame
data = {
"timestamp": [m.timestamp for m in measurements],
"parameter": [m.parameter for m in measurements],
"value": [m.value for m in measurements],
"unit": [m.unit for m in measurements],
"temperature": [m.temperature for m in measurements],
"input_voltage": [m.input_voltage for m in measurements],
"load_current": [m.load_current for m in measurements],
}
df = pd.DataFrame(data)
# Save to Parquet (append mode if file exists)
parquet_path = run_dir / "measurements.parquet"
if parquet_path.exists():
# Read existing data and append
existing_df = pd.read_parquet(parquet_path)
df = pd.concat([existing_df, df], ignore_index=True)
df.to_parquet(parquet_path, index=False, engine="pyarrow")
def get_run(self, run_id: UUID) -> TestRun: def get_run(self, run_id: UUID) -> TestRun:
"""Retrieve test run metadata by ID.""" """Retrieve test run metadata by ID."""
@@ -304,10 +342,18 @@ class SQLiteRepository(ITestRepository):
for row in rows for row in rows
] ]
def get_measurements_dataframe(self, run_id: UUID) -> Any: def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None:
"""Retrieve measurements as pandas DataFrame. """Retrieve measurements as pandas DataFrame from Parquet file.
This is a stub - actual implementation will be in Parquet extension (Task 13.3). Args:
run_id: Test run ID
Returns:
DataFrame with measurement data, or None if no measurements exist
""" """
# Will be implemented in Task 13.3 with Parquet support parquet_path = self.measurements_dir / f"run_{run_id}" / "measurements.parquet"
if not parquet_path.exists():
return None return None
return pd.read_parquet(parquet_path)