306 lines
10 KiB
Python
306 lines
10 KiB
Python
"""Tests for review agents."""
|
|
|
|
import pytest
|
|
|
|
from arbiter.agents import ComplexityAgent, ReviewContext, SecurityAgent, StyleAgent
|
|
from arbiter.llm.prompts import PromptRegistry
|
|
from arbiter.models import AgentConfig, AgentName, Policy, Severity
|
|
from tests.conftest import MockLLMClient
|
|
|
|
|
|
class TestSecurityAgent:
    """Exercise ``SecurityAgent.review`` against a scripted ``MockLLMClient``."""

    @staticmethod
    def _policy_with_security(config: AgentConfig) -> Policy:
        """Build a Policy using *config* for the security agent.

        The style and complexity agents receive a default ``AgentConfig`` so
        that only the security settings differ between tests.
        """
        return Policy(
            agents={
                AgentName.SECURITY: config,
                AgentName.STYLE: AgentConfig(),
                AgentName.COMPLEXITY: AgentConfig(),
            }
        )

    @pytest.mark.asyncio
    async def test_review_returns_result(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """An empty JSON array yields no findings plus the mock's usage figures."""
        mock_llm = MockLLMClient(responses=["[]"])
        agent = SecurityAgent(mock_llm, prompt_registry)
        context = ReviewContext(diff="+ some code", policy=Policy())

        result = await agent.review(context)

        assert result.agent_name == AgentName.SECURITY
        assert result.findings == []
        assert result.duration_ms >= 0
        assert result.tokens_used == 150  # 100 in + 50 out from mock
        # cost_usd is a float: compare with a tolerance, not bit-exact equality.
        assert result.cost_usd == pytest.approx(0.001)

    @pytest.mark.asyncio
    async def test_parses_json_findings(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """A finding wrapped in a ```json fence is parsed into a typed finding."""
        response = """```json
[
    {
        "file": "src/auth.py",
        "line_start": 10,
        "line_end": 15,
        "severity": "high",
        "confidence": 0.9,
        "title": "SQL Injection",
        "description": "User input concatenated",
        "reasoning": "Allows SQL injection",
        "suggestion": "Use parameterized queries",
        "references": ["https://owasp.org"]
    }
]
```"""
        mock_llm = MockLLMClient(responses=[response])
        agent = SecurityAgent(mock_llm, prompt_registry)
        context = ReviewContext(diff="+ query = ...", policy=Policy())

        result = await agent.review(context)

        assert len(result.findings) == 1
        finding = result.findings[0]
        assert finding.file == "src/auth.py"
        assert finding.severity == Severity.HIGH
        assert finding.confidence == 0.9
        assert finding.title == "SQL Injection"

    @pytest.mark.asyncio
    async def test_uses_configured_model(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """The model named in the security AgentConfig is passed to the LLM call."""
        mock_llm = MockLLMClient(responses=["[]"])
        agent = SecurityAgent(mock_llm, prompt_registry)
        policy = self._policy_with_security(AgentConfig(model="gpt-4o-mini"))
        context = ReviewContext(diff="+ code", policy=policy)

        await agent.review(context)

        assert mock_llm.calls[0]["model"] == "gpt-4o-mini"

    @pytest.mark.asyncio
    async def test_filters_by_severity(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """Findings below the configured severity threshold are dropped."""
        response = """[
            {"file": "a.py", "line_start": 1, "line_end": 1, "severity": "high", "confidence": 0.9, "title": "High", "description": "", "reasoning": ""},
            {"file": "b.py", "line_start": 1, "line_end": 1, "severity": "low", "confidence": 0.9, "title": "Low", "description": "", "reasoning": ""},
            {"file": "c.py", "line_start": 1, "line_end": 1, "severity": "info", "confidence": 0.9, "title": "Info", "description": "", "reasoning": ""}
        ]"""
        mock_llm = MockLLMClient(responses=[response])
        agent = SecurityAgent(mock_llm, prompt_registry)
        policy = self._policy_with_security(
            AgentConfig(severity_threshold=Severity.MEDIUM)
        )
        context = ReviewContext(diff="+ code", policy=policy)

        result = await agent.review(context)

        # Only high severity should pass (medium threshold filters low and info)
        assert len(result.findings) == 1
        assert result.findings[0].severity == Severity.HIGH
|
|
|
|
|
|
class TestStyleAgent:
    """Checks for ``StyleAgent.review`` driven by a canned ``MockLLMClient`` reply."""

    @pytest.mark.asyncio
    async def test_review_returns_result(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """A reply of ``[]`` produces a STYLE-tagged result with no findings."""
        llm = MockLLMClient(responses=["[]"])
        reviewer = StyleAgent(llm, prompt_registry)

        outcome = await reviewer.review(
            ReviewContext(diff="+ some code", policy=Policy())
        )

        assert outcome.agent_name == AgentName.STYLE
        assert outcome.findings == []

    @pytest.mark.asyncio
    async def test_uses_default_model(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """With a default Policy, the first recorded LLM call names ``gpt-4o-mini``."""
        llm = MockLLMClient(responses=["[]"])
        reviewer = StyleAgent(llm, prompt_registry)

        await reviewer.review(ReviewContext(diff="+ code", policy=Policy()))

        first_call = llm.calls[0]
        assert first_call["model"] == "gpt-4o-mini"
|
|
|
|
|
|
class TestComplexityAgent:
    """Checks for ``ComplexityAgent.review`` driven by a canned ``MockLLMClient`` reply."""

    @pytest.mark.asyncio
    async def test_review_returns_result(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """A reply of ``[]`` produces a COMPLEXITY-tagged result with no findings."""
        llm = MockLLMClient(responses=["[]"])
        reviewer = ComplexityAgent(llm, prompt_registry)

        outcome = await reviewer.review(
            ReviewContext(diff="+ some code", policy=Policy())
        )

        assert outcome.agent_name == AgentName.COMPLEXITY
        assert outcome.findings == []

    @pytest.mark.asyncio
    async def test_parses_complexity_findings(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """A single medium-severity entry in a bare JSON array is parsed as one finding."""
        llm_reply = """[
            {
                "file": "processor.py",
                "line_start": 1,
                "line_end": 50,
                "severity": "medium",
                "confidence": 0.8,
                "title": "High cyclomatic complexity",
                "description": "Function has 15 branches",
                "reasoning": "Makes testing and maintenance difficult"
            }
        ]"""
        llm = MockLLMClient(responses=[llm_reply])
        reviewer = ComplexityAgent(llm, prompt_registry)
        review_input = ReviewContext(diff="+ complex code", policy=Policy())

        outcome = await reviewer.review(review_input)

        assert len(outcome.findings) == 1
        parsed = outcome.findings[0]
        assert parsed.severity == Severity.MEDIUM
        assert "complexity" in parsed.title.lower()
|
|
|
|
|
|
class TestAgentResponseParsing:
    """How ``SecurityAgent.review`` copes with unusual LLM replies and policies."""

    @pytest.mark.asyncio
    async def test_handles_empty_response(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """An empty reply string results in an empty findings list."""
        llm = MockLLMClient(responses=[""])
        reviewer = SecurityAgent(llm, prompt_registry)

        outcome = await reviewer.review(ReviewContext(diff="+ code", policy=Policy()))

        assert outcome.findings == []

    @pytest.mark.asyncio
    async def test_handles_invalid_json(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """A reply that is not JSON results in an empty findings list."""
        llm = MockLLMClient(responses=["not valid json"])
        reviewer = SecurityAgent(llm, prompt_registry)

        outcome = await reviewer.review(ReviewContext(diff="+ code", policy=Policy()))

        assert outcome.findings == []

    @pytest.mark.asyncio
    async def test_handles_json_without_code_block(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """A bare JSON array (no code fence) is still parsed into findings."""
        raw_reply = (
            '[{"file": "a.py", "line_start": 1, "line_end": 1, '
            '"severity": "info", "confidence": 0.5, "title": "Test", '
            '"description": "", "reasoning": ""}]'
        )
        llm = MockLLMClient(responses=[raw_reply])
        reviewer = SecurityAgent(llm, prompt_registry)

        outcome = await reviewer.review(ReviewContext(diff="+ code", policy=Policy()))

        assert len(outcome.findings) == 1

    @pytest.mark.asyncio
    async def test_handles_malformed_finding(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """A malformed entry is skipped while a well-formed sibling is kept."""
        raw_reply = """[
            {"file": "a.py", "line_start": 1, "severity": "invalid_severity", "confidence": 0.5, "title": "Bad", "description": "", "reasoning": ""},
            {"file": "b.py", "line_start": 1, "line_end": 1, "severity": "info", "confidence": 0.5, "title": "Valid", "description": "", "reasoning": ""}
        ]"""
        llm = MockLLMClient(responses=[raw_reply])
        reviewer = SecurityAgent(llm, prompt_registry)

        outcome = await reviewer.review(ReviewContext(diff="+ code", policy=Policy()))

        # Only the second entry survives: the first carries an unrecognised
        # severity value (and also omits "line_end").
        assert len(outcome.findings) == 1
        survivor = outcome.findings[0]
        assert survivor.title == "Valid"

    @pytest.mark.asyncio
    async def test_includes_prompt_additions(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """The security config's ``prompt_additions`` text appears in the first message."""
        llm = MockLLMClient(responses=["[]"])
        reviewer = SecurityAgent(llm, prompt_registry)
        per_agent_config = {
            AgentName.SECURITY: AgentConfig(prompt_additions="Focus on authentication"),
            AgentName.STYLE: AgentConfig(),
            AgentName.COMPLEXITY: AgentConfig(),
        }
        review_input = ReviewContext(
            diff="+ code",
            policy=Policy(agents=per_agent_config),
        )

        await reviewer.review(review_input)

        first_message = llm.calls[0]["messages"][0]
        assert "Focus on authentication" in first_message["content"]

    @pytest.mark.asyncio
    async def test_handles_non_list_json(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """A JSON object (rather than an array) results in an empty findings list."""
        llm = MockLLMClient(responses=['{"not": "a list"}'])
        reviewer = SecurityAgent(llm, prompt_registry)

        outcome = await reviewer.review(ReviewContext(diff="+ code", policy=Policy()))

        assert outcome.findings == []

    @pytest.mark.asyncio
    async def test_handles_non_dict_items(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """An array whose items are not objects results in an empty findings list."""
        llm = MockLLMClient(responses=['["string", 123, null]'])
        reviewer = SecurityAgent(llm, prompt_registry)

        outcome = await reviewer.review(ReviewContext(diff="+ code", policy=Policy()))

        assert outcome.findings == []

    @pytest.mark.asyncio
    async def test_agent_without_config_uses_defaults(
        self,
        prompt_registry: PromptRegistry,
    ) -> None:
        """With no per-agent config in the policy, the security agent uses ``gpt-4o``."""
        llm = MockLLMClient(responses=["[]"])
        reviewer = SecurityAgent(llm, prompt_registry)
        # The policy deliberately carries an empty agents mapping.
        bare_policy = Policy(agents={})

        outcome = await reviewer.review(
            ReviewContext(diff="+ code", policy=bare_policy)
        )

        # Expect the default model (gpt-4o for security).
        first_call = llm.calls[0]
        assert first_call["model"] == "gpt-4o"
        assert outcome.findings == []
|