Implement SQLite repository
This commit is contained in:
313
src/py_dvt_ate/data/repository.py
Normal file
313
src/py_dvt_ate/data/repository.py
Normal file
@@ -0,0 +1,313 @@
|
||||
"""Data repository implementation using SQLite.
|
||||
|
||||
This module provides SQLite-based storage for test run metadata and results.
|
||||
Time-series measurements are stored separately in Parquet files.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus
|
||||
|
||||
|
||||
class ITestRepository(ABC):
|
||||
"""Repository interface for test data."""
|
||||
|
||||
@abstractmethod
|
||||
def create_run(
|
||||
self,
|
||||
test_name: str,
|
||||
config: dict[str, Any],
|
||||
operator: str | None = None,
|
||||
description: str | None = None,
|
||||
) -> UUID:
|
||||
"""Create a new test run and return its ID."""
|
||||
|
||||
@abstractmethod
|
||||
def update_run_status(self, run_id: UUID, status: TestStatus) -> None:
|
||||
"""Update the status of a test run."""
|
||||
|
||||
@abstractmethod
|
||||
def complete_run(self, run_id: UUID, status: TestStatus) -> None:
|
||||
"""Mark a test run as complete with final status."""
|
||||
|
||||
@abstractmethod
|
||||
def save_result(
|
||||
self,
|
||||
run_id: UUID,
|
||||
parameter: str,
|
||||
value: float,
|
||||
unit: str,
|
||||
lower_limit: float | None = None,
|
||||
upper_limit: float | None = None,
|
||||
) -> None:
|
||||
"""Save a scalar test result."""
|
||||
|
||||
@abstractmethod
|
||||
def save_measurements(
|
||||
self,
|
||||
run_id: UUID,
|
||||
measurements: list[Measurement],
|
||||
) -> None:
|
||||
"""Save time-series measurements (implemented in Parquet extension)."""
|
||||
|
||||
@abstractmethod
|
||||
def get_run(self, run_id: UUID) -> TestRun:
|
||||
"""Retrieve test run metadata by ID."""
|
||||
|
||||
@abstractmethod
|
||||
def get_results(self, run_id: UUID) -> list[TestResult]:
|
||||
"""Retrieve all test results for a run."""
|
||||
|
||||
@abstractmethod
|
||||
def get_measurements_dataframe(self, run_id: UUID) -> Any:
|
||||
"""Retrieve measurements as pandas DataFrame (implemented in Parquet extension)."""
|
||||
|
||||
|
||||
class SQLiteRepository(ITestRepository):
|
||||
"""SQLite-based repository for test data.
|
||||
|
||||
Stores test run metadata and scalar results in SQLite.
|
||||
Time-series measurements are delegated to Parquet files.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: str | Path):
|
||||
"""Initialise repository with database path.
|
||||
|
||||
Args:
|
||||
db_path: Path to SQLite database file
|
||||
"""
|
||||
self.db_path = Path(db_path)
|
||||
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self._init_database()
|
||||
|
||||
def _init_database(self) -> None:
|
||||
"""Create database tables if they don't exist."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS test_runs (
|
||||
id TEXT PRIMARY KEY,
|
||||
test_name TEXT NOT NULL,
|
||||
description TEXT,
|
||||
started_at TEXT NOT NULL,
|
||||
completed_at TEXT,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
config_json TEXT NOT NULL,
|
||||
operator TEXT,
|
||||
notes TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS test_results (
|
||||
id TEXT PRIMARY KEY,
|
||||
test_run_id TEXT NOT NULL,
|
||||
parameter TEXT NOT NULL,
|
||||
value REAL NOT NULL,
|
||||
unit TEXT,
|
||||
lower_limit REAL,
|
||||
upper_limit REAL,
|
||||
passed INTEGER NOT NULL,
|
||||
measured_at TEXT NOT NULL,
|
||||
FOREIGN KEY (test_run_id) REFERENCES test_runs(id)
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_test_runs_status ON test_runs(status)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_test_runs_name ON test_runs(test_name)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_test_results_run ON test_results(test_run_id)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_test_results_param ON test_results(parameter)"
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def create_run(
|
||||
self,
|
||||
test_name: str,
|
||||
config: dict[str, Any],
|
||||
operator: str | None = None,
|
||||
description: str | None = None,
|
||||
) -> UUID:
|
||||
"""Create a new test run and return its ID."""
|
||||
run_id = uuid4()
|
||||
started_at = datetime.now()
|
||||
config_json = json.dumps(config)
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO test_runs (
|
||||
id, test_name, description, started_at, status,
|
||||
config_json, operator, created_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
str(run_id),
|
||||
test_name,
|
||||
description,
|
||||
started_at.isoformat(),
|
||||
TestStatus.PENDING.value,
|
||||
config_json,
|
||||
operator,
|
||||
datetime.now().isoformat(),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
return run_id
|
||||
|
||||
def update_run_status(self, run_id: UUID, status: TestStatus) -> None:
|
||||
"""Update the status of a test run."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"UPDATE test_runs SET status = ? WHERE id = ?",
|
||||
(status.value, str(run_id)),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def complete_run(self, run_id: UUID, status: TestStatus) -> None:
|
||||
"""Mark a test run as complete with final status."""
|
||||
completed_at = datetime.now()
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
UPDATE test_runs
|
||||
SET status = ?, completed_at = ?
|
||||
WHERE id = ?
|
||||
""",
|
||||
(status.value, completed_at.isoformat(), str(run_id)),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def save_result(
|
||||
self,
|
||||
run_id: UUID,
|
||||
parameter: str,
|
||||
value: float,
|
||||
unit: str,
|
||||
lower_limit: float | None = None,
|
||||
upper_limit: float | None = None,
|
||||
) -> None:
|
||||
"""Save a scalar test result."""
|
||||
result_id = uuid4()
|
||||
measured_at = datetime.now()
|
||||
|
||||
# Calculate pass/fail
|
||||
passed = 1 # Default to pass if no limits
|
||||
if lower_limit is not None or upper_limit is not None:
|
||||
lower_ok = lower_limit is None or value >= lower_limit
|
||||
upper_ok = upper_limit is None or value <= upper_limit
|
||||
passed = 1 if (lower_ok and upper_ok) else 0
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO test_results (
|
||||
id, test_run_id, parameter, value, unit,
|
||||
lower_limit, upper_limit, passed, measured_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
str(result_id),
|
||||
str(run_id),
|
||||
parameter,
|
||||
value,
|
||||
unit,
|
||||
lower_limit,
|
||||
upper_limit,
|
||||
passed,
|
||||
measured_at.isoformat(),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def save_measurements(
|
||||
self,
|
||||
run_id: UUID,
|
||||
measurements: list[Measurement],
|
||||
) -> None:
|
||||
"""Save time-series measurements.
|
||||
|
||||
This is a stub - actual implementation will be in Parquet extension (Task 13.3).
|
||||
"""
|
||||
# Will be implemented in Task 13.3 with Parquet support
|
||||
|
||||
def get_run(self, run_id: UUID) -> TestRun:
|
||||
"""Retrieve test run metadata by ID."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(
|
||||
"SELECT * FROM test_runs WHERE id = ?",
|
||||
(str(run_id),),
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is None:
|
||||
msg = f"Test run {run_id} not found"
|
||||
raise ValueError(msg)
|
||||
|
||||
return TestRun(
|
||||
id=row["id"],
|
||||
test_name=row["test_name"],
|
||||
description=row["description"],
|
||||
started_at=datetime.fromisoformat(row["started_at"]),
|
||||
completed_at=(
|
||||
datetime.fromisoformat(row["completed_at"])
|
||||
if row["completed_at"]
|
||||
else None
|
||||
),
|
||||
status=TestStatus(row["status"]),
|
||||
config_json=row["config_json"],
|
||||
operator=row["operator"],
|
||||
notes=row["notes"],
|
||||
created_at=datetime.fromisoformat(row["created_at"]),
|
||||
)
|
||||
|
||||
def get_results(self, run_id: UUID) -> list[TestResult]:
|
||||
"""Retrieve all test results for a run."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(
|
||||
"SELECT * FROM test_results WHERE test_run_id = ?",
|
||||
(str(run_id),),
|
||||
)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
return [
|
||||
TestResult(
|
||||
id=row["id"],
|
||||
test_run_id=row["test_run_id"],
|
||||
parameter=row["parameter"],
|
||||
value=row["value"],
|
||||
unit=row["unit"],
|
||||
lower_limit=row["lower_limit"],
|
||||
upper_limit=row["upper_limit"],
|
||||
measured_at=datetime.fromisoformat(row["measured_at"]),
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
def get_measurements_dataframe(self, run_id: UUID) -> Any:
|
||||
"""Retrieve measurements as pandas DataFrame.
|
||||
|
||||
This is a stub - actual implementation will be in Parquet extension (Task 13.3).
|
||||
"""
|
||||
# Will be implemented in Task 13.3 with Parquet support
|
||||
return None
|
||||
Reference in New Issue
Block a user