Add Parquet measurement storage

This commit is contained in:
2025-12-02 23:22:13 +00:00
parent 272311e53a
commit 22a79084c8

View File

@@ -1,4 +1,4 @@
"""Data repository implementation using SQLite.
"""Data repository implementation using SQLite and Parquet.
This module provides SQLite-based storage for test run metadata and results.
Time-series measurements are stored separately in Parquet files.
@@ -12,6 +12,8 @@ from pathlib import Path
from typing import Any
from uuid import UUID, uuid4
import pandas as pd
from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus
@@ -65,25 +67,34 @@ class ITestRepository(ABC):
"""Retrieve all test results for a run."""
@abstractmethod
def get_measurements_dataframe(self, run_id: UUID) -> Any:
"""Retrieve measurements as pandas DataFrame (implemented in Parquet extension)."""
def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None:
"""Retrieve measurements as pandas DataFrame."""
class SQLiteRepository(ITestRepository):
"""SQLite-based repository for test data.
Stores test run metadata and scalar results in SQLite.
Time-series measurements are delegated to Parquet files.
Time-series measurements are stored in Parquet files.
"""
def __init__(self, db_path: str | Path):
"""Initialise repository with database path.
def __init__(self, db_path: str | Path, measurements_dir: str | Path | None = None):
"""Initialise repository with database and measurements paths.
Args:
db_path: Path to SQLite database file
measurements_dir: Directory for Parquet measurement files
(defaults to db_path parent / "measurements")
"""
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
if measurements_dir is None:
self.measurements_dir = self.db_path.parent / "measurements"
else:
self.measurements_dir = Path(measurements_dir)
self.measurements_dir.mkdir(parents=True, exist_ok=True)
self._init_database()
def _init_database(self) -> None:
@@ -243,11 +254,38 @@ class SQLiteRepository(ITestRepository):
run_id: UUID,
measurements: list[Measurement],
) -> None:
"""Save time-series measurements.
"""Save time-series measurements to Parquet file.
This is a stub - actual implementation will be in Parquet extension (Task 13.3).
Measurements are stored in Parquet format for efficient time-series storage.
File path: {measurements_dir}/run_{run_id}/measurements.parquet
"""
# Will be implemented in Task 13.3 with Parquet support
if not measurements:
return
# Create run-specific directory
run_dir = self.measurements_dir / f"run_{run_id}"
run_dir.mkdir(parents=True, exist_ok=True)
# Convert measurements to DataFrame
data = {
"timestamp": [m.timestamp for m in measurements],
"parameter": [m.parameter for m in measurements],
"value": [m.value for m in measurements],
"unit": [m.unit for m in measurements],
"temperature": [m.temperature for m in measurements],
"input_voltage": [m.input_voltage for m in measurements],
"load_current": [m.load_current for m in measurements],
}
df = pd.DataFrame(data)
# Save to Parquet (append mode if file exists)
parquet_path = run_dir / "measurements.parquet"
if parquet_path.exists():
# Read existing data and append
existing_df = pd.read_parquet(parquet_path)
df = pd.concat([existing_df, df], ignore_index=True)
df.to_parquet(parquet_path, index=False, engine="pyarrow")
def get_run(self, run_id: UUID) -> TestRun:
"""Retrieve test run metadata by ID."""
@@ -304,10 +342,18 @@ class SQLiteRepository(ITestRepository):
for row in rows
]
def get_measurements_dataframe(self, run_id: UUID) -> Any:
"""Retrieve measurements as pandas DataFrame.
def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None:
"""Retrieve measurements as pandas DataFrame from Parquet file.
This is a stub - actual implementation will be in Parquet extension (Task 13.3).
Args:
run_id: Test run ID
Returns:
DataFrame with measurement data, or None if no measurements exist
"""
# Will be implemented in Task 13.3 with Parquet support
return None
parquet_path = self.measurements_dir / f"run_{run_id}" / "measurements.parquet"
if not parquet_path.exists():
return None
return pd.read_parquet(parquet_path)