"""Tests for benchmark SQLite storage.""" import sqlite3 import threading from datetime import UTC, datetime from pathlib import Path import pytest from veritext.benchmark.models import BenchmarkRun from veritext.benchmark.storage import BenchmarkStorage from veritext.core.exceptions import StorageError @pytest.fixture def db_path(tmp_path: Path) -> Path: return tmp_path / "benchmarks" / "test.db" @pytest.fixture def storage(db_path: Path) -> BenchmarkStorage: return BenchmarkStorage(db_path) @pytest.fixture def sample_run() -> BenchmarkRun: return BenchmarkRun( id="run-001", benchmark_name="test-suite", timestamp=datetime(2025, 1, 15, 12, 0, 0, tzinfo=UTC), veritext_version="0.1.0-dev", metrics={"bleu4": 0.75, "rouge_l": 0.82}, sample_count=100, metadata={"git_sha": "abc123"}, ) class TestDatabaseCreation: def test_creates_database_file(self, db_path: Path) -> None: assert not db_path.exists() BenchmarkStorage(db_path) assert db_path.exists() def test_creates_parent_directories(self, tmp_path: Path) -> None: nested_path = tmp_path / "deep" / "nested" / "path" / "test.db" BenchmarkStorage(nested_path) assert nested_path.exists() def test_creates_tables(self, db_path: Path) -> None: BenchmarkStorage(db_path) conn = sqlite3.connect(str(db_path)) cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = {row[0] for row in cursor.fetchall()} conn.close() assert "benchmark_runs" in tables assert "benchmark_metrics" in tables def test_creates_index(self, db_path: Path) -> None: BenchmarkStorage(db_path) conn = sqlite3.connect(str(db_path)) cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='index'") indices = {row[0] for row in cursor.fetchall()} conn.close() assert "idx_benchmark_name" in indices class TestSaveRun: def test_save_run( self, storage: BenchmarkStorage, sample_run: BenchmarkRun ) -> None: storage.save_run(sample_run) runs = storage.get_runs("test-suite") assert len(runs) == 1 assert runs[0].id == "run-001" def test_save_preserves_all_fields( self, storage: BenchmarkStorage, sample_run: BenchmarkRun ) -> None: storage.save_run(sample_run) runs = storage.get_runs("test-suite") run = runs[0] assert run.id == sample_run.id assert run.benchmark_name == sample_run.benchmark_name assert run.timestamp == sample_run.timestamp assert run.veritext_version == sample_run.veritext_version assert run.metrics == sample_run.metrics assert run.sample_count == sample_run.sample_count assert run.metadata == sample_run.metadata def test_save_duplicate_id_raises( self, storage: BenchmarkStorage, sample_run: BenchmarkRun ) -> None: storage.save_run(sample_run) with pytest.raises(StorageError, match="already exists"): storage.save_run(sample_run) def test_save_run_empty_metadata(self, storage: BenchmarkStorage) -> None: run = BenchmarkRun( id="run-no-meta", benchmark_name="test-suite", timestamp=datetime(2025, 1, 15, 12, 0, 0, tzinfo=UTC), veritext_version="0.1.0-dev", metrics={"bleu4": 0.5}, sample_count=10, ) storage.save_run(run) retrieved = storage.get_latest_run("test-suite") assert retrieved is not None assert retrieved.metadata == {} class TestGetRuns: def test_get_runs_empty_database(self, storage: BenchmarkStorage) -> None: runs = storage.get_runs("nonexistent") assert runs == [] def test_get_runs_filters_by_name(self, storage: BenchmarkStorage) -> None: run1 = BenchmarkRun( id="run-1", benchmark_name="suite-a", timestamp=datetime(2025, 1, 15, 12, 0, 0, tzinfo=UTC), veritext_version="0.1.0", metrics={"bleu4": 0.5}, sample_count=10, ) run2 = BenchmarkRun( id="run-2", benchmark_name="suite-b", timestamp=datetime(2025, 1, 15, 12, 0, 0, tzinfo=UTC), veritext_version="0.1.0", metrics={"bleu4": 0.6}, sample_count=10, ) storage.save_run(run1) storage.save_run(run2) runs_a = storage.get_runs("suite-a") runs_b = storage.get_runs("suite-b") assert len(runs_a) == 1 assert runs_a[0].id == "run-1" assert len(runs_b) == 1 assert runs_b[0].id == "run-2" def test_get_runs_ordered_by_timestamp(self, storage: BenchmarkStorage) -> None: run_old = BenchmarkRun( id="run-old", benchmark_name="test", timestamp=datetime(2025, 1, 10, 12, 0, 0, tzinfo=UTC), veritext_version="0.1.0", metrics={"bleu4": 0.5}, sample_count=10, ) run_new = BenchmarkRun( id="run-new", benchmark_name="test", timestamp=datetime(2025, 1, 20, 12, 0, 0, tzinfo=UTC), veritext_version="0.1.0", metrics={"bleu4": 0.6}, sample_count=10, ) # Save in reverse order storage.save_run(run_new) storage.save_run(run_old) runs = storage.get_runs("test") assert runs[0].id == "run-new" assert runs[1].id == "run-old" def test_get_runs_with_limit(self, storage: BenchmarkStorage) -> None: for i in range(5): run = BenchmarkRun( id=f"run-{i}", benchmark_name="test", timestamp=datetime(2025, 1, i + 1, 12, 0, 0, tzinfo=UTC), veritext_version="0.1.0", metrics={"bleu4": 0.5 + i * 0.1}, sample_count=10, ) storage.save_run(run) runs = storage.get_runs("test", limit=3) assert len(runs) == 3 class TestGetLatestRun: def test_get_latest_run_empty(self, storage: BenchmarkStorage) -> None: result = storage.get_latest_run("nonexistent") assert result is None def test_get_latest_run(self, storage: BenchmarkStorage) -> None: run_old = BenchmarkRun( id="run-old", benchmark_name="test", timestamp=datetime(2025, 1, 10, 12, 0, 0, tzinfo=UTC), veritext_version="0.1.0", metrics={"bleu4": 0.5}, sample_count=10, ) run_new = BenchmarkRun( id="run-new", benchmark_name="test", timestamp=datetime(2025, 1, 20, 12, 0, 0, tzinfo=UTC), veritext_version="0.1.0", metrics={"bleu4": 0.6}, sample_count=10, ) storage.save_run(run_old) storage.save_run(run_new) latest = storage.get_latest_run("test") assert latest is not None assert latest.id == "run-new" class TestConcurrentAccess: def test_concurrent_writes(self, db_path: Path) -> None: errors: list[Exception] = [] def write_run(run_id: int) -> None: try: storage = BenchmarkStorage(db_path) run = BenchmarkRun( id=f"run-{run_id}", benchmark_name="test", timestamp=datetime(2025, 1, 15, 12, 0, run_id, tzinfo=UTC), veritext_version="0.1.0", metrics={"bleu4": 0.5}, sample_count=10, ) storage.save_run(run) except Exception as e: errors.append(e) threads = [threading.Thread(target=write_run, args=(i,)) for i in range(10)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent writes failed: {errors}" storage = BenchmarkStorage(db_path) runs = storage.get_runs("test") assert len(runs) == 10 def test_wal_mode_enabled(self, db_path: Path) -> None: BenchmarkStorage(db_path) conn = sqlite3.connect(str(db_path)) cursor = conn.execute("PRAGMA journal_mode") mode = cursor.fetchone()[0] conn.close() assert mode.lower() == "wal"