223 lines
6.9 KiB
Python
223 lines
6.9 KiB
Python
"""Tests for database models."""
|
|
|
|
from uuid import uuid4
|
|
|
|
from arbiter.db.models import (
|
|
Base,
|
|
ConflictModel,
|
|
DeliberationStepModel,
|
|
FindingModel,
|
|
PolicyModel,
|
|
ReviewModel,
|
|
)
|
|
from arbiter.deliberation.conflicts import ConflictNature
|
|
from arbiter.deliberation.coordinator import StepType
|
|
from arbiter.models.enums import AgentName, Severity, Verdict
|
|
|
|
|
|
class TestReviewModel:
|
|
def test_review_model_creation(self) -> None:
|
|
review = ReviewModel(
|
|
id=str(uuid4()),
|
|
repository="owner/repo",
|
|
pr_number=42,
|
|
pr_title="Test PR",
|
|
base_sha="abc1234567890123456789012345678901234567",
|
|
head_sha="def1234567890123456789012345678901234567",
|
|
author="testuser",
|
|
is_draft=False,
|
|
status="pending",
|
|
)
|
|
|
|
assert review.repository == "owner/repo"
|
|
assert review.pr_number == 42
|
|
assert review.status == "pending"
|
|
assert review.is_draft is False
|
|
|
|
def test_review_model_with_verdict(self) -> None:
|
|
review = ReviewModel(
|
|
id=str(uuid4()),
|
|
repository="owner/repo",
|
|
pr_number=1,
|
|
base_sha="a" * 40,
|
|
head_sha="b" * 40,
|
|
status="completed",
|
|
verdict=Verdict.COMMENT,
|
|
verdict_confidence=0.75,
|
|
verdict_reasoning="Found some issues",
|
|
)
|
|
|
|
assert review.verdict == Verdict.COMMENT
|
|
assert review.verdict_confidence == 0.75
|
|
|
|
def test_review_model_cost_tracking(self) -> None:
|
|
review = ReviewModel(
|
|
id=str(uuid4()),
|
|
repository="owner/repo",
|
|
pr_number=1,
|
|
base_sha="a" * 40,
|
|
head_sha="b" * 40,
|
|
total_tokens=1500,
|
|
total_cost_usd=0.015,
|
|
tokens_by_agent={"security": 500, "style": 500, "complexity": 500},
|
|
cost_by_agent={"security": 0.005, "style": 0.005, "complexity": 0.005},
|
|
)
|
|
|
|
assert review.total_tokens == 1500
|
|
assert review.total_cost_usd == 0.015
|
|
assert review.tokens_by_agent["security"] == 500
|
|
|
|
|
|
class TestFindingModel:
|
|
def test_finding_model_creation(self) -> None:
|
|
finding = FindingModel(
|
|
id=str(uuid4()),
|
|
review_id=str(uuid4()),
|
|
agent=AgentName.SECURITY,
|
|
file="src/auth.py",
|
|
line_start=10,
|
|
line_end=15,
|
|
severity=Severity.HIGH,
|
|
confidence=0.9,
|
|
title="SQL Injection",
|
|
description="User input concatenated in SQL",
|
|
reasoning="String concatenation allows injection",
|
|
suggestion="Use parameterized queries",
|
|
references=["https://owasp.org"],
|
|
prompt_version="security-v1.0",
|
|
)
|
|
|
|
assert finding.agent == AgentName.SECURITY
|
|
assert finding.severity == Severity.HIGH
|
|
assert finding.confidence == 0.9
|
|
assert finding.line_start == 10
|
|
assert finding.line_end == 15
|
|
|
|
|
|
class TestConflictModel:
|
|
def test_conflict_model_creation(self) -> None:
|
|
conflict = ConflictModel(
|
|
id=str(uuid4()),
|
|
review_id=str(uuid4()),
|
|
finding_ids=["finding-1", "finding-2"],
|
|
nature=ConflictNature.TRADE_OFF,
|
|
description="Security vs simplicity trade-off",
|
|
severity_weight=0.7,
|
|
)
|
|
|
|
assert conflict.nature == ConflictNature.TRADE_OFF
|
|
assert len(conflict.finding_ids) == 2
|
|
assert conflict.severity_weight == 0.7
|
|
|
|
def test_conflict_model_with_resolution(self) -> None:
|
|
conflict = ConflictModel(
|
|
id=str(uuid4()),
|
|
review_id=str(uuid4()),
|
|
finding_ids=["finding-1", "finding-2"],
|
|
nature=ConflictNature.CONTRADICTORY,
|
|
description="Opposing recommendations",
|
|
severity_weight=0.8,
|
|
resolution="Security takes precedence",
|
|
winning_finding_id="finding-1",
|
|
)
|
|
|
|
assert conflict.resolution is not None
|
|
assert conflict.winning_finding_id == "finding-1"
|
|
|
|
|
|
class TestDeliberationStepModel:
|
|
def test_deliberation_step_creation(self) -> None:
|
|
step = DeliberationStepModel(
|
|
id=str(uuid4()),
|
|
review_id=str(uuid4()),
|
|
step_type=StepType.MERGE,
|
|
description="Merged 5 findings",
|
|
details={"groups": 3, "unique": 5},
|
|
sequence=0,
|
|
)
|
|
|
|
assert step.step_type == StepType.MERGE
|
|
assert step.sequence == 0
|
|
assert step.details["groups"] == 3
|
|
|
|
def test_all_step_types(self) -> None:
|
|
review_id = str(uuid4())
|
|
|
|
steps = [
|
|
DeliberationStepModel(
|
|
id=str(uuid4()),
|
|
review_id=review_id,
|
|
step_type=StepType.MERGE,
|
|
description="Merge step",
|
|
sequence=0,
|
|
),
|
|
DeliberationStepModel(
|
|
id=str(uuid4()),
|
|
review_id=review_id,
|
|
step_type=StepType.CONFLICT_DETECTION,
|
|
description="Conflict detection step",
|
|
sequence=1,
|
|
),
|
|
DeliberationStepModel(
|
|
id=str(uuid4()),
|
|
review_id=review_id,
|
|
step_type=StepType.SYNTHESIS,
|
|
description="Synthesis step",
|
|
sequence=2,
|
|
),
|
|
DeliberationStepModel(
|
|
id=str(uuid4()),
|
|
review_id=review_id,
|
|
step_type=StepType.VERDICT,
|
|
description="Verdict step",
|
|
sequence=3,
|
|
),
|
|
]
|
|
|
|
assert len(steps) == 4
|
|
assert steps[0].step_type == StepType.MERGE
|
|
assert steps[3].step_type == StepType.VERDICT
|
|
|
|
|
|
class TestPolicyModel:
|
|
def test_policy_model_creation(self) -> None:
|
|
policy = PolicyModel(
|
|
id=str(uuid4()),
|
|
name="default",
|
|
organization="test-org",
|
|
description="Default policy",
|
|
is_active=True,
|
|
)
|
|
|
|
assert policy.name == "default"
|
|
assert policy.organization == "test-org"
|
|
assert policy.is_active is True
|
|
|
|
def test_policy_model_with_config(self) -> None:
|
|
policy = PolicyModel(
|
|
id=str(uuid4()),
|
|
name="strict",
|
|
agents_config={
|
|
"security": {"enabled": True, "model": "gpt-4o"},
|
|
"style": {"enabled": True},
|
|
"complexity": {"enabled": False},
|
|
},
|
|
cost_controls={
|
|
"max_tokens": 50000,
|
|
"max_cost_usd": 0.50,
|
|
},
|
|
verdict_thresholds={
|
|
"critical_threshold": 1,
|
|
"high_threshold": 3,
|
|
},
|
|
)
|
|
|
|
assert policy.agents_config["security"]["model"] == "gpt-4o"
|
|
assert policy.cost_controls["max_tokens"] == 50000
|
|
|
|
|
|
class TestBase:
|
|
def test_base_is_declarative_base(self) -> None:
|
|
assert hasattr(Base, "metadata")
|
|
assert hasattr(Base, "registry")
|