Add Parquet measurement storage
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
"""Data repository implementation using SQLite.
|
||||
"""Data repository implementation using SQLite and Parquet.
|
||||
|
||||
This module provides SQLite-based storage for test run metadata and results.
|
||||
Time-series measurements are stored separately in Parquet files.
|
||||
@@ -12,6 +12,8 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus
|
||||
|
||||
|
||||
@@ -65,25 +67,34 @@ class ITestRepository(ABC):
|
||||
"""Retrieve all test results for a run."""
|
||||
|
||||
@abstractmethod
|
||||
def get_measurements_dataframe(self, run_id: UUID) -> Any:
|
||||
"""Retrieve measurements as pandas DataFrame (implemented in Parquet extension)."""
|
||||
def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None:
|
||||
"""Retrieve measurements as pandas DataFrame."""
|
||||
|
||||
|
||||
class SQLiteRepository(ITestRepository):
|
||||
"""SQLite-based repository for test data.
|
||||
|
||||
Stores test run metadata and scalar results in SQLite.
|
||||
Time-series measurements are delegated to Parquet files.
|
||||
Time-series measurements are stored in Parquet files.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: str | Path):
|
||||
"""Initialise repository with database path.
|
||||
def __init__(self, db_path: str | Path, measurements_dir: str | Path | None = None):
|
||||
"""Initialise repository with database and measurements paths.
|
||||
|
||||
Args:
|
||||
db_path: Path to SQLite database file
|
||||
measurements_dir: Directory for Parquet measurement files
|
||||
(defaults to db_path parent / "measurements")
|
||||
"""
|
||||
self.db_path = Path(db_path)
|
||||
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if measurements_dir is None:
|
||||
self.measurements_dir = self.db_path.parent / "measurements"
|
||||
else:
|
||||
self.measurements_dir = Path(measurements_dir)
|
||||
|
||||
self.measurements_dir.mkdir(parents=True, exist_ok=True)
|
||||
self._init_database()
|
||||
|
||||
def _init_database(self) -> None:
|
||||
@@ -243,11 +254,38 @@ class SQLiteRepository(ITestRepository):
|
||||
run_id: UUID,
|
||||
measurements: list[Measurement],
|
||||
) -> None:
|
||||
"""Save time-series measurements.
|
||||
"""Save time-series measurements to Parquet file.
|
||||
|
||||
This is a stub - actual implementation will be in Parquet extension (Task 13.3).
|
||||
Measurements are stored in Parquet format for efficient time-series storage.
|
||||
File path: {measurements_dir}/run_{run_id}/measurements.parquet
|
||||
"""
|
||||
# Will be implemented in Task 13.3 with Parquet support
|
||||
if not measurements:
|
||||
return
|
||||
|
||||
# Create run-specific directory
|
||||
run_dir = self.measurements_dir / f"run_{run_id}"
|
||||
run_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Convert measurements to DataFrame
|
||||
data = {
|
||||
"timestamp": [m.timestamp for m in measurements],
|
||||
"parameter": [m.parameter for m in measurements],
|
||||
"value": [m.value for m in measurements],
|
||||
"unit": [m.unit for m in measurements],
|
||||
"temperature": [m.temperature for m in measurements],
|
||||
"input_voltage": [m.input_voltage for m in measurements],
|
||||
"load_current": [m.load_current for m in measurements],
|
||||
}
|
||||
df = pd.DataFrame(data)
|
||||
|
||||
# Save to Parquet (append mode if file exists)
|
||||
parquet_path = run_dir / "measurements.parquet"
|
||||
if parquet_path.exists():
|
||||
# Read existing data and append
|
||||
existing_df = pd.read_parquet(parquet_path)
|
||||
df = pd.concat([existing_df, df], ignore_index=True)
|
||||
|
||||
df.to_parquet(parquet_path, index=False, engine="pyarrow")
|
||||
|
||||
def get_run(self, run_id: UUID) -> TestRun:
|
||||
"""Retrieve test run metadata by ID."""
|
||||
@@ -304,10 +342,18 @@ class SQLiteRepository(ITestRepository):
|
||||
for row in rows
|
||||
]
|
||||
|
||||
def get_measurements_dataframe(self, run_id: UUID) -> Any:
|
||||
"""Retrieve measurements as pandas DataFrame.
|
||||
def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None:
|
||||
"""Retrieve measurements as pandas DataFrame from Parquet file.
|
||||
|
||||
This is a stub - actual implementation will be in Parquet extension (Task 13.3).
|
||||
Args:
|
||||
run_id: Test run ID
|
||||
|
||||
Returns:
|
||||
DataFrame with measurement data, or None if no measurements exist
|
||||
"""
|
||||
# Will be implemented in Task 13.3 with Parquet support
|
||||
parquet_path = self.measurements_dir / f"run_{run_id}" / "measurements.parquet"
|
||||
|
||||
if not parquet_path.exists():
|
||||
return None
|
||||
|
||||
return pd.read_parquet(parquet_path)
|
||||
|
||||
Reference in New Issue
Block a user