# Provenance: extracted from patch 7cfd36f02b10c3044e4f33cec77a2b97cccd4d16
# ("Implement SQLite repository", Kai Chappell, Sun, 17 Aug 2025 20:54:35 +0000),
# which creates src/py_dvt_ate/data/repository.py.
"""Data repository implementation using SQLite.

This module provides SQLite-based storage for test run metadata and results.
Time-series measurements are stored separately in Parquet files.
"""

import json
import sqlite3
from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any
from uuid import UUID, uuid4

from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus

# DDL executed on start-up. Every statement uses IF NOT EXISTS, so running the
# whole list against an already-initialised database is a no-op.
_SCHEMA_STATEMENTS: tuple[str, ...] = (
    """
    CREATE TABLE IF NOT EXISTS test_runs (
        id TEXT PRIMARY KEY,
        test_name TEXT NOT NULL,
        description TEXT,
        started_at TEXT NOT NULL,
        completed_at TEXT,
        status TEXT NOT NULL DEFAULT 'pending',
        config_json TEXT NOT NULL,
        operator TEXT,
        notes TEXT,
        created_at TEXT NOT NULL DEFAULT (datetime('now'))
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS test_results (
        id TEXT PRIMARY KEY,
        test_run_id TEXT NOT NULL,
        parameter TEXT NOT NULL,
        value REAL NOT NULL,
        unit TEXT,
        lower_limit REAL,
        upper_limit REAL,
        passed INTEGER NOT NULL,
        measured_at TEXT NOT NULL,
        FOREIGN KEY (test_run_id) REFERENCES test_runs(id)
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_test_runs_status ON test_runs(status)",
    "CREATE INDEX IF NOT EXISTS idx_test_runs_name ON test_runs(test_name)",
    "CREATE INDEX IF NOT EXISTS idx_test_results_run ON test_results(test_run_id)",
    "CREATE INDEX IF NOT EXISTS idx_test_results_param ON test_results(parameter)",
)


class ITestRepository(ABC):
    """Repository interface for test data."""

    @abstractmethod
    def create_run(
        self,
        test_name: str,
        config: dict[str, Any],
        operator: str | None = None,
        description: str | None = None,
    ) -> UUID:
        """Create a new test run and return its ID.

        Args:
            test_name: Name of the test being run.
            config: Configuration used for the run.
            operator: Optional operator identifier.
            description: Optional free-text description.

        Returns:
            The identifier of the newly created run.
        """

    @abstractmethod
    def update_run_status(self, run_id: UUID, status: TestStatus) -> None:
        """Update the status of a test run.

        Args:
            run_id: Identifier of the run to update.
            status: New status to record.
        """

    @abstractmethod
    def complete_run(self, run_id: UUID, status: TestStatus) -> None:
        """Mark a test run as complete with final status.

        Args:
            run_id: Identifier of the run to complete.
            status: Final status to record.
        """

    @abstractmethod
    def save_result(
        self,
        run_id: UUID,
        parameter: str,
        value: float,
        unit: str,
        lower_limit: float | None = None,
        upper_limit: float | None = None,
    ) -> None:
        """Save a scalar test result.

        Args:
            run_id: Identifier of the run the result belongs to.
            parameter: Name of the measured parameter.
            value: Measured value.
            unit: Unit of the measured value.
            lower_limit: Optional lower limit for the value.
            upper_limit: Optional upper limit for the value.
        """

    @abstractmethod
    def save_measurements(
        self,
        run_id: UUID,
        measurements: list[Measurement],
    ) -> None:
        """Save time-series measurements (implemented in Parquet extension)."""

    @abstractmethod
    def get_run(self, run_id: UUID) -> TestRun:
        """Retrieve test run metadata by ID."""

    @abstractmethod
    def get_results(self, run_id: UUID) -> list[TestResult]:
        """Retrieve all test results for a run."""

    @abstractmethod
    def get_measurements_dataframe(self, run_id: UUID) -> Any:
        """Retrieve measurements as pandas DataFrame (implemented in Parquet extension)."""


class SQLiteRepository(ITestRepository):
    """SQLite-based repository for test data.

    Stores test run metadata and scalar results in SQLite.
    Time-series measurements are delegated to Parquet files.

    A fresh connection is opened for every operation and closed again as soon
    as the operation finishes, so no connection state is kept on the instance.
    """

    def __init__(self, db_path: str | Path):
        """Initialise repository with database path.

        Creates the parent directory of ``db_path`` if needed and ensures the
        tables and indexes exist.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_database()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed/rolled back and then closed.

        ``with sqlite3.connect(...)`` on its own only manages the transaction;
        it does not close the connection, which leaks a file handle per call.
        This helper wraps the transaction context in a ``finally`` that closes
        the connection in every case.

        Yields:
            An open connection whose ``row_factory`` is ``sqlite3.Row``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.row_factory = sqlite3.Row
            # NOTE(review): the schema declares a FOREIGN KEY, but SQLite only
            # enforces it after `PRAGMA foreign_keys = ON` on each connection.
            # Enforcement is left off here to keep existing behaviour; confirm
            # whether orphan results should be rejected.
            with conn:  # commit on success, roll back if the body raises
                yield conn
        finally:
            conn.close()

    def _init_database(self) -> None:
        """Create database tables and indexes if they don't exist."""
        with self._connect() as conn:
            for statement in _SCHEMA_STATEMENTS:
                conn.execute(statement)

    def create_run(
        self,
        test_name: str,
        config: dict[str, Any],
        operator: str | None = None,
        description: str | None = None,
    ) -> UUID:
        """Create a new test run and return its ID.

        The run is stored with status ``TestStatus.PENDING``; ``config`` is
        serialised with ``json.dumps``.

        Args:
            test_name: Name of the test being run.
            config: Run configuration; must be JSON-serialisable.
            operator: Optional operator identifier.
            description: Optional free-text description.

        Returns:
            The randomly generated identifier of the new run.

        Raises:
            TypeError: If ``config`` cannot be serialised to JSON.
        """
        run_id = uuid4()
        config_json = json.dumps(config)
        # One timestamp for both columns so started_at and created_at agree.
        now_iso = datetime.now().isoformat()

        with self._connect() as conn:
            conn.execute(
                """
                INSERT INTO test_runs (
                    id, test_name, description, started_at, status,
                    config_json, operator, created_at
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    str(run_id),
                    test_name,
                    description,
                    now_iso,
                    TestStatus.PENDING.value,
                    config_json,
                    operator,
                    now_iso,
                ),
            )

        return run_id

    def update_run_status(self, run_id: UUID, status: TestStatus) -> None:
        """Update the status of a test run.

        If no run with ``run_id`` exists the UPDATE matches no rows and the
        call returns without error.

        Args:
            run_id: Identifier of the run to update.
            status: New status; its ``value`` is what gets stored.
        """
        with self._connect() as conn:
            conn.execute(
                "UPDATE test_runs SET status = ? WHERE id = ?",
                (status.value, str(run_id)),
            )

    def complete_run(self, run_id: UUID, status: TestStatus) -> None:
        """Mark a test run as complete with final status.

        Sets ``completed_at`` to the current local time. If no run with
        ``run_id`` exists the UPDATE matches no rows and the call returns
        without error.

        Args:
            run_id: Identifier of the run to complete.
            status: Final status; its ``value`` is what gets stored.
        """
        completed_at = datetime.now()

        with self._connect() as conn:
            conn.execute(
                """
                UPDATE test_runs
                SET status = ?, completed_at = ?
                WHERE id = ?
                """,
                (status.value, completed_at.isoformat(), str(run_id)),
            )

    def save_result(
        self,
        run_id: UUID,
        parameter: str,
        value: float,
        unit: str,
        lower_limit: float | None = None,
        upper_limit: float | None = None,
    ) -> None:
        """Save a scalar test result.

        The pass/fail flag is computed here: a result passes when ``value`` is
        within every limit that was supplied (limits are inclusive). With no
        limits the result always passes.

        Args:
            run_id: Identifier of the run the result belongs to.
            parameter: Name of the measured parameter.
            value: Measured value.
            unit: Unit of the measured value.
            lower_limit: Optional inclusive lower limit.
            upper_limit: Optional inclusive upper limit.
        """
        result_id = uuid4()
        measured_at = datetime.now()

        # A missing limit counts as satisfied, so no limits means pass.
        lower_ok = lower_limit is None or value >= lower_limit
        upper_ok = upper_limit is None or value <= upper_limit
        passed = int(lower_ok and upper_ok)

        with self._connect() as conn:
            conn.execute(
                """
                INSERT INTO test_results (
                    id, test_run_id, parameter, value, unit,
                    lower_limit, upper_limit, passed, measured_at
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    str(result_id),
                    str(run_id),
                    parameter,
                    value,
                    unit,
                    lower_limit,
                    upper_limit,
                    passed,
                    measured_at.isoformat(),
                ),
            )

    def save_measurements(
        self,
        run_id: UUID,
        measurements: list[Measurement],
    ) -> None:
        """Save time-series measurements.

        This is a stub - actual implementation will be in Parquet extension (Task 13.3).
        Until then the call does nothing and ``measurements`` are discarded.
        """
        # Will be implemented in Task 13.3 with Parquet support

    def get_run(self, run_id: UUID) -> TestRun:
        """Retrieve test run metadata by ID.

        Args:
            run_id: Identifier of the run to load.

        Returns:
            The stored run, with timestamp columns parsed back to ``datetime``.

        Raises:
            ValueError: If no run with ``run_id`` exists.
        """
        with self._connect() as conn:
            row = conn.execute(
                "SELECT * FROM test_runs WHERE id = ?",
                (str(run_id),),
            ).fetchone()

        if row is None:
            msg = f"Test run {run_id} not found"
            raise ValueError(msg)

        completed_raw = row["completed_at"]
        return TestRun(
            # NOTE(review): id is passed through as the stored string, not a
            # UUID - presumably the model accepts/coerces it; confirm.
            id=row["id"],
            test_name=row["test_name"],
            description=row["description"],
            started_at=datetime.fromisoformat(row["started_at"]),
            completed_at=(
                datetime.fromisoformat(completed_raw) if completed_raw else None
            ),
            status=TestStatus(row["status"]),
            config_json=row["config_json"],
            operator=row["operator"],
            notes=row["notes"],
            created_at=datetime.fromisoformat(row["created_at"]),
        )

    def get_results(self, run_id: UUID) -> list[TestResult]:
        """Retrieve all test results for a run.

        Args:
            run_id: Identifier of the run whose results are wanted.

        Returns:
            The run's results in insertion order; empty if there are none.
        """
        with self._connect() as conn:
            # Without ORDER BY the row order is unspecified; rowid gives a
            # stable insertion order.
            rows = conn.execute(
                "SELECT * FROM test_results WHERE test_run_id = ? ORDER BY rowid",
                (str(run_id),),
            ).fetchall()

        # NOTE(review): the stored `passed` column is not forwarded to
        # TestResult - presumably the model derives it from the limits; confirm.
        return [
            TestResult(
                id=row["id"],
                test_run_id=row["test_run_id"],
                parameter=row["parameter"],
                value=row["value"],
                unit=row["unit"],
                lower_limit=row["lower_limit"],
                upper_limit=row["upper_limit"],
                measured_at=datetime.fromisoformat(row["measured_at"]),
            )
            for row in rows
        ]

    def get_measurements_dataframe(self, run_id: UUID) -> Any:
        """Retrieve measurements as pandas DataFrame.

        This is a stub - actual implementation will be in Parquet extension (Task 13.3).
        Until then it always returns ``None``.
        """
        # Will be implemented in Task 13.3 with Parquet support
        return None