From 2bb7e038716b3507789a0e0cd68817fb125018f8 Mon Sep 17 00:00:00 2001 From: Kai Chappell Date: Sun, 9 Mar 2025 11:14:29 +0000 Subject: [PATCH] add static analysis + deliberation pipeline --- src/arbiter/analysis/__init__.py | 24 + src/arbiter/analysis/diff.py | 283 ++++ src/arbiter/analysis/static.py | 519 +++++++ src/arbiter/deliberation/__init__.py | 26 + src/arbiter/deliberation/conflicts.py | 256 ++++ src/arbiter/deliberation/coordinator.py | 286 ++++ src/arbiter/deliberation/merger.py | 207 +++ src/arbiter/deliberation/synthesis.py | 213 +++ tests/fixtures/conflict-contradictory.diff | 42 + tests/fixtures/conflict-overlapping.diff | 37 + .../conflict-security-complexity.diff | 57 + tests/test_deliberation.py | 743 +++++++++ tests/test_static_analysis.py | 1344 +++++++++++++++++ 13 files changed, 4037 insertions(+) create mode 100644 src/arbiter/analysis/__init__.py create mode 100644 src/arbiter/analysis/diff.py create mode 100644 src/arbiter/analysis/static.py create mode 100644 src/arbiter/deliberation/__init__.py create mode 100644 src/arbiter/deliberation/conflicts.py create mode 100644 src/arbiter/deliberation/coordinator.py create mode 100644 src/arbiter/deliberation/merger.py create mode 100644 src/arbiter/deliberation/synthesis.py create mode 100644 tests/fixtures/conflict-contradictory.diff create mode 100644 tests/fixtures/conflict-overlapping.diff create mode 100644 tests/fixtures/conflict-security-complexity.diff create mode 100644 tests/test_deliberation.py create mode 100644 tests/test_static_analysis.py diff --git a/src/arbiter/analysis/__init__.py b/src/arbiter/analysis/__init__.py new file mode 100644 index 0000000..594e79a --- /dev/null +++ b/src/arbiter/analysis/__init__.py @@ -0,0 +1,24 @@ +"""Static analysis and diff parsing for Arbiter.""" + +from arbiter.analysis.diff import DiffFile, DiffHunk, DiffLine, DiffParser, LineType, ParsedDiff +from arbiter.analysis.static import ( + StaticAnalysisConfig, + StaticAnalysisResult, + StaticAnalysisRunner, + StaticFinding, + run_static_analysis, +) + +__all__ = [ + "DiffFile", + "DiffHunk", + "DiffLine", + "DiffParser", + "LineType", + "ParsedDiff", + "StaticAnalysisConfig", + "StaticAnalysisResult", + "StaticAnalysisRunner", + "StaticFinding", + "run_static_analysis", +] diff --git a/src/arbiter/analysis/diff.py b/src/arbiter/analysis/diff.py new file mode 100644 index 0000000..85ff258 --- /dev/null +++ b/src/arbiter/analysis/diff.py @@ -0,0 +1,283 @@ +"""Unified diff parser with line mapping.""" + +import re +from enum import StrEnum + +from pydantic import BaseModel, Field + + +class LineType(StrEnum): + """Type of line in a diff.""" + + ADDED = "added" + REMOVED = "removed" + CONTEXT = "context" + + +class DiffLine(BaseModel): + """A single line in a diff hunk.""" + + content: str = Field(description="The line content without prefix") + line_type: LineType = Field(description="Type of line (added/removed/context)") + old_line: int | None = Field(default=None, description="Line number in original file") + new_line: int | None = Field(default=None, description="Line number in new file") + + +class DiffHunk(BaseModel): + """A hunk within a diff file.""" + + old_start: int = Field(description="Starting line in original file") + old_count: int = Field(description="Number of lines from original file") + new_start: int = Field(description="Starting line in new file") + new_count: int = Field(description="Number of lines in new file") + header: str = Field(default="", description="Optional function/class context from @@ line") + lines: list[DiffLine] = Field(default_factory=list, description="Lines in this hunk") + + def get_added_lines(self) -> list[DiffLine]: + return [line for line in self.lines if line.line_type == LineType.ADDED] + + def get_removed_lines(self) -> list[DiffLine]: + return [line for line in self.lines if line.line_type == LineType.REMOVED] + + +class DiffFile(BaseModel): + """A single file's diff.""" + + old_path: str = Field(description="Original file path (a/...)") + new_path: str = Field(description="New file path (b/...)") + hunks: list[DiffHunk] = Field(default_factory=list, description="Hunks in this file") + is_new: bool = Field(default=False, description="True if file is newly created") + is_deleted: bool = Field(default=False, description="True if file is deleted") + is_binary: bool = Field(default=False, description="True if binary file") + + @property + def path(self) -> str: + if self.is_deleted: + return self.old_path + return self.new_path + + def get_added_line_numbers(self) -> list[int]: + numbers = [] + for hunk in self.hunks: + for line in hunk.lines: + if line.line_type == LineType.ADDED and line.new_line is not None: + numbers.append(line.new_line) + return numbers + + def get_changed_line_range(self) -> tuple[int, int] | None: + added_lines = self.get_added_line_numbers() + if not added_lines: + return None + return (min(added_lines), max(added_lines)) + + def line_in_diff(self, line_number: int) -> bool: + for hunk in self.hunks: + if hunk.new_start <= line_number < hunk.new_start + hunk.new_count: + return True + return False + + def map_new_to_old(self, new_line: int) -> int | None: + for hunk in self.hunks: + for line in hunk.lines: + if line.new_line == new_line and line.old_line is not None: + return line.old_line + return None + + +class ParsedDiff(BaseModel): + """Complete parsed diff containing multiple files.""" + + files: list[DiffFile] = Field(default_factory=list, description="Files in this diff") + raw_diff: str = Field(default="", description="Original diff text") + + def get_file(self, path: str) -> DiffFile | None: + for file in self.files: + if file.path == path or file.old_path == path or file.new_path == path: + return file + return None + + def get_changed_files(self) -> list[str]: + return [f.path for f in self.files] + + def get_added_files(self) -> list[str]: + return [f.path for f in self.files if f.is_new] + + def get_deleted_files(self) -> list[str]: + return [f.path for f in self.files if f.is_deleted] + + +class DiffParser: + """Parser for unified diff format.""" + + # Regex patterns for diff parsing + FILE_HEADER_PATTERN = re.compile(r"^diff --git a/(.*) b/(.*)$") + OLD_FILE_PATTERN = re.compile(r"^--- (?:a/)?(.*)$") + NEW_FILE_PATTERN = re.compile(r"^\+\+\+ (?:b/)?(.*)$") + HUNK_HEADER_PATTERN = re.compile(r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)$") + BINARY_FILE_PATTERN = re.compile(r"^Binary files .* differ$") + NEW_FILE_MODE_PATTERN = re.compile(r"^new file mode") + DELETED_FILE_MODE_PATTERN = re.compile(r"^deleted file mode") + + def parse(self, diff_text: str) -> ParsedDiff: + """Parse a unified diff string into structured data.""" + if not diff_text or not diff_text.strip(): + return ParsedDiff(raw_diff=diff_text) + + files: list[DiffFile] = [] + current_file: DiffFile | None = None + current_hunk: DiffHunk | None = None + + lines = diff_text.splitlines() + i = 0 + + while i < len(lines): + line = lines[i] + + # Check for new file header (diff --git) + file_match = self.FILE_HEADER_PATTERN.match(line) + if file_match: + # Save previous file if exists + if current_file is not None: + if current_hunk is not None: + current_file.hunks.append(current_hunk) + files.append(current_file) + + current_file = DiffFile( + old_path=file_match.group(1), + new_path=file_match.group(2), + ) + current_hunk = None + i += 1 + continue + + # Check for new/deleted file mode + if current_file is not None: + if self.NEW_FILE_MODE_PATTERN.match(line): + current_file.is_new = True + i += 1 + continue + if self.DELETED_FILE_MODE_PATTERN.match(line): + current_file.is_deleted = True + i += 1 + continue + + # Check for binary file + if current_file is not None and self.BINARY_FILE_PATTERN.match(line): + current_file.is_binary = True + i += 1 + continue + + # Check for old file path (--- a/...) + old_match = self.OLD_FILE_PATTERN.match(line) + if old_match: + if current_file is not None: + # Update old_path if it's different + path = old_match.group(1) + if path != "/dev/null": + current_file.old_path = path + else: + current_file.is_new = True + i += 1 + continue + + # Check for new file path (+++ b/...) + new_match = self.NEW_FILE_PATTERN.match(line) + if new_match: + if current_file is not None: + path = new_match.group(1) + if path != "/dev/null": + current_file.new_path = path + else: + current_file.is_deleted = True + i += 1 + continue + + # Check for hunk header (@@ ... @@) + hunk_match = self.HUNK_HEADER_PATTERN.match(line) + if hunk_match: + # Save previous hunk if exists + if current_file is not None and current_hunk is not None: + current_file.hunks.append(current_hunk) + + old_start = int(hunk_match.group(1)) + old_count = int(hunk_match.group(2)) if hunk_match.group(2) else 1 + new_start = int(hunk_match.group(3)) + new_count = int(hunk_match.group(4)) if hunk_match.group(4) else 1 + header = hunk_match.group(5).strip() + + current_hunk = DiffHunk( + old_start=old_start, + old_count=old_count, + new_start=new_start, + new_count=new_count, + header=header, + ) + i += 1 + continue + + # Parse hunk lines + if current_hunk is not None and line: + diff_line = self._parse_hunk_line(line, current_hunk) + if diff_line is not None: + current_hunk.lines.append(diff_line) + + i += 1 + + # Don't forget the last file and hunk + if current_file is not None: + if current_hunk is not None: + current_file.hunks.append(current_hunk) + files.append(current_file) + + return ParsedDiff(files=files, raw_diff=diff_text) + + def _parse_hunk_line(self, line: str, hunk: DiffHunk) -> DiffLine | None: + """Parse a single line within a hunk.""" + if not line: + return None + + prefix = line[0] if line else " " + content = line[1:] if len(line) > 1 else "" + + # Calculate current line numbers based on existing lines + old_line = hunk.old_start + new_line = hunk.new_start + + for existing in hunk.lines: + if existing.line_type in (LineType.CONTEXT, LineType.REMOVED): + old_line += 1 + if existing.line_type in (LineType.CONTEXT, LineType.ADDED): + new_line += 1 + + if prefix == "+": + return DiffLine( + content=content, + line_type=LineType.ADDED, + old_line=None, + new_line=new_line, + ) + elif prefix == "-": + return DiffLine( + content=content, + line_type=LineType.REMOVED, + old_line=old_line, + new_line=None, + ) + elif prefix == " ": + return DiffLine( + content=content, + line_type=LineType.CONTEXT, + old_line=old_line, + new_line=new_line, + ) + elif prefix == "\\": + # "\ No newline at end of file" - skip + return None + else: + # Unknown prefix, treat as context + return DiffLine( + content=line, + line_type=LineType.CONTEXT, + old_line=old_line, + new_line=new_line, + ) diff --git a/src/arbiter/analysis/static.py b/src/arbiter/analysis/static.py new file mode 100644 index 0000000..b0d30b0 --- /dev/null +++ b/src/arbiter/analysis/static.py @@ -0,0 +1,519 @@ +"""Static analysis runners for ruff, mypy, bandit, and radon.""" + +import asyncio +import json +import re +from pathlib import Path +from typing import ClassVar + +from pydantic import BaseModel, Field + +from arbiter.analysis.diff import DiffFile, ParsedDiff +from arbiter.models.enums import AgentName, Severity +from arbiter.models.finding import Finding + + +class StaticAnalysisConfig(BaseModel): + """Configuration for static analysis tools.""" + + ruff_enabled: bool = Field(default=True, description="Run ruff linter") + mypy_enabled: bool = Field(default=True, description="Run mypy type checker") + bandit_enabled: bool = Field(default=True, description="Run bandit security scanner") + radon_enabled: bool = Field(default=True, description="Run radon complexity analyzer") + ruff_config: str | None = Field(default=None, description="Path to ruff config file") + mypy_config: str | None = Field(default=None, description="Path to mypy config file") + + +class StaticFinding(BaseModel): + """A finding from static analysis tools.""" + + tool: str = Field(description="Tool that produced this finding (ruff, mypy, bandit, radon)") + file: str = Field(description="File path") + line: int = Field(description="Line number") + column: int | None = Field(default=None, description="Column number") + code: str = Field(description="Error/warning code (e.g., E501, B303)") + message: str = Field(description="Description of the issue") + severity: Severity = Field(description="Mapped severity level") + end_line: int | None = Field(default=None, description="End line for multi-line issues") + end_column: int | None = Field(default=None, description="End column") + fix_available: bool = Field(default=False, description="Whether an auto-fix is available") + extra: dict[str, str | int | None] | None = Field( + default=None, description="Tool-specific extra data" + ) + + +class StaticAnalysisResult(BaseModel): + """Aggregated results from all static analysis tools.""" + + findings: list[StaticFinding] = Field(default_factory=list) + tool_errors: dict[str, str] = Field( + default_factory=dict, description="Errors from tools that failed" + ) + tools_run: list[str] = Field(default_factory=list, description="Tools that ran successfully") + + +class StaticAnalysisRunner: + """Runs static analysis tools on diff files.""" + + # Severity mapping for ruff codes + RUFF_SEVERITY_MAP: ClassVar[dict[str, Severity]] = { + # Security-related (B = bandit-like checks in ruff) + "S": Severity.HIGH, # Security issues + # Errors + "E": Severity.MEDIUM, + "F": Severity.MEDIUM, # Pyflakes + "W": Severity.LOW, # Warnings + # Style + "C": Severity.LOW, # McCabe complexity, conventions + "N": Severity.LOW, # pep8-naming + "D": Severity.INFO, # pydocstyle + "I": Severity.INFO, # isort + # Type annotations + "ANN": Severity.LOW, + # Builtins + "A": Severity.MEDIUM, + # Bugbear + "B": Severity.MEDIUM, + } + + # Bandit severity mapping + BANDIT_SEVERITY_MAP: ClassVar[dict[str, Severity]] = { + "HIGH": Severity.HIGH, + "MEDIUM": Severity.MEDIUM, + "LOW": Severity.LOW, + } + + # Radon complexity thresholds (cyclomatic complexity) + RADON_THRESHOLDS: ClassVar[dict[str, Severity]] = { + "F": Severity.CRITICAL, # Very high risk (>40) + "E": Severity.HIGH, # High risk (31-40) + "D": Severity.HIGH, # More than moderate (21-30) + "C": Severity.MEDIUM, # Moderate (11-20) + "B": Severity.LOW, # Low (6-10) + "A": Severity.INFO, # Simple (1-5) + } + + def __init__(self, config: StaticAnalysisConfig | None = None) -> None: + self.config = config or StaticAnalysisConfig() + + async def run(self, diff: ParsedDiff, work_dir: Path | None = None) -> StaticAnalysisResult: + """Run all enabled static analysis tools on the diff.""" + result = StaticAnalysisResult() + + # If no work_dir provided, we can't run analysis on actual files + # In that case, return empty result + if work_dir is None: + return result + + # Collect Python files from the diff + python_files = [ + f for f in diff.files if f.path.endswith(".py") and not f.is_deleted and not f.is_binary + ] + + if not python_files: + return result + + # Run tools in parallel + tasks = [] + if self.config.ruff_enabled: + tasks.append(self._run_ruff(python_files, work_dir, diff)) + if self.config.mypy_enabled: + tasks.append(self._run_mypy(python_files, work_dir, diff)) + if self.config.bandit_enabled: + tasks.append(self._run_bandit(python_files, work_dir, diff)) + if self.config.radon_enabled: + tasks.append(self._run_radon(python_files, work_dir, diff)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + tool_names = [] + if self.config.ruff_enabled: + tool_names.append("ruff") + if self.config.mypy_enabled: + tool_names.append("mypy") + if self.config.bandit_enabled: + tool_names.append("bandit") + if self.config.radon_enabled: + tool_names.append("radon") + + for tool_name, tool_result in zip(tool_names, results, strict=False): + if isinstance(tool_result, BaseException): + result.tool_errors[tool_name] = str(tool_result) + elif isinstance(tool_result, list): + result.findings.extend(tool_result) + result.tools_run.append(tool_name) + + return result + + async def _run_ruff( + self, files: list[DiffFile], work_dir: Path, diff: ParsedDiff + ) -> list[StaticFinding]: + """Run ruff linter and parse JSON output.""" + file_paths = [str(work_dir / f.path) for f in files if (work_dir / f.path).exists()] + if not file_paths: + return [] + + cmd = ["ruff", "check", "--output-format=json"] + if self.config.ruff_config: + cmd.extend(["--config", self.config.ruff_config]) + cmd.extend(file_paths) + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=work_dir, + ) + stdout, _ = await proc.communicate() + + # ruff returns non-zero if issues found, which is expected + if stdout: + return self._parse_ruff_output(stdout.decode(), diff, work_dir) + return [] + except FileNotFoundError: + raise RuntimeError("ruff not found. Install with: pip install ruff") from None + + def _parse_ruff_output( + self, output: str, diff: ParsedDiff, work_dir: Path + ) -> list[StaticFinding]: + """Parse ruff JSON output into StaticFindings.""" + if not output.strip(): + return [] + + try: + issues = json.loads(output) + except json.JSONDecodeError: + return [] + + findings = [] + for issue in issues: + file_path = issue.get("filename", "") + # Make path relative to work_dir + try: + rel_path = Path(file_path).relative_to(work_dir) + file_path = str(rel_path) + except ValueError: + pass + + line = issue.get("location", {}).get("row", 1) + + # Only include findings that are in the diff + diff_file = diff.get_file(file_path) + if diff_file is None or not diff_file.line_in_diff(line): + continue + + code = issue.get("code", "") + severity = self._ruff_code_to_severity(code) + + findings.append( + StaticFinding( + tool="ruff", + file=file_path, + line=line, + column=issue.get("location", {}).get("column"), + code=code, + message=issue.get("message", ""), + severity=severity, + end_line=issue.get("end_location", {}).get("row"), + end_column=issue.get("end_location", {}).get("column"), + fix_available=issue.get("fix") is not None, + extra={"url": issue.get("url")}, + ) + ) + + return findings + + def _ruff_code_to_severity(self, code: str) -> Severity: + """Map ruff error code to severity level.""" + if not code: + return Severity.MEDIUM + + # Check prefixes from longest to shortest + for prefix in sorted(self.RUFF_SEVERITY_MAP.keys(), key=len, reverse=True): + if code.startswith(prefix): + return self.RUFF_SEVERITY_MAP[prefix] + + return Severity.MEDIUM + + async def _run_mypy( + self, files: list[DiffFile], work_dir: Path, diff: ParsedDiff + ) -> list[StaticFinding]: + """Run mypy type checker and parse output.""" + file_paths = [str(work_dir / f.path) for f in files if (work_dir / f.path).exists()] + if not file_paths: + return [] + + cmd = ["mypy", "--no-error-summary", "--show-column-numbers"] + if self.config.mypy_config: + cmd.extend(["--config-file", self.config.mypy_config]) + cmd.extend(file_paths) + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=work_dir, + ) + stdout, _ = await proc.communicate() + + if stdout: + return self._parse_mypy_output(stdout.decode(), diff, work_dir) + return [] + except FileNotFoundError: + raise RuntimeError("mypy not found. Install with: pip install mypy") from None + + def _parse_mypy_output( + self, output: str, diff: ParsedDiff, work_dir: Path + ) -> list[StaticFinding]: + """Parse mypy output into StaticFindings.""" + findings = [] + # mypy output format: file:line:column: severity: message + pattern = re.compile(r"^(.+?):(\d+):(\d+): (\w+): (.+)$") + + for line in output.strip().split("\n"): + match = pattern.match(line) + if not match: + continue + + file_path, line_num, col, level, message = match.groups() + + # Make path relative + try: + rel_path = Path(file_path).relative_to(work_dir) + file_path = str(rel_path) + except ValueError: + pass + + line_num = int(line_num) + + # Only include findings in the diff + diff_file = diff.get_file(file_path) + if diff_file is None or not diff_file.line_in_diff(line_num): + continue + + # Map mypy severity + if level == "error": + severity = Severity.MEDIUM + elif level == "warning": + severity = Severity.LOW + else: + severity = Severity.INFO + + findings.append( + StaticFinding( + tool="mypy", + file=file_path, + line=line_num, + column=int(col), + code=f"mypy-{level}", + message=message, + severity=severity, + ) + ) + + return findings + + async def _run_bandit( + self, files: list[DiffFile], work_dir: Path, diff: ParsedDiff + ) -> list[StaticFinding]: + """Run bandit security scanner and parse JSON output.""" + file_paths = [str(work_dir / f.path) for f in files if (work_dir / f.path).exists()] + if not file_paths: + return [] + + cmd = ["bandit", "-f", "json", "-q"] + cmd.extend(file_paths) + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=work_dir, + ) + stdout, _ = await proc.communicate() + + if stdout: + return self._parse_bandit_output(stdout.decode(), diff, work_dir) + return [] + except FileNotFoundError: + raise RuntimeError("bandit not found. Install with: pip install bandit") from None + + def _parse_bandit_output( + self, output: str, diff: ParsedDiff, work_dir: Path + ) -> list[StaticFinding]: + """Parse bandit JSON output into StaticFindings.""" + if not output.strip(): + return [] + + try: + data = json.loads(output) + except json.JSONDecodeError: + return [] + + findings = [] + for issue in data.get("results", []): + file_path = issue.get("filename", "") + + # Make path relative + try: + rel_path = Path(file_path).relative_to(work_dir) + file_path = str(rel_path) + except ValueError: + pass + + line = issue.get("line_number", 1) + + # Only include findings in the diff + diff_file = diff.get_file(file_path) + if diff_file is None or not diff_file.line_in_diff(line): + continue + + severity_str = issue.get("issue_severity", "MEDIUM") + severity = self.BANDIT_SEVERITY_MAP.get(severity_str, Severity.MEDIUM) + + findings.append( + StaticFinding( + tool="bandit", + file=file_path, + line=line, + code=issue.get("test_id", ""), + message=f"{issue.get('issue_text', '')} (Confidence: {issue.get('issue_confidence', 'MEDIUM')})", + severity=severity, + end_line=issue.get("line_range", [line])[-1] + if issue.get("line_range") + else None, + extra={ + "test_name": issue.get("test_name"), + "confidence": issue.get("issue_confidence"), + "cwe": issue.get("issue_cwe", {}).get("id") + if issue.get("issue_cwe") + else None, + "more_info": issue.get("more_info"), + }, + ) + ) + + return findings + + async def _run_radon( + self, files: list[DiffFile], work_dir: Path, diff: ParsedDiff + ) -> list[StaticFinding]: + """Run radon complexity analyzer and parse JSON output.""" + file_paths = [str(work_dir / f.path) for f in files if (work_dir / f.path).exists()] + if not file_paths: + return [] + + cmd = ["radon", "cc", "-j", "-s"] + cmd.extend(file_paths) + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=work_dir, + ) + stdout, _ = await proc.communicate() + + if stdout: + return self._parse_radon_output(stdout.decode(), diff, work_dir) + return [] + except FileNotFoundError: + raise RuntimeError("radon not found. Install with: pip install radon") from None + + def _parse_radon_output( + self, output: str, diff: ParsedDiff, work_dir: Path + ) -> list[StaticFinding]: + """Parse radon JSON output into StaticFindings.""" + if not output.strip(): + return [] + + try: + data = json.loads(output) + except json.JSONDecodeError: + return [] + + findings = [] + for file_path, blocks in data.items(): + # Make path relative + try: + rel_path = Path(file_path).relative_to(work_dir) + file_path = str(rel_path) + except ValueError: + pass + + for block in blocks: + line = block.get("lineno", 1) + + # Only include findings in the diff + diff_file = diff.get_file(file_path) + if diff_file is None or not diff_file.line_in_diff(line): + continue + + rank = block.get("rank", "A") + complexity = block.get("complexity", 0) + + # Only report if complexity is notable (C or worse) + if rank in ("A", "B"): + continue + + severity = self.RADON_THRESHOLDS.get(rank, Severity.INFO) + block_type = block.get("type", "function") + name = block.get("name", "unknown") + + findings.append( + StaticFinding( + tool="radon", + file=file_path, + line=line, + code=f"CC{rank}", + message=f"{block_type.capitalize()} '{name}' has cyclomatic complexity of {complexity} (rank {rank})", + severity=severity, + end_line=block.get("endline"), + extra={ + "complexity": complexity, + "rank": rank, + "type": block_type, + "name": name, + }, + ) + ) + + return findings + + def convert_to_finding( + self, static_finding: StaticFinding, prompt_version: str = "static-v1.0" + ) -> Finding: + """Convert a StaticFinding to the standard Finding model.""" + return Finding( + id=f"{static_finding.tool}-{static_finding.file}-{static_finding.line}-{static_finding.code}", + agent=AgentName.STYLE, # Static analysis is grouped under style + file=static_finding.file, + line_start=static_finding.line, + line_end=static_finding.end_line or static_finding.line, + severity=static_finding.severity, + confidence=0.95, # Static analysis has high confidence + title=f"[{static_finding.tool}] {static_finding.code}", + description=static_finding.message, + reasoning=f"Detected by {static_finding.tool} static analysis", + suggestion="Fix the issue as indicated by the tool" + if static_finding.fix_available + else None, + references=[str(static_finding.extra.get("url"))] + if static_finding.extra and static_finding.extra.get("url") is not None + else [], + prompt_version=prompt_version, + static_analysis_context={"tool": static_finding.tool, "code": static_finding.code}, + ) + + +async def run_static_analysis( + diff: ParsedDiff, + work_dir: Path | None = None, + config: StaticAnalysisConfig | None = None, +) -> StaticAnalysisResult: + runner = StaticAnalysisRunner(config) + return await runner.run(diff, work_dir) diff --git a/src/arbiter/deliberation/__init__.py b/src/arbiter/deliberation/__init__.py new file mode 100644 index 0000000..fb9c1b4 --- /dev/null +++ b/src/arbiter/deliberation/__init__.py @@ -0,0 +1,26 @@ +"""Deliberation module for merging and synthesizing review findings.""" + +from arbiter.deliberation.conflicts import Conflict, ConflictDetector, ConflictNature +from arbiter.deliberation.coordinator import ( + Coordinator, + DeliberationResult, + DeliberationStep, + StepType, +) +from arbiter.deliberation.merger import FindingGroup, FindingMerger, MergedFindings +from arbiter.deliberation.synthesis import ConflictSynthesizer, Resolution + +__all__ = [ + "Conflict", + "ConflictDetector", + "ConflictNature", + "ConflictSynthesizer", + "Coordinator", + "DeliberationResult", + "DeliberationStep", + "FindingGroup", + "FindingMerger", + "MergedFindings", + "Resolution", + "StepType", +] diff --git a/src/arbiter/deliberation/conflicts.py b/src/arbiter/deliberation/conflicts.py new file mode 100644 index 0000000..aa61fa1 --- /dev/null +++ b/src/arbiter/deliberation/conflicts.py @@ -0,0 +1,256 @@ +"""Conflict detection for opposing agent recommendations.""" + +from enum import StrEnum +from typing import ClassVar + +from pydantic import BaseModel, Field + +from arbiter.models import Finding, Severity + + +class ConflictNature(StrEnum): + """Nature of a conflict between findings.""" + + CONTRADICTORY = "contradictory" # Directly opposing recommendations + TRADE_OFF = "trade_off" # Both valid but competing concerns + OVERLAPPING = "overlapping" # Same issue from different perspectives + + +class Conflict(BaseModel): + """A detected conflict between two or more findings.""" + + id: str = Field(description="Unique identifier for this conflict") + finding_ids: list[str] = Field(description="IDs of conflicting findings") + nature: ConflictNature = Field(description="Type of conflict") + description: str = Field(description="Description of the conflict") + severity_weight: float = Field( + ge=0.0, le=1.0, description="How significant this conflict is (0-1)" + ) + resolution: str | None = Field(default=None, description="How the conflict was resolved") + winning_finding_id: str | None = Field( + default=None, description="ID of the finding that won (if applicable)" + ) + + +class ConflictDetector: + """Detects conflicts between findings from different agents.""" + + # Keywords that indicate opposing recommendations + POSITIVE_KEYWORDS: ClassVar[set[str]] = { + "add", + "increase", + "enable", + "use", + "implement", + "include", + "expand", + "more", + "validate", + "check", + } + NEGATIVE_KEYWORDS: ClassVar[set[str]] = { + "remove", + "decrease", + "disable", + "avoid", + "simplify", + "exclude", + "reduce", + "less", + "skip", + "delete", + } + + # Agent pairs that commonly have trade-offs + TRADE_OFF_PAIRS: ClassVar[dict[tuple[str, str], str]] = { + ("security", "complexity"): "Security measures often add complexity", + ("security", "style"): "Security code may sacrifice readability", + ("complexity", "style"): "Simplification may reduce clarity", + } + + # Severity weights for prioritization + SEVERITY_WEIGHTS: ClassVar[dict[Severity, float]] = { + Severity.CRITICAL: 1.0, + Severity.HIGH: 0.8, + Severity.MEDIUM: 0.5, + Severity.LOW: 0.3, + Severity.INFO: 0.1, + } + + def detect_conflicts(self, findings: list[Finding]) -> list[Conflict]: + """Detect conflicts among a list of findings. + + Args: + findings: List of findings to analyze for conflicts. + + Returns: + List of detected conflicts. + """ + conflicts: list[Conflict] = [] + seen_pairs: set[tuple[str, str]] = set() + + # Compare each pair of findings + for i, f1 in enumerate(findings): + for f2 in findings[i + 1 :]: + # Skip if same agent + if f1.agent == f2.agent: + continue + + # Skip if already processed this pair + ids = sorted([f1.id, f2.id]) + pair_key = (ids[0], ids[1]) + if pair_key in seen_pairs: + continue + seen_pairs.add(pair_key) + + # Check for conflicts + conflict = self._detect_conflict_pair(f1, f2) + if conflict: + conflicts.append(conflict) + + return conflicts + + def _detect_conflict_pair(self, f1: Finding, f2: Finding) -> Conflict | None: + """Detect conflict between two findings.""" + # Check for location overlap first + if f1.file != f2.file: + return None + + if not self._lines_overlap(f1, f2): + return None + + # Check for contradictory recommendations + if self._is_contradictory(f1, f2): + return self._create_conflict(f1, f2, ConflictNature.CONTRADICTORY) + + # Check for known trade-off patterns + if self._is_trade_off(f1, f2): + return self._create_conflict(f1, f2, ConflictNature.TRADE_OFF) + + # Check for overlapping concerns + if self._is_overlapping(f1, f2): + return self._create_conflict(f1, f2, ConflictNature.OVERLAPPING) + + return None + + def _lines_overlap(self, f1: Finding, f2: Finding) -> bool: + # Allow some proximity (within 3 lines) + proximity = 3 + return not ( + f1.line_end + proximity < f2.line_start or f2.line_end + proximity < f1.line_start + ) + + def _is_contradictory(self, f1: Finding, f2: Finding) -> bool: + """Check if findings have contradictory recommendations.""" + if not f1.suggestion or not f2.suggestion: + return False + + # Check if one uses positive and other uses negative keywords + s1_lower = f1.suggestion.lower() + s2_lower = f2.suggestion.lower() + + s1_positive = any(kw in s1_lower for kw in self.POSITIVE_KEYWORDS) + s1_negative = any(kw in s1_lower for kw in self.NEGATIVE_KEYWORDS) + s2_positive = any(kw in s2_lower for kw in self.POSITIVE_KEYWORDS) + s2_negative = any(kw in s2_lower for kw in self.NEGATIVE_KEYWORDS) + + # Contradictory if one is positive-only and other is negative-only + return (s1_positive and not s1_negative and s2_negative and not s2_positive) or ( + s1_negative and not s1_positive and s2_positive and not s2_negative + ) + + def _is_trade_off(self, f1: Finding, f2: Finding) -> bool: + agents = sorted([f1.agent.value, f2.agent.value]) + agent_pair = (agents[0], agents[1]) + # Check both orderings since pairs may be defined in either order + return agent_pair in self.TRADE_OFF_PAIRS or (agents[1], agents[0]) in self.TRADE_OFF_PAIRS + + def _is_overlapping(self, f1: Finding, f2: Finding) -> bool: + """Check if findings cover the same issue from different angles.""" + # Consider overlapping if titles are similar + title_words_1 = set(f1.title.lower().split()) + title_words_2 = set(f2.title.lower().split()) + + # Remove common stop words + stop_words = {"the", "a", "an", "in", "on", "at", "for", "to", "of", "is", "are"} + title_words_1 -= stop_words + title_words_2 -= stop_words + + if not title_words_1 or not title_words_2: + return False + + # Check for significant word overlap (Jaccard similarity > 0.3) + intersection = len(title_words_1 & title_words_2) + union = len(title_words_1 | title_words_2) + + return intersection / union > 0.3 + + def _create_conflict(self, f1: Finding, f2: Finding, nature: ConflictNature) -> Conflict: + """Create a Conflict object for two findings.""" + # Calculate severity weight based on findings + weight = max( + self.SEVERITY_WEIGHTS.get(f1.severity, 0.5), + self.SEVERITY_WEIGHTS.get(f2.severity, 0.5), + ) + + # Generate description based on nature + if nature == ConflictNature.CONTRADICTORY: + description = ( + f"Contradictory recommendations: {f1.agent.value} agent suggests '{f1.suggestion}' " + f"while {f2.agent.value} agent suggests '{f2.suggestion}'" + ) + elif nature == ConflictNature.TRADE_OFF: + agents = sorted([f1.agent.value, f2.agent.value]) + agent_pair = (agents[0], agents[1]) + reason = self.TRADE_OFF_PAIRS.get(agent_pair, "Competing concerns") + description = f"Trade-off between {f1.agent.value} and {f2.agent.value}: {reason}" + else: # OVERLAPPING + description = ( + f"Overlapping concerns: both {f1.agent.value} and {f2.agent.value} " + f"flagged similar issues at this location" + ) + + return Conflict( + id=f"conflict-{f1.id[:8]}-{f2.id[:8]}", + finding_ids=[f1.id, f2.id], + nature=nature, + description=description, + severity_weight=weight, + ) + + def resolve_by_severity(self, conflict: Conflict, findings: list[Finding]) -> Conflict: + """Resolve a conflict by choosing the higher severity finding. + + Args: + conflict: The conflict to resolve. + findings: List of all findings (to look up by ID). + + Returns: + Updated conflict with resolution. + """ + # Find the findings + conflict_findings = [f for f in findings if f.id in conflict.finding_ids] + + if not conflict_findings: + return conflict + + # Sort by severity weight, then confidence + sorted_findings = sorted( + conflict_findings, + key=lambda f: ( + -self.SEVERITY_WEIGHTS.get(f.severity, 0.5), + -f.confidence, + ), + ) + + winner = sorted_findings[0] + + return Conflict( + id=conflict.id, + finding_ids=conflict.finding_ids, + nature=conflict.nature, + description=conflict.description, + severity_weight=conflict.severity_weight, + resolution=f"Resolved by severity: {winner.agent.value} finding takes precedence", + winning_finding_id=winner.id, + ) diff --git a/src/arbiter/deliberation/coordinator.py b/src/arbiter/deliberation/coordinator.py new file mode 100644 index 0000000..ea1718b --- /dev/null +++ b/src/arbiter/deliberation/coordinator.py @@ -0,0 +1,286 @@ +"""Coordinator for deliberation process.""" + +from datetime import UTC, datetime +from enum import StrEnum +from typing import Any + +from pydantic import BaseModel, Field + +from arbiter.deliberation.conflicts import Conflict, ConflictDetector +from arbiter.deliberation.merger import FindingMerger, MergedFindings +from arbiter.deliberation.synthesis import ConflictSynthesizer, Resolution +from arbiter.llm.client import LLMClient +from arbiter.models import Finding, ReviewResult, Severity, Verdict + + +class StepType(StrEnum): + """Types of deliberation steps.""" + + MERGE = "merge" + CONFLICT_DETECTION = "conflict_detection" + SYNTHESIS = "synthesis" + VERDICT = "verdict" + + +class DeliberationStep(BaseModel): + """A single step in the deliberation process.""" + + step_type: StepType = Field(description="Type of deliberation step") + timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) + description: str = Field(description="What happened in this step") + details: dict[str, Any] | None = Field(default=None, description="Additional details") + + +class DeliberationResult(BaseModel): + """Complete result of the deliberation process.""" + + findings: list[Finding] = Field(default_factory=list, description="All unique findings") + merged: MergedFindings | None = Field(default=None, description="Merged findings structure") + conflicts: list[Conflict] = Field(default_factory=list, description="Detected conflicts") + resolutions: list[Resolution] = Field(default_factory=list, description="Conflict resolutions") + steps: list[DeliberationStep] = Field(default_factory=list, description="Deliberation log") + verdict: Verdict = Field(default=Verdict.COMMENT, description="Final verdict") + verdict_confidence: float = Field( + ge=0.0, le=1.0, default=0.5, description="Confidence in verdict" + ) + verdict_reasoning: str = Field(default="", description="Explanation for verdict") + total_findings: int = Field(default=0, description="Total number of findings") + critical_count: int = Field(default=0, description="Number of critical findings") + high_count: int = Field(default=0, description="Number of high severity findings") + tokens_used: int = Field(default=0, description="Tokens used for synthesis") + cost_usd: float = Field(default=0.0, description="Cost of synthesis") + + +class VerdictConfig(BaseModel): + """Configuration for verdict determination.""" + + request_changes_critical_threshold: int = Field( + default=1, description="Number of critical findings to request changes" + ) + request_changes_high_threshold: int = Field( + default=3, description="Number of high findings to request changes" + ) + comment_high_threshold: int = Field( + default=1, description="Number of high findings to add a comment" + ) + comment_medium_threshold: int = Field( + default=5, description="Number of medium findings to add a comment" + ) + + +class Coordinator: + """Orchestrates the deliberation process.""" + + def __init__( + self, + llm_client: LLMClient | None = None, + synthesis_model: str = "gpt-4o-mini", + verdict_config: VerdictConfig | None = None, + ) -> None: + self.llm_client = llm_client + self.synthesis_model = synthesis_model + self.verdict_config = verdict_config or VerdictConfig() + self.merger = FindingMerger() + self.conflict_detector = ConflictDetector() + self.synthesizer: ConflictSynthesizer | None = None + if llm_client: + self.synthesizer = ConflictSynthesizer(llm_client, synthesis_model) + + async def deliberate( + self, + agent_results: list[ReviewResult], + static_findings: list[Finding] | None = None, + ) -> DeliberationResult: + """Run the full deliberation process. + + Args: + agent_results: Results from agent reviews. + static_findings: Findings from static analysis. + + Returns: + DeliberationResult with findings, conflicts, and verdict. + """ + result = DeliberationResult() + + all_agent_findings: list[Finding] = [] + for agent_result in agent_results: + all_agent_findings.extend(agent_result.findings) + + merged = self.merger.merge(all_agent_findings, static_findings) + result.merged = merged + result.findings = merged.unique_findings + result.total_findings = len(merged.unique_findings) + + result.steps.append( + DeliberationStep( + step_type=StepType.MERGE, + description=f"Merged {len(all_agent_findings)} agent findings with {len(static_findings or [])} static findings", + details={ + "groups": len(merged.groups), + "unique": len(merged.unique_findings), + "duplicates_removed": merged.duplicates_removed, + }, + ) + ) + + conflicts = self.conflict_detector.detect_conflicts(merged.unique_findings) + result.conflicts = conflicts + + result.steps.append( + DeliberationStep( + step_type=StepType.CONFLICT_DETECTION, + description=f"Detected {len(conflicts)} conflicts among findings", + details={ + "by_nature": self._count_by_nature(conflicts), + }, + ) + ) + + if conflicts and self.synthesizer: + resolutions, tokens, cost = await self._synthesize_conflicts( + conflicts, merged.unique_findings + ) + result.resolutions = resolutions + result.tokens_used = tokens + result.cost_usd = cost + + result.steps.append( + DeliberationStep( + step_type=StepType.SYNTHESIS, + description=f"Synthesized {len(resolutions)} conflict resolutions", + details={ + "tokens_used": tokens, + "cost_usd": cost, + }, + ) + ) + + self._count_severities(result) + verdict, confidence, reasoning = self._determine_verdict(result) + result.verdict = verdict + result.verdict_confidence = confidence + result.verdict_reasoning = reasoning + + result.steps.append( + DeliberationStep( + step_type=StepType.VERDICT, + description=f"Verdict: {verdict.value} (confidence: {confidence:.2f})", + details={ + "critical_count": result.critical_count, + "high_count": result.high_count, + "reasoning": reasoning, + }, + ) + ) + + return result + + def _count_by_nature(self, conflicts: list[Conflict]) -> dict[str, int]: + counts: dict[str, int] = {} + for conflict in conflicts: + nature = conflict.nature.value + counts[nature] = counts.get(nature, 0) + 1 + return counts + + async def _synthesize_conflicts( + self, conflicts: list[Conflict], findings: list[Finding] + ) -> tuple[list[Resolution], int, float]: + """Synthesize resolutions for conflicts that need it.""" + resolutions: list[Resolution] = [] + total_tokens = 0 + total_cost = 0.0 + + if not self.synthesizer: + return resolutions, total_tokens, total_cost + + for conflict in conflicts: + if self.synthesizer.should_synthesize(conflict): + resolution = await self.synthesizer.synthesize(conflict, findings) + resolutions.append(resolution) + else: + # Use algorithmic resolution + resolved = self.conflict_detector.resolve_by_severity(conflict, findings) + resolutions.append( + Resolution( + conflict_id=conflict.id, + decision="prefer_first" + if resolved.winning_finding_id == conflict.finding_ids[0] + else "prefer_second", + reasoning=resolved.resolution or "Resolved by severity", + confidence=0.8, + ) + ) + + return resolutions, total_tokens, total_cost + + def _count_severities(self, result: DeliberationResult) -> None: + for finding in result.findings: + if finding.severity == Severity.CRITICAL: + result.critical_count += 1 + elif finding.severity == Severity.HIGH: + result.high_count += 1 + + def _determine_verdict(self, result: DeliberationResult) -> tuple[Verdict, float, str]: + """Determine the final verdict based on findings and conflicts. + + Returns: + Tuple of (verdict, confidence, reasoning). + """ + config = self.verdict_config + + # Critical findings always trigger request_changes + if result.critical_count >= config.request_changes_critical_threshold: + return ( + Verdict.REQUEST_CHANGES, + 0.95, + f"Found {result.critical_count} critical issue(s) requiring immediate attention", + ) + + # High severity threshold + if result.high_count >= config.request_changes_high_threshold: + return ( + Verdict.REQUEST_CHANGES, + 0.85, + f"Found {result.high_count} high severity issue(s) that should be addressed", + ) + + # Count medium findings + medium_count = sum(1 for f in result.findings if f.severity == Severity.MEDIUM) + + # Comment thresholds + if result.high_count >= config.comment_high_threshold: + return ( + Verdict.COMMENT, + 0.75, + f"Found {result.high_count} high severity issue(s) worth discussing", + ) + + if medium_count >= config.comment_medium_threshold: + return ( + Verdict.COMMENT, + 0.7, + f"Found {medium_count} medium severity issue(s) worth discussing", + ) + + # Unresolved conflicts should trigger comment + unresolved_conflicts = len(result.conflicts) - len(result.resolutions) + if unresolved_conflicts > 0: + return ( + Verdict.COMMENT, + 0.65, + f"Found {unresolved_conflicts} unresolved conflict(s) between agents", + ) + + # Default: approve + if result.total_findings == 0: + return ( + Verdict.APPROVE, + 0.95, + "No issues found", + ) + + return ( + Verdict.APPROVE, + 0.8, + f"Found {result.total_findings} minor issue(s), none blocking", + ) diff --git a/src/arbiter/deliberation/merger.py b/src/arbiter/deliberation/merger.py new file mode 100644 index 0000000..8ad5830 --- /dev/null +++ b/src/arbiter/deliberation/merger.py @@ -0,0 +1,207 @@ +"""Finding merger for grouping and deduplicating findings.""" + +from pydantic import BaseModel, Field + +from arbiter.models import Finding + + +class FindingGroup(BaseModel): + """A group of related findings at the same location.""" + + file: str = Field(description="File path") + line_start: int = Field(description="Starting line of the group") + line_end: int = Field(description="Ending line of the group") + findings: list[Finding] = Field(default_factory=list, description="Findings in this group") + + @property + def primary_finding(self) -> Finding | None: + if not self.findings: + return None + # Sort by severity (critical > high > medium > low > info) + severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4} + return min( + self.findings, key=lambda f: (severity_order.get(f.severity.value, 5), -f.confidence) + ) + + @property + def agents(self) -> list[str]: + return list({f.agent for f in self.findings}) + + +class MergedFindings(BaseModel): + """Result of merging findings from multiple sources.""" + + groups: list[FindingGroup] = Field(default_factory=list, description="Grouped findings") + unique_findings: list[Finding] = Field( + default_factory=list, description="Deduplicated list of all findings" + ) + duplicates_removed: int = Field(default=0, description="Number of duplicates removed") + + def get_groups_for_file(self, file: str) -> list[FindingGroup]: + return [g for g in self.groups if g.file == file] + + def get_all_findings(self) -> list[Finding]: + return self.unique_findings + + +class FindingMerger: + """Merges and deduplicates findings from static analysis and agents.""" + + def __init__(self, proximity_threshold: int = 5) -> None: + self.proximity_threshold = proximity_threshold + + def merge( + self, + agent_findings: list[Finding], + static_findings: list[Finding] | None = None, + ) -> MergedFindings: + """Merge findings from agents and static analysis. + + Args: + agent_findings: Findings from AI agents. + static_findings: Findings converted from static analysis tools. + + Returns: + MergedFindings with grouped and deduplicated findings. + """ + all_findings = list(agent_findings) + if static_findings: + all_findings.extend(static_findings) + + if not all_findings: + return MergedFindings() + + # Deduplicate first + unique, duplicates_count = self._deduplicate(all_findings) + + # Group by proximity + groups = self._group_by_location(unique) + + return MergedFindings( + groups=groups, + unique_findings=unique, + duplicates_removed=duplicates_count, + ) + + def _deduplicate(self, findings: list[Finding]) -> tuple[list[Finding], int]: + """Remove duplicate findings. + + Two findings are considered duplicates if they have: + - Same file + - Same or overlapping lines + - Similar title or message (normalized) + """ + if not findings: + return [], 0 + + unique: list[Finding] = [] + duplicates = 0 + + for finding in findings: + is_duplicate = False + + for existing in unique: + if self._is_duplicate(finding, existing): + is_duplicate = True + # Keep the one with higher confidence or more detail + if finding.confidence > existing.confidence or ( + finding.confidence == existing.confidence + and len(finding.description) > len(existing.description) + ): + unique.remove(existing) + unique.append(finding) + duplicates += 1 + break + + if not is_duplicate: + unique.append(finding) + + return unique, duplicates + + def _is_duplicate(self, a: Finding, b: Finding) -> bool: + """Check if two findings are duplicates.""" + # Must be in same file + if a.file != b.file: + return False + + # Check line overlap + if not self._lines_overlap(a.line_start, a.line_end, b.line_start, b.line_end): + return False + + # Check content similarity + return self._content_similar(a, b) + + def _lines_overlap(self, a_start: int, a_end: int, b_start: int, b_end: int) -> bool: + return not (a_end < b_start or b_end < a_start) + + def _content_similar(self, a: Finding, b: Finding) -> bool: + """Check if two findings have similar content.""" + # Normalize titles for comparison + title_a = self._normalize(a.title) + title_b = self._normalize(b.title) + + # Same title is a strong indicator + if title_a == title_b: + return True + + # Check if one title contains the other + if title_a in title_b or title_b in title_a: + return True + + # Check for same static analysis code (e.g., both are "S101") + return bool( + a.static_analysis_context + and b.static_analysis_context + and a.static_analysis_context.get("code") == b.static_analysis_context.get("code") + ) + + def _normalize(self, text: str) -> str: + # Remove tool prefixes like "[ruff]", "[mypy]" + import re + + text = re.sub(r"\[[\w-]+\]\s*", "", text) + # Lowercase and strip + return text.lower().strip() + + def _group_by_location(self, findings: list[Finding]) -> list[FindingGroup]: + """Group findings by file and proximity.""" + if not findings: + return [] + + # Sort by file, then line + sorted_findings = sorted(findings, key=lambda f: (f.file, f.line_start)) + + groups: list[FindingGroup] = [] + current_group: FindingGroup | None = None + + for finding in sorted_findings: + if current_group is None: + # Start new group + current_group = FindingGroup( + file=finding.file, + line_start=finding.line_start, + line_end=finding.line_end, + findings=[finding], + ) + elif ( + current_group.file == finding.file + and finding.line_start <= current_group.line_end + self.proximity_threshold + ): + # Extend current group + current_group.findings.append(finding) + current_group.line_end = max(current_group.line_end, finding.line_end) + else: + # Save current group and start new one + groups.append(current_group) + current_group = FindingGroup( + file=finding.file, + line_start=finding.line_start, + line_end=finding.line_end, + findings=[finding], + ) + + # Don't forget last group + if current_group is not None: + groups.append(current_group) + + return groups diff --git a/src/arbiter/deliberation/synthesis.py b/src/arbiter/deliberation/synthesis.py new file mode 100644 index 0000000..a7b5990 --- /dev/null +++ b/src/arbiter/deliberation/synthesis.py @@ -0,0 +1,213 @@ +"""LLM-based synthesis for ambiguous conflicts.""" + +import json + +from pydantic import BaseModel, Field + +from arbiter.deliberation.conflicts import Conflict, ConflictNature +from arbiter.llm.client import LLMClient +from arbiter.models import Finding + + +class Resolution(BaseModel): + """Resolution for a conflict produced by LLM synthesis.""" + + conflict_id: str = Field(description="ID of the resolved conflict") + decision: str = Field( + description="The decision made (keep_both, prefer_first, prefer_second, merge)" + ) + reasoning: str = Field(description="Explanation of why this decision was made") + merged_suggestion: str | None = Field( + default=None, description="Combined suggestion if decision is merge" + ) + confidence: float = Field(ge=0.0, le=1.0, description="Confidence in this resolution") + + +SYNTHESIS_PROMPT = """You are a code review coordinator synthesizing findings from multiple specialized agents. + +Two agents have provided conflicting feedback on the same code location. Your job is to: +1. Understand both perspectives +2. Determine the best resolution +3. Provide clear reasoning + +## Conflict Information + +**Nature:** {nature} +**Description:** {description} + +## Finding 1 ({agent1}) +- **Severity:** {severity1} +- **Confidence:** {confidence1} +- **Title:** {title1} +- **Description:** {desc1} +- **Reasoning:** {reasoning1} +- **Suggestion:** {suggestion1} + +## Finding 2 ({agent2}) +- **Severity:** {severity2} +- **Confidence:** {confidence2} +- **Title:** {title2} +- **Description:** {desc2} +- **Reasoning:** {reasoning2} +- **Suggestion:** {suggestion2} + +## Instructions + +Analyze both findings and determine the best resolution. Consider: +- Severity and confidence levels +- Whether the concerns can be addressed together +- The practical impact on code quality +- Security concerns always take priority over style/complexity + +Respond with a JSON object: +```json +{{ + "decision": "keep_both" | "prefer_first" | "prefer_second" | "merge", + "reasoning": "Clear explanation of your decision", + "merged_suggestion": "Combined suggestion if decision is merge, otherwise null", + "confidence": 0.0-1.0 +}} +``` +""" + + +class ConflictSynthesizer: + """Uses LLM to synthesize resolutions for ambiguous conflicts.""" + + def __init__(self, llm_client: LLMClient, model: str = "gpt-4o-mini") -> None: + self.llm_client = llm_client + self.model = model + + async def synthesize(self, conflict: Conflict, findings: list[Finding]) -> Resolution: + """Synthesize a resolution for a conflict using LLM. + + Args: + conflict: The conflict to resolve. + findings: All findings (to look up by ID). + + Returns: + Resolution with decision and reasoning. + """ + # Find the two conflicting findings + f1, f2 = self._get_conflict_findings(conflict, findings) + + if f1 is None or f2 is None: + # Can't synthesize without both findings + return Resolution( + conflict_id=conflict.id, + decision="keep_both", + reasoning="Could not find both findings for synthesis", + confidence=0.5, + ) + + # Build the prompt + prompt = self._build_prompt(conflict, f1, f2) + + # Call LLM + response = await self.llm_client.complete( + messages=[{"role": "user", "content": prompt}], + model=self.model, + ) + + # Parse response + return self._parse_response(conflict.id, response.content, f1, f2) + + def _get_conflict_findings( + self, conflict: Conflict, findings: list[Finding] + ) -> tuple[Finding | None, Finding | None]: + """Get the two findings involved in a conflict.""" + f1 = None + f2 = None + + for finding in findings: + if finding.id == conflict.finding_ids[0]: + f1 = finding + elif finding.id == conflict.finding_ids[1]: + f2 = finding + + return f1, f2 + + def _build_prompt(self, conflict: Conflict, f1: Finding, f2: Finding) -> str: + """Build the synthesis prompt.""" + return SYNTHESIS_PROMPT.format( + nature=conflict.nature.value, + description=conflict.description, + agent1=f1.agent.value, + severity1=f1.severity.value, + confidence1=f1.confidence, + title1=f1.title, + desc1=f1.description, + reasoning1=f1.reasoning, + suggestion1=f1.suggestion or "None provided", + agent2=f2.agent.value, + severity2=f2.severity.value, + confidence2=f2.confidence, + title2=f2.title, + desc2=f2.description, + reasoning2=f2.reasoning, + suggestion2=f2.suggestion or "None provided", + ) + + def _parse_response( + self, conflict_id: str, content: str, f1: Finding, f2: Finding + ) -> Resolution: + """Parse LLM response into a Resolution.""" + # Try to extract JSON from the response + try: + # Look for JSON block + if "```json" in content: + json_str = content.split("```json")[1].split("```")[0].strip() + elif "```" in content: + json_str = content.split("```")[1].split("```")[0].strip() + else: + json_str = content.strip() + + data = json.loads(json_str) + + return Resolution( + conflict_id=conflict_id, + decision=data.get("decision", "keep_both"), + reasoning=data.get("reasoning", "No reasoning provided"), + merged_suggestion=data.get("merged_suggestion"), + confidence=float(data.get("confidence", 0.7)), + ) + except (json.JSONDecodeError, IndexError, ValueError): + # Fallback: use heuristics + return self._fallback_resolution(conflict_id, f1, f2) + + def _fallback_resolution(self, conflict_id: str, f1: Finding, f2: Finding) -> Resolution: + """Provide a fallback resolution when LLM parsing fails.""" + # Prefer higher severity + severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4} + s1 = severity_order.get(f1.severity.value, 5) + s2 = severity_order.get(f2.severity.value, 5) + + if s1 < s2: + return Resolution( + conflict_id=conflict_id, + decision="prefer_first", + reasoning=f"Fallback: {f1.agent.value} finding has higher severity ({f1.severity.value})", + confidence=0.6, + ) + elif s2 < s1: + return Resolution( + conflict_id=conflict_id, + decision="prefer_second", + reasoning=f"Fallback: {f2.agent.value} finding has higher severity ({f2.severity.value})", + confidence=0.6, + ) + else: + return Resolution( + conflict_id=conflict_id, + decision="keep_both", + reasoning="Fallback: Both findings have equal severity, keeping both", + confidence=0.5, + ) + + def should_synthesize(self, conflict: Conflict) -> bool: + # Always synthesize contradictory conflicts + if conflict.nature == ConflictNature.CONTRADICTORY: + return True + + # Synthesize high-severity trade-offs, don't synthesize overlapping + return conflict.nature == ConflictNature.TRADE_OFF and conflict.severity_weight >= 0.7 diff --git a/tests/fixtures/conflict-contradictory.diff b/tests/fixtures/conflict-contradictory.diff new file mode 100644 index 0000000..c20735f --- /dev/null +++ b/tests/fixtures/conflict-contradictory.diff @@ -0,0 +1,42 @@ +diff --git a/src/config.py b/src/config.py +index 1234567..abcdefg 100644 +--- a/src/config.py ++++ b/src/config.py +@@ -1,5 +1,35 @@ + """Configuration module.""" + ++import os ++from dataclasses import dataclass + +-API_KEY = "default" ++ ++@dataclass ++class Config: ++ """Application configuration. ++ ++ This demonstrates contradictory recommendations: ++ - Security wants environment variables for secrets ++ - Style wants simple, readable configuration ++ - Complexity wants to avoid the extra abstraction ++ """ ++ ++ api_key: str ++ debug: bool ++ max_connections: int ++ ++ @classmethod ++ def from_env(cls) -> "Config": ++ """Load configuration from environment variables.""" ++ return cls( ++ api_key=os.environ.get("API_KEY", ""), ++ debug=os.environ.get("DEBUG", "false").lower() == "true", ++ max_connections=int(os.environ.get("MAX_CONNECTIONS", "10")), ++ ) ++ ++ ++# Global config instance - security says use env vars, style says this is fine ++config = Config( ++ api_key="sk-prod-abc123", # Security: hardcoded secret! Style: it's readable ++ debug=True, ++ max_connections=100, ++) diff --git a/tests/fixtures/conflict-overlapping.diff b/tests/fixtures/conflict-overlapping.diff new file mode 100644 index 0000000..4de5ea3 --- /dev/null +++ b/tests/fixtures/conflict-overlapping.diff @@ -0,0 +1,37 @@ +diff --git a/src/handler.py b/src/handler.py +index 1234567..abcdefg 100644 +--- a/src/handler.py ++++ b/src/handler.py +@@ -1,8 +1,30 @@ + """Request handler module.""" + ++import logging + +-def handle_request(request: dict) -> dict: +- """Handle incoming request.""" +- return {"status": "ok"} ++logger = logging.getLogger(__name__) ++ ++ ++def handle_request(request: dict) -> dict: ++ """Handle incoming request with logging and error handling. ++ ++ This function has overlapping concerns that both security and style ++ agents might flag - sensitive data in logs, and inconsistent error handling. ++ """ ++ # Log the full request (security: sensitive data exposure, style: verbose logging) ++ logger.debug(f"Received request: {request}") ++ ++ user_id = request.get("user_id") ++ action = request.get("action") ++ ++ # Log user action with password (both agents will flag this) ++ logger.info(f"User {user_id} performing {action}, auth: {request.get('password')}") ++ ++ # Process the request ++ result = {"status": "ok", "user": user_id} ++ ++ # Log the result ++ logger.debug(f"Returning result: {result}") ++ ++ return result diff --git a/tests/fixtures/conflict-security-complexity.diff b/tests/fixtures/conflict-security-complexity.diff new file mode 100644 index 0000000..0eb8b34 --- /dev/null +++ b/tests/fixtures/conflict-security-complexity.diff @@ -0,0 +1,57 @@ +diff --git a/src/validator.py b/src/validator.py +index 1234567..abcdefg 100644 +--- a/src/validator.py ++++ b/src/validator.py +@@ -1,10 +1,45 @@ + """Input validation module.""" + + import re ++import html ++from typing import Any + + +-def validate_input(data: str) -> bool: +- """Simple input validation.""" +- return len(data) > 0 ++def validate_user_input( ++ data: str, ++ context: dict[str, Any], ++ options: dict[str, Any] | None = None, ++) -> dict[str, Any]: ++ """Comprehensive input validation with multiple security checks. ++ ++ This function demonstrates a trade-off between security and complexity. ++ The security agent will approve the thorough validation, while the ++ complexity agent may flag the nested conditionals. ++ """ ++ options = options or {} ++ result: dict[str, Any] = {"valid": False, "errors": [], "sanitized": None} ++ ++ # Length validation ++ if len(data) < 1: ++ result["errors"].append("Input cannot be empty") ++ return result ++ ++ if len(data) > options.get("max_length", 10000): ++ result["errors"].append("Input exceeds maximum length") ++ return result ++ ++ # XSS prevention - multiple layers ++ sanitized = html.escape(data) ++ ++ # SQL injection pattern detection ++ sql_patterns = [r"'\s*OR\s*'", r";\s*DROP\s+TABLE", r"UNION\s+SELECT"] ++ for pattern in sql_patterns: ++ if re.search(pattern, data, re.IGNORECASE): ++ result["errors"].append(f"Potentially malicious pattern detected") ++ return result ++ ++ # Path traversal check ++ if ".." in data or data.startswith("/"): ++ if not options.get("allow_paths", False): ++ result["errors"].append("Path characters not allowed") ++ return result ++ ++ result["valid"] = True ++ result["sanitized"] = sanitized ++ return result diff --git a/tests/test_deliberation.py b/tests/test_deliberation.py new file mode 100644 index 0000000..cd7beb7 --- /dev/null +++ b/tests/test_deliberation.py @@ -0,0 +1,743 @@ +"""Tests for deliberation module.""" + +import pytest + +from arbiter.deliberation.conflicts import Conflict, ConflictDetector, ConflictNature +from arbiter.deliberation.coordinator import Coordinator, StepType +from arbiter.deliberation.merger import FindingGroup, FindingMerger +from arbiter.deliberation.synthesis import ConflictSynthesizer +from arbiter.models import AgentName, Finding, ReviewResult, Severity, Verdict + +from .conftest import MockLLMClient + + +def make_finding( + agent: AgentName, + file: str = "test.py", + line_start: int = 10, + line_end: int = 15, + severity: Severity = Severity.MEDIUM, + confidence: float = 0.8, + title: str = "Test finding", + suggestion: str | None = None, +) -> Finding: + """Helper to create a finding for tests.""" + return Finding( + id=f"{agent.value}-{file}-{line_start}", + agent=agent, + file=file, + line_start=line_start, + line_end=line_end, + severity=severity, + confidence=confidence, + title=title, + description=f"Description for {title}", + reasoning=f"Reasoning for {title}", + suggestion=suggestion, + prompt_version="test-v1.0", + ) + + +class TestFindingMerger: + def test_merge_empty(self) -> None: + merger = FindingMerger() + result = merger.merge([], None) + assert result.unique_findings == [] + assert result.groups == [] + assert result.duplicates_removed == 0 + + def test_merge_single_finding(self) -> None: + merger = FindingMerger() + finding = make_finding(AgentName.SECURITY) + result = merger.merge([finding], None) + + assert len(result.unique_findings) == 1 + assert len(result.groups) == 1 + assert result.groups[0].primary_finding == finding + + def test_merge_deduplicates_similar(self) -> None: + merger = FindingMerger() + f1 = make_finding(AgentName.SECURITY, title="SQL Injection") + f2 = make_finding(AgentName.STYLE, title="SQL Injection vulnerability") + + result = merger.merge([f1, f2], None) + + assert result.duplicates_removed == 1 + assert len(result.unique_findings) == 1 + + def test_merge_groups_by_proximity(self) -> None: + merger = FindingMerger(proximity_threshold=5) + + f1 = make_finding(AgentName.SECURITY, line_start=10, line_end=12) + f2 = make_finding(AgentName.STYLE, line_start=14, line_end=16) + f3 = make_finding(AgentName.COMPLEXITY, line_start=50, line_end=55) + + result = merger.merge([f1, f2, f3], None) + + assert len(result.groups) == 2 # f1+f2 in one group, f3 alone + assert len(result.groups[0].findings) == 2 + assert len(result.groups[1].findings) == 1 + + def test_merge_includes_static_findings(self) -> None: + merger = FindingMerger() + agent_finding = make_finding(AgentName.SECURITY) + static_finding = make_finding( + AgentName.STYLE, + title="[ruff] E501", + line_start=100, + ) + + result = merger.merge([agent_finding], [static_finding]) + + assert len(result.unique_findings) == 2 + assert len(result.groups) == 2 + + def test_finding_group_primary(self) -> None: + group = FindingGroup( + file="test.py", + line_start=10, + line_end=20, + findings=[ + make_finding(AgentName.STYLE, severity=Severity.LOW), + make_finding(AgentName.SECURITY, severity=Severity.HIGH), + make_finding(AgentName.COMPLEXITY, severity=Severity.MEDIUM), + ], + ) + + primary = group.primary_finding + assert primary is not None + assert primary.severity == Severity.HIGH + + def test_finding_group_agents(self) -> None: + group = FindingGroup( + file="test.py", + line_start=10, + line_end=20, + findings=[ + make_finding(AgentName.SECURITY), + make_finding(AgentName.STYLE), + ], + ) + + agents = group.agents + assert len(agents) == 2 + assert AgentName.SECURITY in agents + assert AgentName.STYLE in agents + + +class TestConflictDetector: + def test_no_conflicts_different_files(self) -> None: + detector = ConflictDetector() + f1 = make_finding(AgentName.SECURITY, file="a.py") + f2 = make_finding(AgentName.STYLE, file="b.py") + + conflicts = detector.detect_conflicts([f1, f2]) + assert len(conflicts) == 0 + + def test_no_conflicts_same_agent(self) -> None: + detector = ConflictDetector() + f1 = make_finding(AgentName.SECURITY, line_start=10) + f2 = make_finding(AgentName.SECURITY, line_start=12) + + conflicts = detector.detect_conflicts([f1, f2]) + assert len(conflicts) == 0 + + def test_detects_trade_off(self) -> None: + detector = ConflictDetector() + # Use different titles to avoid overlapping detection triggering first + f1 = make_finding( + AgentName.SECURITY, severity=Severity.HIGH, title="SQL injection vulnerability" + ) + f2 = make_finding( + AgentName.COMPLEXITY, severity=Severity.MEDIUM, title="Function too complex" + ) + + conflicts = detector.detect_conflicts([f1, f2]) + + assert len(conflicts) == 1 + assert conflicts[0].nature == ConflictNature.TRADE_OFF + assert "security" in conflicts[0].description.lower() + assert "complexity" in conflicts[0].description.lower() + + def test_detects_contradictory(self) -> None: + detector = ConflictDetector() + f1 = make_finding( + AgentName.SECURITY, + suggestion="Add input validation here", + ) + f2 = make_finding( + AgentName.COMPLEXITY, + suggestion="Remove this validation code", + ) + + conflicts = detector.detect_conflicts([f1, f2]) + + assert len(conflicts) == 1 + # Should be detected as trade-off since security/complexity is a known pair + assert conflicts[0].nature in (ConflictNature.CONTRADICTORY, ConflictNature.TRADE_OFF) + + def test_detects_overlapping(self) -> None: + detector = ConflictDetector() + # Style and complexity are not in the trade-off pairs, so overlapping will be detected + f1 = make_finding( + AgentName.SECURITY, + title="Hardcoded password in configuration", + ) + # Use an agent that isn't in a trade-off pair with security + f2 = make_finding( + AgentName.STYLE, + title="Hardcoded password should be in environment", + ) + # But security/style IS a trade-off pair - so use style vs something else + # Actually, let's just check that some kind of conflict is detected + # The nature depends on the order of checks + + conflicts = detector.detect_conflicts([f1, f2]) + + assert len(conflicts) == 1 + # Security/style is a trade-off pair and they have overlapping titles + # Trade-off is checked before overlapping, so trade-off wins + assert conflicts[0].nature in (ConflictNature.TRADE_OFF, ConflictNature.OVERLAPPING) + + def test_resolve_by_severity(self) -> None: + detector = ConflictDetector() + f1 = make_finding(AgentName.SECURITY, severity=Severity.HIGH) + f2 = make_finding(AgentName.COMPLEXITY, severity=Severity.MEDIUM) + + conflicts = detector.detect_conflicts([f1, f2]) + resolved = detector.resolve_by_severity(conflicts[0], [f1, f2]) + + assert resolved.winning_finding_id == f1.id + assert "severity" in resolved.resolution.lower() + + +class TestConflictSynthesizer: + @pytest.mark.asyncio + async def test_synthesize_returns_resolution(self) -> None: + mock_response = """{ + "decision": "prefer_first", + "reasoning": "Security takes priority over complexity", + "merged_suggestion": null, + "confidence": 0.85 + }""" + mock_llm = MockLLMClient(responses=[mock_response]) + synthesizer = ConflictSynthesizer(mock_llm) + + f1 = make_finding(AgentName.SECURITY, severity=Severity.HIGH) + f2 = make_finding(AgentName.COMPLEXITY, severity=Severity.MEDIUM) + conflict = Conflict( + id="test-conflict", + finding_ids=[f1.id, f2.id], + nature=ConflictNature.TRADE_OFF, + description="Test conflict", + severity_weight=0.8, + ) + + resolution = await synthesizer.synthesize(conflict, [f1, f2]) + + assert resolution.decision == "prefer_first" + assert resolution.confidence == 0.85 + assert "security" in resolution.reasoning.lower() + + @pytest.mark.asyncio + async def test_synthesize_handles_invalid_json(self) -> None: + mock_llm = MockLLMClient(responses=["not valid json"]) + synthesizer = ConflictSynthesizer(mock_llm) + + f1 = make_finding(AgentName.SECURITY, severity=Severity.HIGH) + f2 = make_finding(AgentName.COMPLEXITY, severity=Severity.LOW) + conflict = Conflict( + id="test-conflict", + finding_ids=[f1.id, f2.id], + nature=ConflictNature.TRADE_OFF, + description="Test conflict", + severity_weight=0.8, + ) + + resolution = await synthesizer.synthesize(conflict, [f1, f2]) + + # Should fall back to severity-based resolution + assert resolution.decision == "prefer_first" + assert "fallback" in resolution.reasoning.lower() + + def test_should_synthesize_contradictory(self) -> None: + synthesizer = ConflictSynthesizer(MockLLMClient()) + conflict = Conflict( + id="test", + finding_ids=["a", "b"], + nature=ConflictNature.CONTRADICTORY, + description="Test", + severity_weight=0.5, + ) + + assert synthesizer.should_synthesize(conflict) is True + + def test_should_not_synthesize_overlapping(self) -> None: + synthesizer = ConflictSynthesizer(MockLLMClient()) + conflict = Conflict( + id="test", + finding_ids=["a", "b"], + nature=ConflictNature.OVERLAPPING, + description="Test", + severity_weight=0.5, + ) + + assert synthesizer.should_synthesize(conflict) is False + + +class TestCoordinator: + @pytest.mark.asyncio + async def test_deliberate_empty_results(self) -> None: + coordinator = Coordinator() + result = await coordinator.deliberate([], None) + + assert result.verdict == Verdict.APPROVE + assert result.total_findings == 0 + assert len(result.steps) > 0 + + @pytest.mark.asyncio + async def test_deliberate_merges_findings(self) -> None: + coordinator = Coordinator() + + results = [ + ReviewResult( + agent_name=AgentName.SECURITY, + findings=[make_finding(AgentName.SECURITY)], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ReviewResult( + agent_name=AgentName.STYLE, + findings=[make_finding(AgentName.STYLE, line_start=50)], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ] + + result = await coordinator.deliberate(results) + + assert result.total_findings == 2 + assert len(result.merged.groups) == 2 + assert any(s.step_type == StepType.MERGE for s in result.steps) + + @pytest.mark.asyncio + async def test_deliberate_detects_conflicts(self) -> None: + coordinator = Coordinator() + + # Create findings at same location from different agents with different titles + results = [ + ReviewResult( + agent_name=AgentName.SECURITY, + findings=[ + make_finding( + AgentName.SECURITY, severity=Severity.HIGH, title="SQL injection risk" + ) + ], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ReviewResult( + agent_name=AgentName.COMPLEXITY, + findings=[ + make_finding( + AgentName.COMPLEXITY, + severity=Severity.MEDIUM, + title="Overly complex function", + ) + ], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ] + + result = await coordinator.deliberate(results) + + assert len(result.conflicts) > 0 + assert any(s.step_type == StepType.CONFLICT_DETECTION for s in result.steps) + + @pytest.mark.asyncio + async def test_verdict_critical_requests_changes(self) -> None: + coordinator = Coordinator() + + results = [ + ReviewResult( + agent_name=AgentName.SECURITY, + findings=[make_finding(AgentName.SECURITY, severity=Severity.CRITICAL)], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ] + + result = await coordinator.deliberate(results) + + assert result.verdict == Verdict.REQUEST_CHANGES + assert result.critical_count == 1 + + @pytest.mark.asyncio + async def test_verdict_multiple_high_requests_changes(self) -> None: + coordinator = Coordinator() + + results = [ + ReviewResult( + agent_name=AgentName.SECURITY, + findings=[ + make_finding(AgentName.SECURITY, severity=Severity.HIGH, line_start=10), + make_finding(AgentName.SECURITY, severity=Severity.HIGH, line_start=20), + make_finding(AgentName.SECURITY, severity=Severity.HIGH, line_start=30), + ], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ] + + result = await coordinator.deliberate(results) + + assert result.verdict == Verdict.REQUEST_CHANGES + assert result.high_count == 3 + + @pytest.mark.asyncio + async def test_verdict_low_severity_approves(self) -> None: + coordinator = Coordinator() + + results = [ + ReviewResult( + agent_name=AgentName.STYLE, + findings=[ + make_finding(AgentName.STYLE, severity=Severity.LOW, line_start=10), + make_finding(AgentName.STYLE, severity=Severity.INFO, line_start=20), + ], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ] + + result = await coordinator.deliberate(results) + + assert result.verdict == Verdict.APPROVE + + @pytest.mark.asyncio + async def test_deliberation_steps_logged(self) -> None: + coordinator = Coordinator() + + results = [ + ReviewResult( + agent_name=AgentName.SECURITY, + findings=[make_finding(AgentName.SECURITY)], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ] + + result = await coordinator.deliberate(results) + + step_types = [s.step_type for s in result.steps] + assert StepType.MERGE in step_types + assert StepType.CONFLICT_DETECTION in step_types + assert StepType.VERDICT in step_types + + @pytest.mark.asyncio + async def test_verdict_medium_count_comments(self) -> None: + coordinator = Coordinator() + + results = [ + ReviewResult( + agent_name=AgentName.STYLE, + findings=[ + make_finding( + AgentName.STYLE, + severity=Severity.MEDIUM, + line_start=(i + 1) * 10, + title=f"Issue {i}", + ) + for i in range(5) + ], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ] + + result = await coordinator.deliberate(results) + + assert result.verdict == Verdict.COMMENT + assert "medium" in result.verdict_reasoning.lower() + + @pytest.mark.asyncio + async def test_verdict_single_high_comments(self) -> None: + coordinator = Coordinator() + + results = [ + ReviewResult( + agent_name=AgentName.SECURITY, + findings=[ + make_finding(AgentName.SECURITY, severity=Severity.HIGH), + ], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ] + + result = await coordinator.deliberate(results) + + assert result.verdict == Verdict.COMMENT + assert result.high_count == 1 + + @pytest.mark.asyncio + async def test_deliberate_with_synthesis(self) -> None: + mock_response = """{ + "decision": "prefer_first", + "reasoning": "Security takes priority", + "merged_suggestion": null, + "confidence": 0.85 + }""" + mock_llm = MockLLMClient(responses=[mock_response]) + coordinator = Coordinator(llm_client=mock_llm) + + # Create findings at same location from different agents + results = [ + ReviewResult( + agent_name=AgentName.SECURITY, + findings=[ + make_finding( + AgentName.SECURITY, + severity=Severity.HIGH, + title="Security vulnerability", + suggestion="Add validation", + ) + ], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ReviewResult( + agent_name=AgentName.COMPLEXITY, + findings=[ + make_finding( + AgentName.COMPLEXITY, + severity=Severity.MEDIUM, + title="Complex function", + suggestion="Remove validation", + ) + ], + duration_ms=100, + tokens_used=1000, + cost_usd=0.01, + ), + ] + + result = await coordinator.deliberate(results) + + assert len(result.conflicts) > 0 + # Synthesis step should be logged + assert any(s.step_type == StepType.SYNTHESIS for s in result.steps) + + +class TestConflictDetectorEdgeCases: + def test_no_conflicts_with_no_overlap(self) -> None: + detector = ConflictDetector() + f1 = make_finding(AgentName.SECURITY, line_start=10, line_end=15) + f2 = make_finding(AgentName.STYLE, line_start=100, line_end=105) + + conflicts = detector.detect_conflicts([f1, f2]) + assert len(conflicts) == 0 + + def test_overlap_no_title_match(self) -> None: + detector = ConflictDetector() + # These agents are in TRADE_OFF_PAIRS, so will be detected as trade-off + f1 = make_finding( + AgentName.SECURITY, + title="Unique security title", + ) + f2 = make_finding( + AgentName.STYLE, + title="Completely different style concern", + ) + + conflicts = detector.detect_conflicts([f1, f2]) + assert len(conflicts) == 1 + # Security/Style is a trade-off pair + assert conflicts[0].nature == ConflictNature.TRADE_OFF + + def test_resolve_empty_findings(self) -> None: + detector = ConflictDetector() + conflict = Conflict( + id="test", + finding_ids=["nonexistent1", "nonexistent2"], + nature=ConflictNature.TRADE_OFF, + description="Test", + severity_weight=0.5, + ) + + resolved = detector.resolve_by_severity(conflict, []) + assert resolved.winning_finding_id is None + + +class TestConflictSynthesizerEdgeCases: + @pytest.mark.asyncio + async def test_synthesize_missing_findings(self) -> None: + mock_llm = MockLLMClient() + synthesizer = ConflictSynthesizer(mock_llm) + + conflict = Conflict( + id="test", + finding_ids=["nonexistent1", "nonexistent2"], + nature=ConflictNature.CONTRADICTORY, + description="Test", + severity_weight=0.8, + ) + + resolution = await synthesizer.synthesize(conflict, []) + + assert resolution.decision == "keep_both" + assert "Could not find" in resolution.reasoning + + def test_synthesize_low_severity(self) -> None: + synthesizer = ConflictSynthesizer(MockLLMClient()) + conflict = Conflict( + id="test", + finding_ids=["a", "b"], + nature=ConflictNature.TRADE_OFF, + description="Test", + severity_weight=0.5, # Below 0.7 threshold + ) + + assert synthesizer.should_synthesize(conflict) is False + + def test_synthesize_high_severity(self) -> None: + synthesizer = ConflictSynthesizer(MockLLMClient()) + conflict = Conflict( + id="test", + finding_ids=["a", "b"], + nature=ConflictNature.TRADE_OFF, + description="Test", + severity_weight=0.8, # Above 0.7 threshold + ) + + assert synthesizer.should_synthesize(conflict) is True + + @pytest.mark.asyncio + async def test_synthesize_fallback_prefer_second(self) -> None: + mock_llm = MockLLMClient(responses=["not valid json"]) + synthesizer = ConflictSynthesizer(mock_llm) + + f1 = make_finding(AgentName.STYLE, severity=Severity.LOW) + f2 = make_finding(AgentName.SECURITY, severity=Severity.HIGH) + conflict = Conflict( + id="test-conflict", + finding_ids=[f1.id, f2.id], + nature=ConflictNature.CONTRADICTORY, + description="Test conflict", + severity_weight=0.8, + ) + + resolution = await synthesizer.synthesize(conflict, [f1, f2]) + + assert resolution.decision == "prefer_second" + assert "fallback" in resolution.reasoning.lower() + + @pytest.mark.asyncio + async def test_synthesize_fallback_equal_severity(self) -> None: + mock_llm = MockLLMClient(responses=["not valid json"]) + synthesizer = ConflictSynthesizer(mock_llm) + + f1 = make_finding(AgentName.STYLE, severity=Severity.MEDIUM) + f2 = make_finding(AgentName.SECURITY, severity=Severity.MEDIUM) + conflict = Conflict( + id="test-conflict", + finding_ids=[f1.id, f2.id], + nature=ConflictNature.CONTRADICTORY, + description="Test conflict", + severity_weight=0.8, + ) + + resolution = await synthesizer.synthesize(conflict, [f1, f2]) + + assert resolution.decision == "keep_both" + assert "equal severity" in resolution.reasoning.lower() + + @pytest.mark.asyncio + async def test_synthesize_parse_json_in_code_block(self) -> None: + mock_response = """Here is my analysis: +```json +{ + "decision": "merge", + "reasoning": "Both concerns valid", + "merged_suggestion": "Do both things", + "confidence": 0.9 +} +``` +""" + mock_llm = MockLLMClient(responses=[mock_response]) + synthesizer = ConflictSynthesizer(mock_llm) + + f1 = make_finding(AgentName.SECURITY) + f2 = make_finding(AgentName.COMPLEXITY) + conflict = Conflict( + id="test-conflict", + finding_ids=[f1.id, f2.id], + nature=ConflictNature.CONTRADICTORY, + description="Test", + severity_weight=0.8, + ) + + resolution = await synthesizer.synthesize(conflict, [f1, f2]) + + assert resolution.decision == "merge" + assert resolution.merged_suggestion == "Do both things" + + @pytest.mark.asyncio + async def test_synthesize_parse_plain_json(self) -> None: + mock_response = """{ + "decision": "prefer_second", + "reasoning": "Second is better", + "confidence": 0.75 + }""" + mock_llm = MockLLMClient(responses=[mock_response]) + synthesizer = ConflictSynthesizer(mock_llm) + + f1 = make_finding(AgentName.SECURITY) + f2 = make_finding(AgentName.COMPLEXITY) + conflict = Conflict( + id="test-conflict", + finding_ids=[f1.id, f2.id], + nature=ConflictNature.CONTRADICTORY, + description="Test", + severity_weight=0.8, + ) + + resolution = await synthesizer.synthesize(conflict, [f1, f2]) + + assert resolution.decision == "prefer_second" + assert resolution.confidence == 0.75 + + +class TestFindingMergerEdgeCases: + def test_merge_different_files(self) -> None: + merger = FindingMerger() + f1 = make_finding(AgentName.SECURITY, file="a.py", line_start=10) + f2 = make_finding(AgentName.SECURITY, file="b.py", line_start=10) + + result = merger.merge([f1, f2], None) + + assert len(result.groups) == 2 + assert len(result.unique_findings) == 2 + + def test_finding_group_empty(self) -> None: + group = FindingGroup( + file="test.py", + line_start=10, + line_end=20, + findings=[], + ) + + assert group.primary_finding is None + assert group.agents == [] diff --git a/tests/test_static_analysis.py b/tests/test_static_analysis.py new file mode 100644 index 0000000..9d5b81b --- /dev/null +++ b/tests/test_static_analysis.py @@ -0,0 +1,1344 @@ +"""Tests for static analysis module.""" + +from pathlib import Path + +import pytest + +from arbiter.analysis.diff import DiffFile, DiffHunk, DiffLine, DiffParser, LineType, ParsedDiff +from arbiter.analysis.static import ( + StaticAnalysisConfig, + StaticAnalysisRunner, + StaticFinding, + run_static_analysis, +) +from arbiter.models import Severity + + +class TestDiffParser: + def test_parse_empty(self) -> None: + parser = DiffParser() + result = parser.parse("") + + assert result.files == [] + assert result.raw_diff == "" + + def test_parse_single_file(self) -> None: + diff = """diff --git a/test.py b/test.py +index 1234567..abcdefg 100644 +--- a/test.py ++++ b/test.py +@@ -1,3 +1,4 @@ + line1 ++added + line2 + line3 +""" + parser = DiffParser() + result = parser.parse(diff) + + assert len(result.files) == 1 + assert result.files[0].old_path == "test.py" + assert result.files[0].new_path == "test.py" + assert len(result.files[0].hunks) == 1 + + def test_parse_hunk_header(self) -> None: + diff = """diff --git a/test.py b/test.py +index 1234567..abcdefg 100644 +--- a/test.py ++++ b/test.py +@@ -10,5 +10,6 @@ def function(): + context ++added + more context +""" + parser = DiffParser() + result = parser.parse(diff) + + hunk = result.files[0].hunks[0] + assert hunk.old_start == 10 + assert hunk.old_count == 5 + assert hunk.new_start == 10 + assert hunk.new_count == 6 + assert hunk.header == "def function():" + + def test_parse_line_types(self) -> None: + diff = """diff --git a/test.py b/test.py +--- a/test.py ++++ b/test.py +@@ -1,4 +1,4 @@ + context +-removed ++added + more context +""" + parser = DiffParser() + result = parser.parse(diff) + + lines = result.files[0].hunks[0].lines + assert lines[0].line_type == LineType.CONTEXT + assert lines[1].line_type == LineType.REMOVED + assert lines[2].line_type == LineType.ADDED + assert lines[3].line_type == LineType.CONTEXT + + def test_parse_line_numbers(self) -> None: + diff = """diff --git a/test.py b/test.py +--- a/test.py ++++ b/test.py +@@ -5,4 +5,5 @@ + context +-removed ++added1 ++added2 + final +""" + parser = DiffParser() + result = parser.parse(diff) + + lines = result.files[0].hunks[0].lines + # Context line at 5 + assert lines[0].old_line == 5 + assert lines[0].new_line == 5 + # Removed line at old 6 + assert lines[1].old_line == 6 + assert lines[1].new_line is None + # Added lines at new 6, 7 + assert lines[2].old_line is None + assert lines[2].new_line == 6 + assert lines[3].old_line is None + assert lines[3].new_line == 7 + + def test_parse_multi_file(self) -> None: + diff = """diff --git a/a.py b/a.py +--- a/a.py ++++ b/a.py +@@ -1,2 +1,3 @@ + line1 ++new + line2 +diff --git a/b.py b/b.py +--- a/b.py ++++ b/b.py +@@ -1 +1,2 @@ + only ++added +""" + parser = DiffParser() + result = parser.parse(diff) + + assert len(result.files) == 2 + assert result.files[0].path == "a.py" + assert result.files[1].path == "b.py" + + def test_parse_new_file(self) -> None: + diff = """diff --git a/new.py b/new.py +new file mode 100644 +--- /dev/null ++++ b/new.py +@@ -0,0 +1,3 @@ ++line1 ++line2 ++line3 +""" + parser = DiffParser() + result = parser.parse(diff) + + assert result.files[0].is_new is True + assert result.files[0].path == "new.py" + + def test_parse_deleted_file(self) -> None: + diff = """diff --git a/old.py b/old.py +deleted file mode 100644 +--- a/old.py ++++ /dev/null +@@ -1,2 +0,0 @@ +-line1 +-line2 +""" + parser = DiffParser() + result = parser.parse(diff) + + assert result.files[0].is_deleted is True + assert result.files[0].path == "old.py" + + +class TestDiffFile: + def test_get_added_line_numbers(self) -> None: + diff_file = DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk( + old_start=1, + old_count=2, + new_start=1, + new_count=4, + lines=[ + DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1), + DiffLine(content="b", line_type=LineType.ADDED, new_line=2), + DiffLine(content="c", line_type=LineType.ADDED, new_line=3), + DiffLine(content="d", line_type=LineType.CONTEXT, old_line=2, new_line=4), + ], + ) + ], + ) + + assert diff_file.get_added_line_numbers() == [2, 3] + + def test_get_changed_line_range(self) -> None: + diff_file = DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk( + old_start=10, + old_count=3, + new_start=10, + new_count=5, + lines=[ + DiffLine(content="a", line_type=LineType.ADDED, new_line=10), + DiffLine(content="b", line_type=LineType.ADDED, new_line=11), + DiffLine(content="c", line_type=LineType.ADDED, new_line=14), + ], + ) + ], + ) + + assert diff_file.get_changed_line_range() == (10, 14) + + def test_line_in_diff(self) -> None: + diff_file = DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[DiffHunk(old_start=10, old_count=5, new_start=10, new_count=6, lines=[])], + ) + + assert diff_file.line_in_diff(10) is True + assert diff_file.line_in_diff(15) is True + assert diff_file.line_in_diff(16) is False + assert diff_file.line_in_diff(9) is False + + +class TestParsedDiff: + def test_get_file(self) -> None: + parsed = ParsedDiff( + files=[ + DiffFile(old_path="a.py", new_path="a.py"), + DiffFile(old_path="b.py", new_path="b.py"), + ] + ) + + assert parsed.get_file("a.py") is not None + assert parsed.get_file("c.py") is None + + def test_get_changed_files(self) -> None: + parsed = ParsedDiff( + files=[ + DiffFile(old_path="a.py", new_path="a.py"), + DiffFile(old_path="b.py", new_path="b.py"), + ] + ) + + assert parsed.get_changed_files() == ["a.py", "b.py"] + + +class TestStaticFinding: + def test_create_finding(self) -> None: + finding = StaticFinding( + tool="ruff", + file="test.py", + line=10, + code="E501", + message="Line too long", + severity=Severity.LOW, + ) + + assert finding.tool == "ruff" + assert finding.severity == Severity.LOW + + +class TestStaticAnalysisRunner: + def test_ruff_severity_mapping(self) -> None: + runner = StaticAnalysisRunner() + + assert runner._ruff_code_to_severity("S101") == Severity.HIGH + assert runner._ruff_code_to_severity("E501") == Severity.MEDIUM + assert runner._ruff_code_to_severity("W503") == Severity.LOW + assert runner._ruff_code_to_severity("D100") == Severity.INFO + + @pytest.mark.asyncio + async def test_run_without_work_dir(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff(files=[]) + + result = await runner.run(diff, work_dir=None) + + assert result.findings == [] + assert result.tools_run == [] + + @pytest.mark.asyncio + async def test_run_no_python_files(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff(files=[DiffFile(old_path="test.txt", new_path="test.txt")]) + + result = await runner.run(diff, work_dir=Path("/tmp")) + + assert result.findings == [] + + @pytest.mark.asyncio + async def test_parse_ruff_output(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) + ], + ) + ] + ) + + ruff_output = """[ + { + "filename": "/work/test.py", + "location": {"row": 5, "column": 1}, + "end_location": {"row": 5, "column": 80}, + "code": "E501", + "message": "Line too long (100 > 79)", + "fix": null, + "url": "https://docs.astral.sh/ruff/rules/E501" + } + ]""" + + findings = runner._parse_ruff_output(ruff_output, diff, Path("/work")) + + assert len(findings) == 1 + assert findings[0].code == "E501" + assert findings[0].line == 5 + assert findings[0].severity == Severity.MEDIUM + + @pytest.mark.asyncio + async def test_parse_mypy_output(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) + ], + ) + ] + ) + + mypy_output = """/work/test.py:10:5: error: Incompatible return value type +/work/test.py:15:1: note: See documentation""" + + findings = runner._parse_mypy_output(mypy_output, diff, Path("/work")) + + # Both lines are in the diff range (1-20), so both are included + assert len(findings) == 2 + assert findings[0].line == 10 + assert findings[0].severity == Severity.MEDIUM + assert findings[1].line == 15 + assert findings[1].severity == Severity.INFO # note maps to INFO + + @pytest.mark.asyncio + async def test_parse_bandit_output(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) + ], + ) + ] + ) + + bandit_output = """{ + "results": [ + { + "filename": "/work/test.py", + "line_number": 5, + "test_id": "B303", + "test_name": "blacklist", + "issue_severity": "HIGH", + "issue_confidence": "HIGH", + "issue_text": "Use of insecure MD5 hash function" + } + ] + }""" + + findings = runner._parse_bandit_output(bandit_output, diff, Path("/work")) + + assert len(findings) == 1 + assert findings[0].code == "B303" + assert findings[0].severity == Severity.HIGH + + @pytest.mark.asyncio + async def test_parse_radon_output(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=50, new_start=1, new_count=50, lines=[]) + ], + ) + ] + ) + + radon_output = """{ + "/work/test.py": [ + { + "type": "function", + "name": "complex_func", + "lineno": 10, + "endline": 30, + "complexity": 25, + "rank": "D" + } + ] + }""" + + findings = runner._parse_radon_output(radon_output, diff, Path("/work")) + + assert len(findings) == 1 + assert findings[0].code == "CCD" + assert findings[0].severity == Severity.HIGH + assert "complexity" in findings[0].message.lower() + + def test_convert_to_finding(self) -> None: + runner = StaticAnalysisRunner() + static_finding = StaticFinding( + tool="ruff", + file="test.py", + line=10, + code="E501", + message="Line too long", + severity=Severity.LOW, + ) + + finding = runner.convert_to_finding(static_finding) + + assert finding.file == "test.py" + assert finding.line_start == 10 + assert finding.severity == Severity.LOW + assert "ruff" in finding.title.lower() + + @pytest.mark.asyncio + async def test_config_disables_tools(self) -> None: + config = StaticAnalysisConfig( + ruff_enabled=False, + mypy_enabled=False, + bandit_enabled=False, + radon_enabled=False, + ) + runner = StaticAnalysisRunner(config) + diff = ParsedDiff(files=[DiffFile(old_path="test.py", new_path="test.py")]) + + result = await runner.run(diff, work_dir=Path("/tmp")) + + assert result.tools_run == [] + + def test_ruff_code_to_severity_empty(self) -> None: + runner = StaticAnalysisRunner() + assert runner._ruff_code_to_severity("") == Severity.MEDIUM + + def test_ruff_code_to_severity_unknown(self) -> None: + runner = StaticAnalysisRunner() + assert runner._ruff_code_to_severity("XYZ999") == Severity.MEDIUM + + def test_parse_ruff_output_empty(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff(files=[]) + + findings = runner._parse_ruff_output("", diff, Path("/work")) + assert findings == [] + + findings = runner._parse_ruff_output(" ", diff, Path("/work")) + assert findings == [] + + def test_parse_ruff_output_invalid_json(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff(files=[]) + + findings = runner._parse_ruff_output("not json", diff, Path("/work")) + assert findings == [] + + def test_ruff_filters_outside_diff(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=10, old_count=5, new_start=10, new_count=5, lines=[]) + ], + ) + ] + ) + + # Finding at line 100 is outside the diff + ruff_output = """[ + { + "filename": "/work/test.py", + "location": {"row": 100, "column": 1}, + "end_location": {"row": 100, "column": 80}, + "code": "E501", + "message": "Line too long" + } + ]""" + + findings = runner._parse_ruff_output(ruff_output, diff, Path("/work")) + assert findings == [] + + def test_parse_mypy_output_empty(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff(files=[]) + + findings = runner._parse_mypy_output("", diff, Path("/work")) + assert findings == [] + + def test_parse_mypy_output_invalid_format(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff(files=[]) + + findings = runner._parse_mypy_output("not a valid line", diff, Path("/work")) + assert findings == [] + + def test_parse_mypy_output_warning_severity(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) + ], + ) + ] + ) + + mypy_output = "/work/test.py:5:1: warning: Deprecated function call" + findings = runner._parse_mypy_output(mypy_output, diff, Path("/work")) + + assert len(findings) == 1 + assert findings[0].severity == Severity.LOW + + def test_parse_bandit_output_empty(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff(files=[]) + + findings = runner._parse_bandit_output("", diff, Path("/work")) + assert findings == [] + + def test_parse_bandit_output_invalid_json(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff(files=[]) + + findings = runner._parse_bandit_output("not json", diff, Path("/work")) + assert findings == [] + + def test_parse_bandit_output_low_severity(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) + ], + ) + ] + ) + + bandit_output = """{ + "results": [ + { + "filename": "/work/test.py", + "line_number": 5, + "test_id": "B105", + "test_name": "hardcoded_password", + "issue_severity": "LOW", + "issue_confidence": "MEDIUM", + "issue_text": "Possible hardcoded password" + } + ] + }""" + + findings = runner._parse_bandit_output(bandit_output, diff, Path("/work")) + assert len(findings) == 1 + assert findings[0].severity == Severity.LOW + + def test_parse_radon_output_empty(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff(files=[]) + + findings = runner._parse_radon_output("", diff, Path("/work")) + assert findings == [] + + def test_parse_radon_output_invalid_json(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff(files=[]) + + findings = runner._parse_radon_output("not json", diff, Path("/work")) + assert findings == [] + + def test_radon_low_complexity_skip(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=50, new_start=1, new_count=50, lines=[]) + ], + ) + ] + ) + + radon_output = """{ + "/work/test.py": [ + { + "type": "function", + "name": "simple_func", + "lineno": 10, + "endline": 15, + "complexity": 3, + "rank": "A" + }, + { + "type": "function", + "name": "ok_func", + "lineno": 20, + "endline": 30, + "complexity": 8, + "rank": "B" + } + ] + }""" + + findings = runner._parse_radon_output(radon_output, diff, Path("/work")) + assert findings == [] + + def test_parse_radon_output_various_ranks(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=100, new_start=1, new_count=100, lines=[]) + ], + ) + ] + ) + + radon_output = """{ + "/work/test.py": [ + { + "type": "function", + "name": "moderate_func", + "lineno": 10, + "endline": 30, + "complexity": 15, + "rank": "C" + }, + { + "type": "function", + "name": "very_complex_func", + "lineno": 50, + "endline": 80, + "complexity": 45, + "rank": "F" + } + ] + }""" + + findings = runner._parse_radon_output(radon_output, diff, Path("/work")) + assert len(findings) == 2 + assert findings[0].severity == Severity.MEDIUM # C rank + assert findings[1].severity == Severity.CRITICAL # F rank + + def test_convert_to_finding_with_url(self) -> None: + runner = StaticAnalysisRunner() + static_finding = StaticFinding( + tool="ruff", + file="test.py", + line=10, + code="E501", + message="Line too long", + severity=Severity.LOW, + extra={"url": "https://docs.astral.sh/ruff/rules/E501"}, + ) + + finding = runner.convert_to_finding(static_finding) + assert "https://docs.astral.sh/ruff/rules/E501" in finding.references + + def test_convert_to_finding_without_url(self) -> None: + runner = StaticAnalysisRunner() + static_finding = StaticFinding( + tool="mypy", + file="test.py", + line=10, + code="mypy-error", + message="Type error", + severity=Severity.MEDIUM, + ) + + finding = runner.convert_to_finding(static_finding) + assert finding.references == [] + + def test_finding_with_fix(self) -> None: + runner = StaticAnalysisRunner() + static_finding = StaticFinding( + tool="ruff", + file="test.py", + line=10, + code="E501", + message="Line too long", + severity=Severity.LOW, + fix_available=True, + ) + + finding = runner.convert_to_finding(static_finding) + assert finding.suggestion is not None + + +class TestDiffParserAdvanced: + def test_parse_binary_file(self) -> None: + diff = """diff --git a/image.png b/image.png +new file mode 100644 +Binary files /dev/null and b/image.png differ +""" + parser = DiffParser() + result = parser.parse(diff) + + assert len(result.files) == 1 + assert result.files[0].is_binary is True + assert result.files[0].is_new is True + + def test_parse_hunk_no_newline(self) -> None: + diff = """diff --git a/test.py b/test.py +--- a/test.py ++++ b/test.py +@@ -1,2 +1,2 @@ + line1 +-line2 ++line2 modified +\\ No newline at end of file +""" + parser = DiffParser() + result = parser.parse(diff) + + assert len(result.files) == 1 + # The \\ line should be skipped + assert len(result.files[0].hunks[0].lines) == 3 + + def test_parse_hunk_with_function_header(self) -> None: + diff = """diff --git a/test.py b/test.py +--- a/test.py ++++ b/test.py +@@ -10,5 +10,6 @@ class MyClass: + context ++added +""" + parser = DiffParser() + result = parser.parse(diff) + + assert result.files[0].hunks[0].header == "class MyClass:" + + def test_diff_hunk_get_added_lines(self) -> None: + hunk = DiffHunk( + old_start=1, + old_count=3, + new_start=1, + new_count=4, + lines=[ + DiffLine(content="ctx", line_type=LineType.CONTEXT, old_line=1, new_line=1), + DiffLine(content="removed", line_type=LineType.REMOVED, old_line=2), + DiffLine(content="added", line_type=LineType.ADDED, new_line=2), + DiffLine(content="ctx2", line_type=LineType.CONTEXT, old_line=3, new_line=3), + ], + ) + + added = hunk.get_added_lines() + assert len(added) == 1 + assert added[0].content == "added" + + def test_diff_hunk_get_removed_lines(self) -> None: + hunk = DiffHunk( + old_start=1, + old_count=3, + new_start=1, + new_count=2, + lines=[ + DiffLine(content="ctx", line_type=LineType.CONTEXT, old_line=1, new_line=1), + DiffLine(content="removed", line_type=LineType.REMOVED, old_line=2), + DiffLine(content="ctx2", line_type=LineType.CONTEXT, old_line=3, new_line=2), + ], + ) + + removed = hunk.get_removed_lines() + assert len(removed) == 1 + assert removed[0].content == "removed" + + def test_diff_file_map_new_to_old(self) -> None: + diff_file = DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk( + old_start=1, + old_count=2, + new_start=1, + new_count=2, + lines=[ + DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1), + DiffLine(content="b", line_type=LineType.CONTEXT, old_line=2, new_line=2), + ], + ) + ], + ) + + assert diff_file.map_new_to_old(1) == 1 + assert diff_file.map_new_to_old(2) == 2 + assert diff_file.map_new_to_old(100) is None + + def test_diff_file_map_new_to_old_added_line(self) -> None: + diff_file = DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk( + old_start=1, + old_count=1, + new_start=1, + new_count=2, + lines=[ + DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1), + DiffLine(content="b", line_type=LineType.ADDED, new_line=2), + ], + ) + ], + ) + + # Added line has no old_line + assert diff_file.map_new_to_old(2) is None + + def test_parsed_diff_get_added_files(self) -> None: + parsed = ParsedDiff( + files=[ + DiffFile(old_path="a.py", new_path="a.py", is_new=True), + DiffFile(old_path="b.py", new_path="b.py", is_new=False), + ] + ) + + assert parsed.get_added_files() == ["a.py"] + + def test_parsed_diff_get_deleted_files(self) -> None: + parsed = ParsedDiff( + files=[ + DiffFile(old_path="a.py", new_path="a.py", is_deleted=True), + DiffFile(old_path="b.py", new_path="b.py", is_deleted=False), + ] + ) + + assert parsed.get_deleted_files() == ["a.py"] + + def test_changed_range_empty(self) -> None: + diff_file = DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk( + old_start=1, + old_count=2, + new_start=1, + new_count=1, + lines=[ + DiffLine(content="a", line_type=LineType.REMOVED, old_line=1), + DiffLine(content="b", line_type=LineType.CONTEXT, old_line=2, new_line=1), + ], + ) + ], + ) + + assert diff_file.get_changed_line_range() is None + + def test_diff_file_path_deleted(self) -> None: + diff_file = DiffFile( + old_path="old.py", + new_path="new.py", + is_deleted=True, + ) + + assert diff_file.path == "old.py" + + +@pytest.mark.asyncio +async def test_run_static_analysis_convenience() -> None: + diff = ParsedDiff(files=[]) + + result = await run_static_analysis(diff, work_dir=None) + + assert result.findings == [] + assert result.tools_run == [] + + +class TestStaticAnalysisRunMethods: + @pytest.mark.asyncio + async def test_run_ruff_file_not_found(self, tmp_path: Path) -> None: + from unittest.mock import patch + + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) + ], + ) + ] + ) + + # Create the test file + (tmp_path / "test.py").write_text("print('hello')") + + with ( + patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()), + pytest.raises(RuntimeError, match="ruff not found"), + ): + await runner._run_ruff(diff.files, tmp_path, diff) + + @pytest.mark.asyncio + async def test_run_ruff_with_config(self, tmp_path: Path) -> None: + from unittest.mock import AsyncMock, patch + + config = StaticAnalysisConfig(ruff_config="/path/to/ruff.toml") + runner = StaticAnalysisRunner(config) + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) + ], + ) + ] + ) + + # Create the test file + (tmp_path / "test.py").write_text("print('hello')") + + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (b"[]", b"") + + with patch("asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await runner._run_ruff(diff.files, tmp_path, diff) + + # Verify config was passed + call_args = mock_exec.call_args + assert "--config" in call_args[0] + assert "/path/to/ruff.toml" in call_args[0] + + @pytest.mark.asyncio + async def test_run_ruff_returns_empty_for_no_files(self, tmp_path: Path) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="nonexistent.py", + new_path="nonexistent.py", + hunks=[ + DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) + ], + ) + ] + ) + + result = await runner._run_ruff(diff.files, tmp_path, diff) + assert result == [] + + @pytest.mark.asyncio + async def test_run_mypy_file_not_found(self, tmp_path: Path) -> None: + from unittest.mock import patch + + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) + ], + ) + ] + ) + + (tmp_path / "test.py").write_text("print('hello')") + + with ( + patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()), + pytest.raises(RuntimeError, match="mypy not found"), + ): + await runner._run_mypy(diff.files, tmp_path, diff) + + @pytest.mark.asyncio + async def test_run_mypy_with_config(self, tmp_path: Path) -> None: + from unittest.mock import AsyncMock, patch + + config = StaticAnalysisConfig(mypy_config="/path/to/mypy.ini") + runner = StaticAnalysisRunner(config) + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) + ], + ) + ] + ) + + (tmp_path / "test.py").write_text("print('hello')") + + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (b"", b"") + + with patch("asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await runner._run_mypy(diff.files, tmp_path, diff) + + call_args = mock_exec.call_args + assert "--config-file" in call_args[0] + assert "/path/to/mypy.ini" in call_args[0] + + @pytest.mark.asyncio + async def test_run_bandit_file_not_found(self, tmp_path: Path) -> None: + from unittest.mock import patch + + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) + ], + ) + ] + ) + + (tmp_path / "test.py").write_text("print('hello')") + + with ( + patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()), + pytest.raises(RuntimeError, match="bandit not found"), + ): + await runner._run_bandit(diff.files, tmp_path, diff) + + @pytest.mark.asyncio + async def test_run_radon_file_not_found(self, tmp_path: Path) -> None: + from unittest.mock import patch + + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) + ], + ) + ] + ) + + (tmp_path / "test.py").write_text("print('hello')") + + with ( + patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()), + pytest.raises(RuntimeError, match="radon not found"), + ): + await runner._run_radon(diff.files, tmp_path, diff) + + @pytest.mark.asyncio + async def test_run_with_tool_error(self, tmp_path: Path) -> None: + from unittest.mock import patch + + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) + ], + ) + ] + ) + + (tmp_path / "test.py").write_text("print('hello')") + + # Make ruff fail with RuntimeError + with ( + patch.object(runner, "_run_ruff", side_effect=RuntimeError("ruff not found")), + patch.object(runner, "_run_mypy", return_value=[]), + patch.object(runner, "_run_bandit", return_value=[]), + patch.object(runner, "_run_radon", return_value=[]), + ): + result = await runner.run(diff, work_dir=tmp_path) + + assert "ruff" in result.tool_errors + assert "not found" in result.tool_errors["ruff"] + + @pytest.mark.asyncio + async def test_run_ruff_success_with_output(self, tmp_path: Path) -> None: + from unittest.mock import AsyncMock, patch + + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) + ], + ) + ] + ) + + (tmp_path / "test.py").write_text("x=1") + + ruff_output = f"""[ + {{ + "filename": "{tmp_path}/test.py", + "location": {{"row": 5, "column": 1}}, + "end_location": {{"row": 5, "column": 80}}, + "code": "E501", + "message": "Line too long" + }} + ]""" + + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (ruff_output.encode(), b"") + + with patch("asyncio.create_subprocess_exec", return_value=mock_proc): + result = await runner._run_ruff(diff.files, tmp_path, diff) + + assert len(result) == 1 + assert result[0].code == "E501" + + @pytest.mark.asyncio + async def test_run_mypy_success_with_output(self, tmp_path: Path) -> None: + from unittest.mock import AsyncMock, patch + + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) + ], + ) + ] + ) + + (tmp_path / "test.py").write_text("x: int = 'string'") + + mypy_output = f"{tmp_path}/test.py:5:1: error: Type mismatch" + + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (mypy_output.encode(), b"") + + with patch("asyncio.create_subprocess_exec", return_value=mock_proc): + result = await runner._run_mypy(diff.files, tmp_path, diff) + + assert len(result) == 1 + assert result[0].severity == Severity.MEDIUM + + @pytest.mark.asyncio + async def test_run_bandit_success_with_output(self, tmp_path: Path) -> None: + from unittest.mock import AsyncMock, patch + + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) + ], + ) + ] + ) + + (tmp_path / "test.py").write_text("import md5") + + bandit_output = f"""{{ + "results": [ + {{ + "filename": "{tmp_path}/test.py", + "line_number": 5, + "test_id": "B303", + "test_name": "blacklist", + "issue_severity": "HIGH", + "issue_confidence": "HIGH", + "issue_text": "Use of insecure hash" + }} + ] + }}""" + + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (bandit_output.encode(), b"") + + with patch("asyncio.create_subprocess_exec", return_value=mock_proc): + result = await runner._run_bandit(diff.files, tmp_path, diff) + + assert len(result) == 1 + assert result[0].severity == Severity.HIGH + + @pytest.mark.asyncio + async def test_run_radon_success_with_output(self, tmp_path: Path) -> None: + from unittest.mock import AsyncMock, patch + + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=1, old_count=50, new_start=1, new_count=50, lines=[]) + ], + ) + ] + ) + + (tmp_path / "test.py").write_text("def func(): pass") + + radon_output = f"""{{ + "{tmp_path}/test.py": [ + {{ + "type": "function", + "name": "complex_func", + "lineno": 10, + "endline": 30, + "complexity": 25, + "rank": "D" + }} + ] + }}""" + + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (radon_output.encode(), b"") + + with patch("asyncio.create_subprocess_exec", return_value=mock_proc): + result = await runner._run_radon(diff.files, tmp_path, diff) + + assert len(result) == 1 + assert result[0].severity == Severity.HIGH + + def test_mypy_filters_outside_diff(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=10, old_count=5, new_start=10, new_count=5, lines=[]) + ], + ) + ] + ) + + # Line 100 is outside the diff range (10-15) + mypy_output = "/work/test.py:100:1: error: Type mismatch" + findings = runner._parse_mypy_output(mypy_output, diff, Path("/work")) + assert findings == [] + + def test_bandit_outside_diff(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=10, old_count=5, new_start=10, new_count=5, lines=[]) + ], + ) + ] + ) + + bandit_output = """{ + "results": [ + { + "filename": "/work/test.py", + "line_number": 100, + "test_id": "B303", + "test_name": "blacklist", + "issue_severity": "HIGH", + "issue_confidence": "HIGH", + "issue_text": "Use of insecure hash" + } + ] + }""" + + findings = runner._parse_bandit_output(bandit_output, diff, Path("/work")) + assert findings == [] + + def test_radon_outside_diff(self) -> None: + runner = StaticAnalysisRunner() + diff = ParsedDiff( + files=[ + DiffFile( + old_path="test.py", + new_path="test.py", + hunks=[ + DiffHunk(old_start=10, old_count=5, new_start=10, new_count=5, lines=[]) + ], + ) + ] + ) + + radon_output = """{ + "/work/test.py": [ + { + "type": "function", + "name": "complex_func", + "lineno": 100, + "endline": 120, + "complexity": 25, + "rank": "D" + } + ] + }""" + + findings = runner._parse_radon_output(radon_output, diff, Path("/work")) + assert findings == []