From e6f55c2781a4eaa43ab0f5bf7051277c9a8a40c7 Mon Sep 17 00:00:00 2001 From: Kai Chappell Date: Tue, 15 Apr 2025 19:06:55 +0000 Subject: [PATCH] sqlite storage for benchmarks Persistent storage for benchmark history with WAL mode for concurrent access. --- src/veritext/benchmark/storage.py | 158 ++++++++++++++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 src/veritext/benchmark/storage.py diff --git a/src/veritext/benchmark/storage.py b/src/veritext/benchmark/storage.py new file mode 100644 index 0000000..669e10c --- /dev/null +++ b/src/veritext/benchmark/storage.py @@ -0,0 +1,158 @@ +"""SQLite storage for benchmark history.""" + +import json +import sqlite3 +from datetime import datetime +from pathlib import Path + +from veritext.benchmark.models import BenchmarkRun +from veritext.core.exceptions import StorageError + + +class BenchmarkStorage: + """SQLite-backed storage for benchmark runs.""" + + def __init__(self, db_path: Path) -> None: + """ + Initialise storage, creating tables if needed. + + Args: + db_path: Path to the SQLite database file. + """ + self._db_path = db_path + self._db_path.parent.mkdir(parents=True, exist_ok=True) + try: + with self._get_connection() as conn: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS benchmark_runs ( + id TEXT PRIMARY KEY, + benchmark_name TEXT NOT NULL, + timestamp TEXT NOT NULL, + veritext_version TEXT NOT NULL, + sample_count INTEGER NOT NULL, + metadata TEXT + ); + + CREATE TABLE IF NOT EXISTS benchmark_metrics ( + run_id TEXT REFERENCES benchmark_runs(id) ON DELETE CASCADE, + metric_name TEXT NOT NULL, + value REAL NOT NULL, + PRIMARY KEY (run_id, metric_name) + ); + + CREATE INDEX IF NOT EXISTS idx_benchmark_name + ON benchmark_runs(benchmark_name, timestamp DESC); + """) + except sqlite3.Error as e: + raise StorageError(f"Failed to initialise database: {e}") from e + + def _get_connection(self) -> sqlite3.Connection: + conn = sqlite3.connect(str(self._db_path), timeout=30.0) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + conn.row_factory = sqlite3.Row + return conn + + def save_run(self, run: BenchmarkRun) -> None: + """ + Persist a benchmark run. + + Args: + run: The benchmark run to save. + + Raises: + StorageError: If the save operation fails. + """ + try: + with self._get_connection() as conn: + conn.execute( + """ + INSERT INTO benchmark_runs + (id, benchmark_name, timestamp, veritext_version, sample_count, metadata) + VALUES (?, ?, ?, ?, ?, ?) + """, + ( + run.id, + run.benchmark_name, + run.timestamp.isoformat(), + run.veritext_version, + run.sample_count, + json.dumps(run.metadata) if run.metadata else None, + ), + ) + + for metric_name, value in run.metrics.items(): + conn.execute( + """ + INSERT INTO benchmark_metrics (run_id, metric_name, value) + VALUES (?, ?, ?) + """, + (run.id, metric_name, value), + ) + except sqlite3.IntegrityError as e: + raise StorageError(f"Run with id '{run.id}' already exists") from e + except sqlite3.Error as e: + raise StorageError(f"Failed to save benchmark run: {e}") from e + + def get_runs( + self, + benchmark_name: str, + limit: int | None = None, + ) -> list[BenchmarkRun]: + """ + Retrieve runs for a benchmark, most recent first. + + Args: + benchmark_name: Name of the benchmark to retrieve runs for. + limit: Maximum number of runs to return. + + Returns: + List of BenchmarkRun objects, most recent first. + + Raises: + StorageError: If the retrieval fails. + """ + try: + with self._get_connection() as conn: + query = """ + SELECT id, benchmark_name, timestamp, veritext_version, + sample_count, metadata + FROM benchmark_runs + WHERE benchmark_name = ? + ORDER BY timestamp DESC + """ + if limit is not None: + query += " LIMIT ?" + rows = conn.execute(query, (benchmark_name, limit)).fetchall() + else: + rows = conn.execute(query, (benchmark_name,)).fetchall() + + runs = [] + for row in rows: + metrics_rows = conn.execute( + "SELECT metric_name, value FROM benchmark_metrics WHERE run_id = ?", + (row["id"],), + ).fetchall() + metrics = {m["metric_name"]: m["value"] for m in metrics_rows} + + metadata = json.loads(row["metadata"]) if row["metadata"] else {} + + runs.append( + BenchmarkRun( + id=row["id"], + benchmark_name=row["benchmark_name"], + timestamp=datetime.fromisoformat(row["timestamp"]), + veritext_version=row["veritext_version"], + sample_count=row["sample_count"], + metrics=metrics, + metadata=metadata, + ) + ) + + return runs + except sqlite3.Error as e: + raise StorageError(f"Failed to retrieve benchmark runs: {e}") from e + + def get_latest_run(self, benchmark_name: str) -> BenchmarkRun | None: + runs = self.get_runs(benchmark_name, limit=1) + return runs[0] if runs else None