sqlite storage for benchmarks
Persistent storage for benchmark history with WAL mode for concurrent access.
This commit is contained in:
158
src/veritext/benchmark/storage.py
Normal file
158
src/veritext/benchmark/storage.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""SQLite storage for benchmark history."""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from veritext.benchmark.models import BenchmarkRun
|
||||
from veritext.core.exceptions import StorageError
|
||||
|
||||
|
||||
class BenchmarkStorage:
|
||||
"""SQLite-backed storage for benchmark runs."""
|
||||
|
||||
def __init__(self, db_path: Path) -> None:
|
||||
"""
|
||||
Initialise storage, creating tables if needed.
|
||||
|
||||
Args:
|
||||
db_path: Path to the SQLite database file.
|
||||
"""
|
||||
self._db_path = db_path
|
||||
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
with self._get_connection() as conn:
|
||||
conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS benchmark_runs (
|
||||
id TEXT PRIMARY KEY,
|
||||
benchmark_name TEXT NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
veritext_version TEXT NOT NULL,
|
||||
sample_count INTEGER NOT NULL,
|
||||
metadata TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS benchmark_metrics (
|
||||
run_id TEXT REFERENCES benchmark_runs(id) ON DELETE CASCADE,
|
||||
metric_name TEXT NOT NULL,
|
||||
value REAL NOT NULL,
|
||||
PRIMARY KEY (run_id, metric_name)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_benchmark_name
|
||||
ON benchmark_runs(benchmark_name, timestamp DESC);
|
||||
""")
|
||||
except sqlite3.Error as e:
|
||||
raise StorageError(f"Failed to initialise database: {e}") from e
|
||||
|
||||
def _get_connection(self) -> sqlite3.Connection:
|
||||
conn = sqlite3.connect(str(self._db_path), timeout=30.0)
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
def save_run(self, run: BenchmarkRun) -> None:
|
||||
"""
|
||||
Persist a benchmark run.
|
||||
|
||||
Args:
|
||||
run: The benchmark run to save.
|
||||
|
||||
Raises:
|
||||
StorageError: If the save operation fails.
|
||||
"""
|
||||
try:
|
||||
with self._get_connection() as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO benchmark_runs
|
||||
(id, benchmark_name, timestamp, veritext_version, sample_count, metadata)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
run.id,
|
||||
run.benchmark_name,
|
||||
run.timestamp.isoformat(),
|
||||
run.veritext_version,
|
||||
run.sample_count,
|
||||
json.dumps(run.metadata) if run.metadata else None,
|
||||
),
|
||||
)
|
||||
|
||||
for metric_name, value in run.metrics.items():
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO benchmark_metrics (run_id, metric_name, value)
|
||||
VALUES (?, ?, ?)
|
||||
""",
|
||||
(run.id, metric_name, value),
|
||||
)
|
||||
except sqlite3.IntegrityError as e:
|
||||
raise StorageError(f"Run with id '{run.id}' already exists") from e
|
||||
except sqlite3.Error as e:
|
||||
raise StorageError(f"Failed to save benchmark run: {e}") from e
|
||||
|
||||
def get_runs(
|
||||
self,
|
||||
benchmark_name: str,
|
||||
limit: int | None = None,
|
||||
) -> list[BenchmarkRun]:
|
||||
"""
|
||||
Retrieve runs for a benchmark, most recent first.
|
||||
|
||||
Args:
|
||||
benchmark_name: Name of the benchmark to retrieve runs for.
|
||||
limit: Maximum number of runs to return.
|
||||
|
||||
Returns:
|
||||
List of BenchmarkRun objects, most recent first.
|
||||
|
||||
Raises:
|
||||
StorageError: If the retrieval fails.
|
||||
"""
|
||||
try:
|
||||
with self._get_connection() as conn:
|
||||
query = """
|
||||
SELECT id, benchmark_name, timestamp, veritext_version,
|
||||
sample_count, metadata
|
||||
FROM benchmark_runs
|
||||
WHERE benchmark_name = ?
|
||||
ORDER BY timestamp DESC
|
||||
"""
|
||||
if limit is not None:
|
||||
query += " LIMIT ?"
|
||||
rows = conn.execute(query, (benchmark_name, limit)).fetchall()
|
||||
else:
|
||||
rows = conn.execute(query, (benchmark_name,)).fetchall()
|
||||
|
||||
runs = []
|
||||
for row in rows:
|
||||
metrics_rows = conn.execute(
|
||||
"SELECT metric_name, value FROM benchmark_metrics WHERE run_id = ?",
|
||||
(row["id"],),
|
||||
).fetchall()
|
||||
metrics = {m["metric_name"]: m["value"] for m in metrics_rows}
|
||||
|
||||
metadata = json.loads(row["metadata"]) if row["metadata"] else {}
|
||||
|
||||
runs.append(
|
||||
BenchmarkRun(
|
||||
id=row["id"],
|
||||
benchmark_name=row["benchmark_name"],
|
||||
timestamp=datetime.fromisoformat(row["timestamp"]),
|
||||
veritext_version=row["veritext_version"],
|
||||
sample_count=row["sample_count"],
|
||||
metrics=metrics,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
|
||||
return runs
|
||||
except sqlite3.Error as e:
|
||||
raise StorageError(f"Failed to retrieve benchmark runs: {e}") from e
|
||||
|
||||
def get_latest_run(self, benchmark_name: str) -> BenchmarkRun | None:
|
||||
runs = self.get_runs(benchmark_name, limit=1)
|
||||
return runs[0] if runs else None
|
||||
Reference in New Issue
Block a user