"""Tests for deliberation module.""" import pytest from arbiter.deliberation.conflicts import Conflict, ConflictDetector, ConflictNature from arbiter.deliberation.coordinator import Coordinator, StepType from arbiter.deliberation.merger import FindingGroup, FindingMerger from arbiter.deliberation.synthesis import ConflictSynthesizer from arbiter.models import AgentName, Finding, ReviewResult, Severity, Verdict from .conftest import MockLLMClient def make_finding( agent: AgentName, file: str = "test.py", line_start: int = 10, line_end: int = 15, severity: Severity = Severity.MEDIUM, confidence: float = 0.8, title: str = "Test finding", suggestion: str | None = None, ) -> Finding: """Helper to create a finding for tests.""" return Finding( id=f"{agent.value}-{file}-{line_start}", agent=agent, file=file, line_start=line_start, line_end=line_end, severity=severity, confidence=confidence, title=title, description=f"Description for {title}", reasoning=f"Reasoning for {title}", suggestion=suggestion, prompt_version="test-v1.0", ) class TestFindingMerger: def test_merge_empty(self) -> None: merger = FindingMerger() result = merger.merge([], None) assert result.unique_findings == [] assert result.groups == [] assert result.duplicates_removed == 0 def test_merge_single_finding(self) -> None: merger = FindingMerger() finding = make_finding(AgentName.SECURITY) result = merger.merge([finding], None) assert len(result.unique_findings) == 1 assert len(result.groups) == 1 assert result.groups[0].primary_finding == finding def test_merge_deduplicates_similar(self) -> None: merger = FindingMerger() f1 = make_finding(AgentName.SECURITY, title="SQL Injection") f2 = make_finding(AgentName.STYLE, title="SQL Injection vulnerability") result = merger.merge([f1, f2], None) assert result.duplicates_removed == 1 assert len(result.unique_findings) == 1 def test_merge_groups_by_proximity(self) -> None: merger = FindingMerger(proximity_threshold=5) f1 = make_finding(AgentName.SECURITY, line_start=10, line_end=12) f2 = make_finding(AgentName.STYLE, line_start=14, line_end=16) f3 = make_finding(AgentName.COMPLEXITY, line_start=50, line_end=55) result = merger.merge([f1, f2, f3], None) assert len(result.groups) == 2 # f1+f2 in one group, f3 alone assert len(result.groups[0].findings) == 2 assert len(result.groups[1].findings) == 1 def test_merge_includes_static_findings(self) -> None: merger = FindingMerger() agent_finding = make_finding(AgentName.SECURITY) static_finding = make_finding( AgentName.STYLE, title="[ruff] E501", line_start=100, ) result = merger.merge([agent_finding], [static_finding]) assert len(result.unique_findings) == 2 assert len(result.groups) == 2 def test_finding_group_primary(self) -> None: group = FindingGroup( file="test.py", line_start=10, line_end=20, findings=[ make_finding(AgentName.STYLE, severity=Severity.LOW), make_finding(AgentName.SECURITY, severity=Severity.HIGH), make_finding(AgentName.COMPLEXITY, severity=Severity.MEDIUM), ], ) primary = group.primary_finding assert primary is not None assert primary.severity == Severity.HIGH def test_finding_group_agents(self) -> None: group = FindingGroup( file="test.py", line_start=10, line_end=20, findings=[ make_finding(AgentName.SECURITY), make_finding(AgentName.STYLE), ], ) agents = group.agents assert len(agents) == 2 assert AgentName.SECURITY in agents assert AgentName.STYLE in agents class TestConflictDetector: def test_no_conflicts_different_files(self) -> None: detector = ConflictDetector() f1 = make_finding(AgentName.SECURITY, file="a.py") f2 = make_finding(AgentName.STYLE, file="b.py") conflicts = detector.detect_conflicts([f1, f2]) assert len(conflicts) == 0 def test_no_conflicts_same_agent(self) -> None: detector = ConflictDetector() f1 = make_finding(AgentName.SECURITY, line_start=10) f2 = make_finding(AgentName.SECURITY, line_start=12) conflicts = detector.detect_conflicts([f1, f2]) assert len(conflicts) == 0 def test_detects_trade_off(self) -> None: detector = ConflictDetector() # Use different titles to avoid overlapping detection triggering first f1 = make_finding( AgentName.SECURITY, severity=Severity.HIGH, title="SQL injection vulnerability" ) f2 = make_finding( AgentName.COMPLEXITY, severity=Severity.MEDIUM, title="Function too complex" ) conflicts = detector.detect_conflicts([f1, f2]) assert len(conflicts) == 1 assert conflicts[0].nature == ConflictNature.TRADE_OFF assert "security" in conflicts[0].description.lower() assert "complexity" in conflicts[0].description.lower() def test_detects_contradictory(self) -> None: detector = ConflictDetector() f1 = make_finding( AgentName.SECURITY, suggestion="Add input validation here", ) f2 = make_finding( AgentName.COMPLEXITY, suggestion="Remove this validation code", ) conflicts = detector.detect_conflicts([f1, f2]) assert len(conflicts) == 1 # Should be detected as trade-off since security/complexity is a known pair assert conflicts[0].nature in (ConflictNature.CONTRADICTORY, ConflictNature.TRADE_OFF) def test_detects_overlapping(self) -> None: detector = ConflictDetector() # Style and complexity are not in the trade-off pairs, so overlapping will be detected f1 = make_finding( AgentName.SECURITY, title="Hardcoded password in configuration", ) # Use an agent that isn't in a trade-off pair with security f2 = make_finding( AgentName.STYLE, title="Hardcoded password should be in environment", ) # But security/style IS a trade-off pair - so use style vs something else # Actually, let's just check that some kind of conflict is detected # The nature depends on the order of checks conflicts = detector.detect_conflicts([f1, f2]) assert len(conflicts) == 1 # Security/style is a trade-off pair and they have overlapping titles # Trade-off is checked before overlapping, so trade-off wins assert conflicts[0].nature in (ConflictNature.TRADE_OFF, ConflictNature.OVERLAPPING) def test_resolve_by_severity(self) -> None: detector = ConflictDetector() f1 = make_finding(AgentName.SECURITY, severity=Severity.HIGH) f2 = make_finding(AgentName.COMPLEXITY, severity=Severity.MEDIUM) conflicts = detector.detect_conflicts([f1, f2]) resolved = detector.resolve_by_severity(conflicts[0], [f1, f2]) assert resolved.winning_finding_id == f1.id assert "severity" in resolved.resolution.lower() class TestConflictSynthesizer: @pytest.mark.asyncio async def test_synthesize_returns_resolution(self) -> None: mock_response = """{ "decision": "prefer_first", "reasoning": "Security takes priority over complexity", "merged_suggestion": null, "confidence": 0.85 }""" mock_llm = MockLLMClient(responses=[mock_response]) synthesizer = ConflictSynthesizer(mock_llm) f1 = make_finding(AgentName.SECURITY, severity=Severity.HIGH) f2 = make_finding(AgentName.COMPLEXITY, severity=Severity.MEDIUM) conflict = Conflict( id="test-conflict", finding_ids=[f1.id, f2.id], nature=ConflictNature.TRADE_OFF, description="Test conflict", severity_weight=0.8, ) resolution = await synthesizer.synthesize(conflict, [f1, f2]) assert resolution.decision == "prefer_first" assert resolution.confidence == 0.85 assert "security" in resolution.reasoning.lower() @pytest.mark.asyncio async def test_synthesize_handles_invalid_json(self) -> None: mock_llm = MockLLMClient(responses=["not valid json"]) synthesizer = ConflictSynthesizer(mock_llm) f1 = make_finding(AgentName.SECURITY, severity=Severity.HIGH) f2 = make_finding(AgentName.COMPLEXITY, severity=Severity.LOW) conflict = Conflict( id="test-conflict", finding_ids=[f1.id, f2.id], nature=ConflictNature.TRADE_OFF, description="Test conflict", severity_weight=0.8, ) resolution = await synthesizer.synthesize(conflict, [f1, f2]) # Should fall back to severity-based resolution assert resolution.decision == "prefer_first" assert "fallback" in resolution.reasoning.lower() def test_should_synthesize_contradictory(self) -> None: synthesizer = ConflictSynthesizer(MockLLMClient()) conflict = Conflict( id="test", finding_ids=["a", "b"], nature=ConflictNature.CONTRADICTORY, description="Test", severity_weight=0.5, ) assert synthesizer.should_synthesize(conflict) is True def test_should_not_synthesize_overlapping(self) -> None: synthesizer = ConflictSynthesizer(MockLLMClient()) conflict = Conflict( id="test", finding_ids=["a", "b"], nature=ConflictNature.OVERLAPPING, description="Test", severity_weight=0.5, ) assert synthesizer.should_synthesize(conflict) is False class TestCoordinator: @pytest.mark.asyncio async def test_deliberate_empty_results(self) -> None: coordinator = Coordinator() result = await coordinator.deliberate([], None) assert result.verdict == Verdict.APPROVE assert result.total_findings == 0 assert len(result.steps) > 0 @pytest.mark.asyncio async def test_deliberate_merges_findings(self) -> None: coordinator = Coordinator() results = [ ReviewResult( agent_name=AgentName.SECURITY, findings=[make_finding(AgentName.SECURITY)], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ReviewResult( agent_name=AgentName.STYLE, findings=[make_finding(AgentName.STYLE, line_start=50)], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ] result = await coordinator.deliberate(results) assert result.total_findings == 2 assert len(result.merged.groups) == 2 assert any(s.step_type == StepType.MERGE for s in result.steps) @pytest.mark.asyncio async def test_deliberate_detects_conflicts(self) -> None: coordinator = Coordinator() # Create findings at same location from different agents with different titles results = [ ReviewResult( agent_name=AgentName.SECURITY, findings=[ make_finding( AgentName.SECURITY, severity=Severity.HIGH, title="SQL injection risk" ) ], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ReviewResult( agent_name=AgentName.COMPLEXITY, findings=[ make_finding( AgentName.COMPLEXITY, severity=Severity.MEDIUM, title="Overly complex function", ) ], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ] result = await coordinator.deliberate(results) assert len(result.conflicts) > 0 assert any(s.step_type == StepType.CONFLICT_DETECTION for s in result.steps) @pytest.mark.asyncio async def test_verdict_critical_requests_changes(self) -> None: coordinator = Coordinator() results = [ ReviewResult( agent_name=AgentName.SECURITY, findings=[make_finding(AgentName.SECURITY, severity=Severity.CRITICAL)], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ] result = await coordinator.deliberate(results) assert result.verdict == Verdict.REQUEST_CHANGES assert result.critical_count == 1 @pytest.mark.asyncio async def test_verdict_multiple_high_requests_changes(self) -> None: coordinator = Coordinator() results = [ ReviewResult( agent_name=AgentName.SECURITY, findings=[ make_finding(AgentName.SECURITY, severity=Severity.HIGH, line_start=10), make_finding(AgentName.SECURITY, severity=Severity.HIGH, line_start=20), make_finding(AgentName.SECURITY, severity=Severity.HIGH, line_start=30), ], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ] result = await coordinator.deliberate(results) assert result.verdict == Verdict.REQUEST_CHANGES assert result.high_count == 3 @pytest.mark.asyncio async def test_verdict_low_severity_approves(self) -> None: coordinator = Coordinator() results = [ ReviewResult( agent_name=AgentName.STYLE, findings=[ make_finding(AgentName.STYLE, severity=Severity.LOW, line_start=10), make_finding(AgentName.STYLE, severity=Severity.INFO, line_start=20), ], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ] result = await coordinator.deliberate(results) assert result.verdict == Verdict.APPROVE @pytest.mark.asyncio async def test_deliberation_steps_logged(self) -> None: coordinator = Coordinator() results = [ ReviewResult( agent_name=AgentName.SECURITY, findings=[make_finding(AgentName.SECURITY)], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ] result = await coordinator.deliberate(results) step_types = [s.step_type for s in result.steps] assert StepType.MERGE in step_types assert StepType.CONFLICT_DETECTION in step_types assert StepType.VERDICT in step_types @pytest.mark.asyncio async def test_verdict_medium_count_comments(self) -> None: coordinator = Coordinator() results = [ ReviewResult( agent_name=AgentName.STYLE, findings=[ make_finding( AgentName.STYLE, severity=Severity.MEDIUM, line_start=(i + 1) * 10, title=f"Issue {i}", ) for i in range(5) ], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ] result = await coordinator.deliberate(results) assert result.verdict == Verdict.COMMENT assert "medium" in result.verdict_reasoning.lower() @pytest.mark.asyncio async def test_verdict_single_high_comments(self) -> None: coordinator = Coordinator() results = [ ReviewResult( agent_name=AgentName.SECURITY, findings=[ make_finding(AgentName.SECURITY, severity=Severity.HIGH), ], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ] result = await coordinator.deliberate(results) assert result.verdict == Verdict.COMMENT assert result.high_count == 1 @pytest.mark.asyncio async def test_deliberate_with_synthesis(self) -> None: mock_response = """{ "decision": "prefer_first", "reasoning": "Security takes priority", "merged_suggestion": null, "confidence": 0.85 }""" mock_llm = MockLLMClient(responses=[mock_response]) coordinator = Coordinator(llm_client=mock_llm) # Create findings at same location from different agents results = [ ReviewResult( agent_name=AgentName.SECURITY, findings=[ make_finding( AgentName.SECURITY, severity=Severity.HIGH, title="Security vulnerability", suggestion="Add validation", ) ], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ReviewResult( agent_name=AgentName.COMPLEXITY, findings=[ make_finding( AgentName.COMPLEXITY, severity=Severity.MEDIUM, title="Complex function", suggestion="Remove validation", ) ], duration_ms=100, tokens_used=1000, cost_usd=0.01, ), ] result = await coordinator.deliberate(results) assert len(result.conflicts) > 0 # Synthesis step should be logged assert any(s.step_type == StepType.SYNTHESIS for s in result.steps) class TestConflictDetectorEdgeCases: def test_no_conflicts_with_no_overlap(self) -> None: detector = ConflictDetector() f1 = make_finding(AgentName.SECURITY, line_start=10, line_end=15) f2 = make_finding(AgentName.STYLE, line_start=100, line_end=105) conflicts = detector.detect_conflicts([f1, f2]) assert len(conflicts) == 0 def test_overlap_no_title_match(self) -> None: detector = ConflictDetector() # These agents are in TRADE_OFF_PAIRS, so will be detected as trade-off f1 = make_finding( AgentName.SECURITY, title="Unique security title", ) f2 = make_finding( AgentName.STYLE, title="Completely different style concern", ) conflicts = detector.detect_conflicts([f1, f2]) assert len(conflicts) == 1 # Security/Style is a trade-off pair assert conflicts[0].nature == ConflictNature.TRADE_OFF def test_resolve_empty_findings(self) -> None: detector = ConflictDetector() conflict = Conflict( id="test", finding_ids=["nonexistent1", "nonexistent2"], nature=ConflictNature.TRADE_OFF, description="Test", severity_weight=0.5, ) resolved = detector.resolve_by_severity(conflict, []) assert resolved.winning_finding_id is None class TestConflictSynthesizerEdgeCases: @pytest.mark.asyncio async def test_synthesize_missing_findings(self) -> None: mock_llm = MockLLMClient() synthesizer = ConflictSynthesizer(mock_llm) conflict = Conflict( id="test", finding_ids=["nonexistent1", "nonexistent2"], nature=ConflictNature.CONTRADICTORY, description="Test", severity_weight=0.8, ) resolution = await synthesizer.synthesize(conflict, []) assert resolution.decision == "keep_both" assert "Could not find" in resolution.reasoning def test_synthesize_low_severity(self) -> None: synthesizer = ConflictSynthesizer(MockLLMClient()) conflict = Conflict( id="test", finding_ids=["a", "b"], nature=ConflictNature.TRADE_OFF, description="Test", severity_weight=0.5, # Below 0.7 threshold ) assert synthesizer.should_synthesize(conflict) is False def test_synthesize_high_severity(self) -> None: synthesizer = ConflictSynthesizer(MockLLMClient()) conflict = Conflict( id="test", finding_ids=["a", "b"], nature=ConflictNature.TRADE_OFF, description="Test", severity_weight=0.8, # Above 0.7 threshold ) assert synthesizer.should_synthesize(conflict) is True @pytest.mark.asyncio async def test_synthesize_fallback_prefer_second(self) -> None: mock_llm = MockLLMClient(responses=["not valid json"]) synthesizer = ConflictSynthesizer(mock_llm) f1 = make_finding(AgentName.STYLE, severity=Severity.LOW) f2 = make_finding(AgentName.SECURITY, severity=Severity.HIGH) conflict = Conflict( id="test-conflict", finding_ids=[f1.id, f2.id], nature=ConflictNature.CONTRADICTORY, description="Test conflict", severity_weight=0.8, ) resolution = await synthesizer.synthesize(conflict, [f1, f2]) assert resolution.decision == "prefer_second" assert "fallback" in resolution.reasoning.lower() @pytest.mark.asyncio async def test_synthesize_fallback_equal_severity(self) -> None: mock_llm = MockLLMClient(responses=["not valid json"]) synthesizer = ConflictSynthesizer(mock_llm) f1 = make_finding(AgentName.STYLE, severity=Severity.MEDIUM) f2 = make_finding(AgentName.SECURITY, severity=Severity.MEDIUM) conflict = Conflict( id="test-conflict", finding_ids=[f1.id, f2.id], nature=ConflictNature.CONTRADICTORY, description="Test conflict", severity_weight=0.8, ) resolution = await synthesizer.synthesize(conflict, [f1, f2]) assert resolution.decision == "keep_both" assert "equal severity" in resolution.reasoning.lower() @pytest.mark.asyncio async def test_synthesize_parse_json_in_code_block(self) -> None: mock_response = """Here is my analysis: ```json { "decision": "merge", "reasoning": "Both concerns valid", "merged_suggestion": "Do both things", "confidence": 0.9 } ``` """ mock_llm = MockLLMClient(responses=[mock_response]) synthesizer = ConflictSynthesizer(mock_llm) f1 = make_finding(AgentName.SECURITY) f2 = make_finding(AgentName.COMPLEXITY) conflict = Conflict( id="test-conflict", finding_ids=[f1.id, f2.id], nature=ConflictNature.CONTRADICTORY, description="Test", severity_weight=0.8, ) resolution = await synthesizer.synthesize(conflict, [f1, f2]) assert resolution.decision == "merge" assert resolution.merged_suggestion == "Do both things" @pytest.mark.asyncio async def test_synthesize_parse_plain_json(self) -> None: mock_response = """{ "decision": "prefer_second", "reasoning": "Second is better", "confidence": 0.75 }""" mock_llm = MockLLMClient(responses=[mock_response]) synthesizer = ConflictSynthesizer(mock_llm) f1 = make_finding(AgentName.SECURITY) f2 = make_finding(AgentName.COMPLEXITY) conflict = Conflict( id="test-conflict", finding_ids=[f1.id, f2.id], nature=ConflictNature.CONTRADICTORY, description="Test", severity_weight=0.8, ) resolution = await synthesizer.synthesize(conflict, [f1, f2]) assert resolution.decision == "prefer_second" assert resolution.confidence == 0.75 class TestFindingMergerEdgeCases: def test_merge_different_files(self) -> None: merger = FindingMerger() f1 = make_finding(AgentName.SECURITY, file="a.py", line_start=10) f2 = make_finding(AgentName.SECURITY, file="b.py", line_start=10) result = merger.merge([f1, f2], None) assert len(result.groups) == 2 assert len(result.unique_findings) == 2 def test_finding_group_empty(self) -> None: group = FindingGroup( file="test.py", line_start=10, line_end=20, findings=[], ) assert group.primary_finding is None assert group.agents == []