"""Data repository implementation using SQLite and Parquet. This module provides SQLite-based storage for test run metadata and results. Time-series measurements are stored separately in Parquet files. """ import json import sqlite3 from abc import ABC, abstractmethod from datetime import datetime from pathlib import Path from typing import Any from uuid import UUID, uuid4 import pandas as pd from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus class ITestRepository(ABC): """Repository interface for test data.""" @abstractmethod def create_run( self, test_name: str, config: dict[str, Any], operator: str | None = None, description: str | None = None, ) -> UUID: """Create a new test run and return its ID.""" @abstractmethod def update_run_status(self, run_id: UUID, status: TestStatus) -> None: """Update the status of a test run.""" @abstractmethod def complete_run(self, run_id: UUID, status: TestStatus) -> None: """Mark a test run as complete with final status.""" @abstractmethod def save_result( self, run_id: UUID, parameter: str, value: float, unit: str, lower_limit: float | None = None, upper_limit: float | None = None, ) -> None: """Save a scalar test result.""" @abstractmethod def save_measurements( self, run_id: UUID, measurements: list[Measurement], ) -> None: """Save time-series measurements (implemented in Parquet extension).""" @abstractmethod def get_run(self, run_id: UUID) -> TestRun: """Retrieve test run metadata by ID.""" @abstractmethod def get_results(self, run_id: UUID) -> list[TestResult]: """Retrieve all test results for a run.""" @abstractmethod def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None: """Retrieve measurements as pandas DataFrame.""" @abstractmethod def get_all_runs(self) -> list[TestRun]: """Retrieve all test runs, ordered by started_at descending.""" def close(self) -> None: """Close repository and release resources. Optional to implement.""" class SQLiteRepository(ITestRepository): """SQLite-based repository for test data. Stores test run metadata and scalar results in SQLite. Time-series measurements are stored in Parquet files. """ def __init__(self, db_path: str | Path, measurements_dir: str | Path | None = None): """Initialise repository with database and measurements paths. Args: db_path: Path to SQLite database file measurements_dir: Directory for Parquet measurement files (defaults to db_path parent / "measurements") """ self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) if measurements_dir is None: self.measurements_dir = self.db_path.parent / "measurements" else: self.measurements_dir = Path(measurements_dir) self.measurements_dir.mkdir(parents=True, exist_ok=True) self._init_database() def _init_database(self) -> None: """Create database tables if they don't exist.""" with sqlite3.connect(self.db_path) as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS test_runs ( id TEXT PRIMARY KEY, test_name TEXT NOT NULL, description TEXT, started_at TEXT NOT NULL, completed_at TEXT, status TEXT NOT NULL DEFAULT 'pending', config_json TEXT NOT NULL, operator TEXT, notes TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS test_results ( id TEXT PRIMARY KEY, test_run_id TEXT NOT NULL, parameter TEXT NOT NULL, value REAL NOT NULL, unit TEXT, lower_limit REAL, upper_limit REAL, passed INTEGER NOT NULL, measured_at TEXT NOT NULL, FOREIGN KEY (test_run_id) REFERENCES test_runs(id) ) """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_test_runs_status ON test_runs(status)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_test_runs_name ON test_runs(test_name)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_test_results_run ON test_results(test_run_id)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_test_results_param ON test_results(parameter)" ) conn.commit() def create_run( self, test_name: str, config: dict[str, Any], operator: str | None = None, description: str | None = None, ) -> UUID: """Create a new test run and return its ID.""" run_id = uuid4() started_at = datetime.now() config_json = json.dumps(config) with sqlite3.connect(self.db_path) as conn: conn.execute( """ INSERT INTO test_runs ( id, test_name, description, started_at, status, config_json, operator, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( str(run_id), test_name, description, started_at.isoformat(), TestStatus.PENDING.value, config_json, operator, datetime.now().isoformat(), ), ) conn.commit() return run_id def update_run_status(self, run_id: UUID, status: TestStatus) -> None: """Update the status of a test run.""" with sqlite3.connect(self.db_path) as conn: conn.execute( "UPDATE test_runs SET status = ? WHERE id = ?", (status.value, str(run_id)), ) conn.commit() def complete_run(self, run_id: UUID, status: TestStatus) -> None: """Mark a test run as complete with final status.""" completed_at = datetime.now() with sqlite3.connect(self.db_path) as conn: conn.execute( """ UPDATE test_runs SET status = ?, completed_at = ? WHERE id = ? """, (status.value, completed_at.isoformat(), str(run_id)), ) conn.commit() def save_result( self, run_id: UUID, parameter: str, value: float, unit: str, lower_limit: float | None = None, upper_limit: float | None = None, ) -> None: """Save a scalar test result.""" result_id = uuid4() measured_at = datetime.now() # Calculate pass/fail passed = 1 # Default to pass if no limits if lower_limit is not None or upper_limit is not None: lower_ok = lower_limit is None or value >= lower_limit upper_ok = upper_limit is None or value <= upper_limit passed = 1 if (lower_ok and upper_ok) else 0 with sqlite3.connect(self.db_path) as conn: conn.execute( """ INSERT INTO test_results ( id, test_run_id, parameter, value, unit, lower_limit, upper_limit, passed, measured_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( str(result_id), str(run_id), parameter, value, unit, lower_limit, upper_limit, passed, measured_at.isoformat(), ), ) conn.commit() def save_measurements( self, run_id: UUID, measurements: list[Measurement], ) -> None: """Save time-series measurements to Parquet file. Measurements are stored in Parquet format for efficient time-series storage. File path: {measurements_dir}/run_{run_id}/measurements.parquet """ if not measurements: return # Create run-specific directory run_dir = self.measurements_dir / f"run_{run_id}" run_dir.mkdir(parents=True, exist_ok=True) # Convert measurements to DataFrame data = { "timestamp": [m.timestamp for m in measurements], "parameter": [m.parameter for m in measurements], "value": [m.value for m in measurements], "unit": [m.unit for m in measurements], "temperature": [m.temperature for m in measurements], "input_voltage": [m.input_voltage for m in measurements], "load_current": [m.load_current for m in measurements], } df = pd.DataFrame(data) # Save to Parquet (append mode if file exists) parquet_path = run_dir / "measurements.parquet" if parquet_path.exists(): # Read existing data and append existing_df = pd.read_parquet(parquet_path) df = pd.concat([existing_df, df], ignore_index=True) df.to_parquet(parquet_path, index=False, engine="pyarrow") def get_run(self, run_id: UUID) -> TestRun: """Retrieve test run metadata by ID.""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.execute( "SELECT * FROM test_runs WHERE id = ?", (str(run_id),), ) row = cursor.fetchone() if row is None: msg = f"Test run {run_id} not found" raise ValueError(msg) return TestRun( id=row["id"], test_name=row["test_name"], description=row["description"], started_at=datetime.fromisoformat(row["started_at"]), completed_at=( datetime.fromisoformat(row["completed_at"]) if row["completed_at"] else None ), status=TestStatus(row["status"]), config_json=row["config_json"], operator=row["operator"], notes=row["notes"], created_at=datetime.fromisoformat(row["created_at"]), ) def get_results(self, run_id: UUID) -> list[TestResult]: """Retrieve all test results for a run.""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.execute( "SELECT * FROM test_results WHERE test_run_id = ?", (str(run_id),), ) rows = cursor.fetchall() return [ TestResult( id=row["id"], test_run_id=row["test_run_id"], parameter=row["parameter"], value=row["value"], unit=row["unit"], lower_limit=row["lower_limit"], upper_limit=row["upper_limit"], measured_at=datetime.fromisoformat(row["measured_at"]), ) for row in rows ] def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None: """Retrieve measurements as pandas DataFrame from Parquet file. Args: run_id: Test run ID Returns: DataFrame with measurement data, or None if no measurements exist """ parquet_path = self.measurements_dir / f"run_{run_id}" / "measurements.parquet" if not parquet_path.exists(): return None return pd.read_parquet(parquet_path) def get_all_runs(self) -> list[TestRun]: """Retrieve all test runs, ordered by started_at descending. Returns: List of all TestRun objects, newest first. """ with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(""" SELECT id, test_name, started_at, status, config_json, description, completed_at, operator, notes, created_at FROM test_runs ORDER BY started_at DESC """) rows = cursor.fetchall() return [ TestRun( id=row["id"], test_name=row["test_name"], started_at=datetime.fromisoformat(row["started_at"]), status=TestStatus(row["status"]), config_json=row["config_json"], description=row["description"], completed_at=( datetime.fromisoformat(row["completed_at"]) if row["completed_at"] else None ), operator=row["operator"], notes=row["notes"], created_at=datetime.fromisoformat(row["created_at"]), ) for row in rows ] def close(self) -> None: """Close repository and release resources. SQLite connections are managed via context managers and auto-close. This method performs explicit cleanup for Windows file handle issues. """ # Force garbage collection to release any lingering connections import gc gc.collect()