Implement SQLite repository

This commit is contained in:
2025-08-17 20:54:35 +00:00
parent 0835e3785b
commit b045165f6e

View File

@@ -0,0 +1,313 @@
"""Data repository implementation using SQLite.
This module provides SQLite-based storage for test run metadata and results.
Time-series measurements are stored separately in Parquet files.
"""
import json
import sqlite3
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Any
from uuid import UUID, uuid4
from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus
class ITestRepository(ABC):
"""Repository interface for test data."""
@abstractmethod
def create_run(
self,
test_name: str,
config: dict[str, Any],
operator: str | None = None,
description: str | None = None,
) -> UUID:
"""Create a new test run and return its ID."""
@abstractmethod
def update_run_status(self, run_id: UUID, status: TestStatus) -> None:
"""Update the status of a test run."""
@abstractmethod
def complete_run(self, run_id: UUID, status: TestStatus) -> None:
"""Mark a test run as complete with final status."""
@abstractmethod
def save_result(
self,
run_id: UUID,
parameter: str,
value: float,
unit: str,
lower_limit: float | None = None,
upper_limit: float | None = None,
) -> None:
"""Save a scalar test result."""
@abstractmethod
def save_measurements(
self,
run_id: UUID,
measurements: list[Measurement],
) -> None:
"""Save time-series measurements (implemented in Parquet extension)."""
@abstractmethod
def get_run(self, run_id: UUID) -> TestRun:
"""Retrieve test run metadata by ID."""
@abstractmethod
def get_results(self, run_id: UUID) -> list[TestResult]:
"""Retrieve all test results for a run."""
@abstractmethod
def get_measurements_dataframe(self, run_id: UUID) -> Any:
"""Retrieve measurements as pandas DataFrame (implemented in Parquet extension)."""
class SQLiteRepository(ITestRepository):
"""SQLite-based repository for test data.
Stores test run metadata and scalar results in SQLite.
Time-series measurements are delegated to Parquet files.
"""
def __init__(self, db_path: str | Path):
"""Initialise repository with database path.
Args:
db_path: Path to SQLite database file
"""
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_database()
def _init_database(self) -> None:
"""Create database tables if they don't exist."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS test_runs (
id TEXT PRIMARY KEY,
test_name TEXT NOT NULL,
description TEXT,
started_at TEXT NOT NULL,
completed_at TEXT,
status TEXT NOT NULL DEFAULT 'pending',
config_json TEXT NOT NULL,
operator TEXT,
notes TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS test_results (
id TEXT PRIMARY KEY,
test_run_id TEXT NOT NULL,
parameter TEXT NOT NULL,
value REAL NOT NULL,
unit TEXT,
lower_limit REAL,
upper_limit REAL,
passed INTEGER NOT NULL,
measured_at TEXT NOT NULL,
FOREIGN KEY (test_run_id) REFERENCES test_runs(id)
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_test_runs_status ON test_runs(status)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_test_runs_name ON test_runs(test_name)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_test_results_run ON test_results(test_run_id)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_test_results_param ON test_results(parameter)"
)
conn.commit()
def create_run(
self,
test_name: str,
config: dict[str, Any],
operator: str | None = None,
description: str | None = None,
) -> UUID:
"""Create a new test run and return its ID."""
run_id = uuid4()
started_at = datetime.now()
config_json = json.dumps(config)
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO test_runs (
id, test_name, description, started_at, status,
config_json, operator, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
str(run_id),
test_name,
description,
started_at.isoformat(),
TestStatus.PENDING.value,
config_json,
operator,
datetime.now().isoformat(),
),
)
conn.commit()
return run_id
def update_run_status(self, run_id: UUID, status: TestStatus) -> None:
"""Update the status of a test run."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"UPDATE test_runs SET status = ? WHERE id = ?",
(status.value, str(run_id)),
)
conn.commit()
def complete_run(self, run_id: UUID, status: TestStatus) -> None:
"""Mark a test run as complete with final status."""
completed_at = datetime.now()
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
UPDATE test_runs
SET status = ?, completed_at = ?
WHERE id = ?
""",
(status.value, completed_at.isoformat(), str(run_id)),
)
conn.commit()
def save_result(
self,
run_id: UUID,
parameter: str,
value: float,
unit: str,
lower_limit: float | None = None,
upper_limit: float | None = None,
) -> None:
"""Save a scalar test result."""
result_id = uuid4()
measured_at = datetime.now()
# Calculate pass/fail
passed = 1 # Default to pass if no limits
if lower_limit is not None or upper_limit is not None:
lower_ok = lower_limit is None or value >= lower_limit
upper_ok = upper_limit is None or value <= upper_limit
passed = 1 if (lower_ok and upper_ok) else 0
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO test_results (
id, test_run_id, parameter, value, unit,
lower_limit, upper_limit, passed, measured_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
str(result_id),
str(run_id),
parameter,
value,
unit,
lower_limit,
upper_limit,
passed,
measured_at.isoformat(),
),
)
conn.commit()
def save_measurements(
self,
run_id: UUID,
measurements: list[Measurement],
) -> None:
"""Save time-series measurements.
This is a stub - actual implementation will be in Parquet extension (Task 13.3).
"""
# Will be implemented in Task 13.3 with Parquet support
def get_run(self, run_id: UUID) -> TestRun:
"""Retrieve test run metadata by ID."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM test_runs WHERE id = ?",
(str(run_id),),
)
row = cursor.fetchone()
if row is None:
msg = f"Test run {run_id} not found"
raise ValueError(msg)
return TestRun(
id=row["id"],
test_name=row["test_name"],
description=row["description"],
started_at=datetime.fromisoformat(row["started_at"]),
completed_at=(
datetime.fromisoformat(row["completed_at"])
if row["completed_at"]
else None
),
status=TestStatus(row["status"]),
config_json=row["config_json"],
operator=row["operator"],
notes=row["notes"],
created_at=datetime.fromisoformat(row["created_at"]),
)
def get_results(self, run_id: UUID) -> list[TestResult]:
"""Retrieve all test results for a run."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM test_results WHERE test_run_id = ?",
(str(run_id),),
)
rows = cursor.fetchall()
return [
TestResult(
id=row["id"],
test_run_id=row["test_run_id"],
parameter=row["parameter"],
value=row["value"],
unit=row["unit"],
lower_limit=row["lower_limit"],
upper_limit=row["upper_limit"],
measured_at=datetime.fromisoformat(row["measured_at"]),
)
for row in rows
]
def get_measurements_dataframe(self, run_id: UUID) -> Any:
"""Retrieve measurements as pandas DataFrame.
This is a stub - actual implementation will be in Parquet extension (Task 13.3).
"""
# Will be implemented in Task 13.3 with Parquet support
return None