"""Tests for static analysis module.""" from pathlib import Path import pytest from arbiter.analysis.diff import DiffFile, DiffHunk, DiffLine, DiffParser, LineType, ParsedDiff from arbiter.analysis.static import ( StaticAnalysisConfig, StaticAnalysisRunner, StaticFinding, run_static_analysis, ) from arbiter.models import Severity class TestDiffParser: def test_parse_empty(self) -> None: parser = DiffParser() result = parser.parse("") assert result.files == [] assert result.raw_diff == "" def test_parse_single_file(self) -> None: diff = """diff --git a/test.py b/test.py index 1234567..abcdefg 100644 --- a/test.py +++ b/test.py @@ -1,3 +1,4 @@ line1 +added line2 line3 """ parser = DiffParser() result = parser.parse(diff) assert len(result.files) == 1 assert result.files[0].old_path == "test.py" assert result.files[0].new_path == "test.py" assert len(result.files[0].hunks) == 1 def test_parse_hunk_header(self) -> None: diff = """diff --git a/test.py b/test.py index 1234567..abcdefg 100644 --- a/test.py +++ b/test.py @@ -10,5 +10,6 @@ def function(): context +added more context """ parser = DiffParser() result = parser.parse(diff) hunk = result.files[0].hunks[0] assert hunk.old_start == 10 assert hunk.old_count == 5 assert hunk.new_start == 10 assert hunk.new_count == 6 assert hunk.header == "def function():" def test_parse_line_types(self) -> None: diff = """diff --git a/test.py b/test.py --- a/test.py +++ b/test.py @@ -1,4 +1,4 @@ context -removed +added more context """ parser = DiffParser() result = parser.parse(diff) lines = result.files[0].hunks[0].lines assert lines[0].line_type == LineType.CONTEXT assert lines[1].line_type == LineType.REMOVED assert lines[2].line_type == LineType.ADDED assert lines[3].line_type == LineType.CONTEXT def test_parse_line_numbers(self) -> None: diff = """diff --git a/test.py b/test.py --- a/test.py +++ b/test.py @@ -5,4 +5,5 @@ context -removed +added1 +added2 final """ parser = DiffParser() result = parser.parse(diff) lines = result.files[0].hunks[0].lines # Context line at 5 assert lines[0].old_line == 5 assert lines[0].new_line == 5 # Removed line at old 6 assert lines[1].old_line == 6 assert lines[1].new_line is None # Added lines at new 6, 7 assert lines[2].old_line is None assert lines[2].new_line == 6 assert lines[3].old_line is None assert lines[3].new_line == 7 def test_parse_multi_file(self) -> None: diff = """diff --git a/a.py b/a.py --- a/a.py +++ b/a.py @@ -1,2 +1,3 @@ line1 +new line2 diff --git a/b.py b/b.py --- a/b.py +++ b/b.py @@ -1 +1,2 @@ only +added """ parser = DiffParser() result = parser.parse(diff) assert len(result.files) == 2 assert result.files[0].path == "a.py" assert result.files[1].path == "b.py" def test_parse_new_file(self) -> None: diff = """diff --git a/new.py b/new.py new file mode 100644 --- /dev/null +++ b/new.py @@ -0,0 +1,3 @@ +line1 +line2 +line3 """ parser = DiffParser() result = parser.parse(diff) assert result.files[0].is_new is True assert result.files[0].path == "new.py" def test_parse_deleted_file(self) -> None: diff = """diff --git a/old.py b/old.py deleted file mode 100644 --- a/old.py +++ /dev/null @@ -1,2 +0,0 @@ -line1 -line2 """ parser = DiffParser() result = parser.parse(diff) assert result.files[0].is_deleted is True assert result.files[0].path == "old.py" class TestDiffFile: def test_get_added_line_numbers(self) -> None: diff_file = DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk( old_start=1, old_count=2, new_start=1, new_count=4, lines=[ DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1), DiffLine(content="b", line_type=LineType.ADDED, new_line=2), DiffLine(content="c", line_type=LineType.ADDED, new_line=3), DiffLine(content="d", line_type=LineType.CONTEXT, old_line=2, new_line=4), ], ) ], ) assert diff_file.get_added_line_numbers() == [2, 3] def test_get_changed_line_range(self) -> None: diff_file = DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk( old_start=10, old_count=3, new_start=10, new_count=5, lines=[ DiffLine(content="a", line_type=LineType.ADDED, new_line=10), DiffLine(content="b", line_type=LineType.ADDED, new_line=11), DiffLine(content="c", line_type=LineType.ADDED, new_line=14), ], ) ], ) assert diff_file.get_changed_line_range() == (10, 14) def test_line_in_diff(self) -> None: diff_file = DiffFile( old_path="test.py", new_path="test.py", hunks=[DiffHunk(old_start=10, old_count=5, new_start=10, new_count=6, lines=[])], ) assert diff_file.line_in_diff(10) is True assert diff_file.line_in_diff(15) is True assert diff_file.line_in_diff(16) is False assert diff_file.line_in_diff(9) is False class TestParsedDiff: def test_get_file(self) -> None: parsed = ParsedDiff( files=[ DiffFile(old_path="a.py", new_path="a.py"), DiffFile(old_path="b.py", new_path="b.py"), ] ) assert parsed.get_file("a.py") is not None assert parsed.get_file("c.py") is None def test_get_changed_files(self) -> None: parsed = ParsedDiff( files=[ DiffFile(old_path="a.py", new_path="a.py"), DiffFile(old_path="b.py", new_path="b.py"), ] ) assert parsed.get_changed_files() == ["a.py", "b.py"] class TestStaticFinding: def test_create_finding(self) -> None: finding = StaticFinding( tool="ruff", file="test.py", line=10, code="E501", message="Line too long", severity=Severity.LOW, ) assert finding.tool == "ruff" assert finding.severity == Severity.LOW class TestStaticAnalysisRunner: def test_ruff_severity_mapping(self) -> None: runner = StaticAnalysisRunner() assert runner._ruff_code_to_severity("S101") == Severity.HIGH assert runner._ruff_code_to_severity("E501") == Severity.MEDIUM assert runner._ruff_code_to_severity("W503") == Severity.LOW assert runner._ruff_code_to_severity("D100") == Severity.INFO @pytest.mark.asyncio async def test_run_without_work_dir(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff(files=[]) result = await runner.run(diff, work_dir=None) assert result.findings == [] assert result.tools_run == [] @pytest.mark.asyncio async def test_run_no_python_files(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff(files=[DiffFile(old_path="test.txt", new_path="test.txt")]) result = await runner.run(diff, work_dir=Path("/tmp")) assert result.findings == [] @pytest.mark.asyncio async def test_parse_ruff_output(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) ], ) ] ) ruff_output = """[ { "filename": "/work/test.py", "location": {"row": 5, "column": 1}, "end_location": {"row": 5, "column": 80}, "code": "E501", "message": "Line too long (100 > 79)", "fix": null, "url": "https://docs.astral.sh/ruff/rules/E501" } ]""" findings = runner._parse_ruff_output(ruff_output, diff, Path("/work")) assert len(findings) == 1 assert findings[0].code == "E501" assert findings[0].line == 5 assert findings[0].severity == Severity.MEDIUM @pytest.mark.asyncio async def test_parse_mypy_output(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) ], ) ] ) mypy_output = """/work/test.py:10:5: error: Incompatible return value type /work/test.py:15:1: note: See documentation""" findings = runner._parse_mypy_output(mypy_output, diff, Path("/work")) # Both lines are in the diff range (1-20), so both are included assert len(findings) == 2 assert findings[0].line == 10 assert findings[0].severity == Severity.MEDIUM assert findings[1].line == 15 assert findings[1].severity == Severity.INFO # note maps to INFO @pytest.mark.asyncio async def test_parse_bandit_output(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) ], ) ] ) bandit_output = """{ "results": [ { "filename": "/work/test.py", "line_number": 5, "test_id": "B303", "test_name": "blacklist", "issue_severity": "HIGH", "issue_confidence": "HIGH", "issue_text": "Use of insecure MD5 hash function" } ] }""" findings = runner._parse_bandit_output(bandit_output, diff, Path("/work")) assert len(findings) == 1 assert findings[0].code == "B303" assert findings[0].severity == Severity.HIGH @pytest.mark.asyncio async def test_parse_radon_output(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=50, new_start=1, new_count=50, lines=[]) ], ) ] ) radon_output = """{ "/work/test.py": [ { "type": "function", "name": "complex_func", "lineno": 10, "endline": 30, "complexity": 25, "rank": "D" } ] }""" findings = runner._parse_radon_output(radon_output, diff, Path("/work")) assert len(findings) == 1 assert findings[0].code == "CCD" assert findings[0].severity == Severity.HIGH assert "complexity" in findings[0].message.lower() def test_convert_to_finding(self) -> None: runner = StaticAnalysisRunner() static_finding = StaticFinding( tool="ruff", file="test.py", line=10, code="E501", message="Line too long", severity=Severity.LOW, ) finding = runner.convert_to_finding(static_finding) assert finding.file == "test.py" assert finding.line_start == 10 assert finding.severity == Severity.LOW assert "ruff" in finding.title.lower() @pytest.mark.asyncio async def test_config_disables_tools(self) -> None: config = StaticAnalysisConfig( ruff_enabled=False, mypy_enabled=False, bandit_enabled=False, radon_enabled=False, ) runner = StaticAnalysisRunner(config) diff = ParsedDiff(files=[DiffFile(old_path="test.py", new_path="test.py")]) result = await runner.run(diff, work_dir=Path("/tmp")) assert result.tools_run == [] def test_ruff_code_to_severity_empty(self) -> None: runner = StaticAnalysisRunner() assert runner._ruff_code_to_severity("") == Severity.MEDIUM def test_ruff_code_to_severity_unknown(self) -> None: runner = StaticAnalysisRunner() assert runner._ruff_code_to_severity("XYZ999") == Severity.MEDIUM def test_parse_ruff_output_empty(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff(files=[]) findings = runner._parse_ruff_output("", diff, Path("/work")) assert findings == [] findings = runner._parse_ruff_output(" ", diff, Path("/work")) assert findings == [] def test_parse_ruff_output_invalid_json(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff(files=[]) findings = runner._parse_ruff_output("not json", diff, Path("/work")) assert findings == [] def test_ruff_filters_outside_diff(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=10, old_count=5, new_start=10, new_count=5, lines=[]) ], ) ] ) # Finding at line 100 is outside the diff ruff_output = """[ { "filename": "/work/test.py", "location": {"row": 100, "column": 1}, "end_location": {"row": 100, "column": 80}, "code": "E501", "message": "Line too long" } ]""" findings = runner._parse_ruff_output(ruff_output, diff, Path("/work")) assert findings == [] def test_parse_mypy_output_empty(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff(files=[]) findings = runner._parse_mypy_output("", diff, Path("/work")) assert findings == [] def test_parse_mypy_output_invalid_format(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff(files=[]) findings = runner._parse_mypy_output("not a valid line", diff, Path("/work")) assert findings == [] def test_parse_mypy_output_warning_severity(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) ], ) ] ) mypy_output = "/work/test.py:5:1: warning: Deprecated function call" findings = runner._parse_mypy_output(mypy_output, diff, Path("/work")) assert len(findings) == 1 assert findings[0].severity == Severity.LOW def test_parse_bandit_output_empty(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff(files=[]) findings = runner._parse_bandit_output("", diff, Path("/work")) assert findings == [] def test_parse_bandit_output_invalid_json(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff(files=[]) findings = runner._parse_bandit_output("not json", diff, Path("/work")) assert findings == [] def test_parse_bandit_output_low_severity(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) ], ) ] ) bandit_output = """{ "results": [ { "filename": "/work/test.py", "line_number": 5, "test_id": "B105", "test_name": "hardcoded_password", "issue_severity": "LOW", "issue_confidence": "MEDIUM", "issue_text": "Possible hardcoded password" } ] }""" findings = runner._parse_bandit_output(bandit_output, diff, Path("/work")) assert len(findings) == 1 assert findings[0].severity == Severity.LOW def test_parse_radon_output_empty(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff(files=[]) findings = runner._parse_radon_output("", diff, Path("/work")) assert findings == [] def test_parse_radon_output_invalid_json(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff(files=[]) findings = runner._parse_radon_output("not json", diff, Path("/work")) assert findings == [] def test_radon_low_complexity_skip(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=50, new_start=1, new_count=50, lines=[]) ], ) ] ) radon_output = """{ "/work/test.py": [ { "type": "function", "name": "simple_func", "lineno": 10, "endline": 15, "complexity": 3, "rank": "A" }, { "type": "function", "name": "ok_func", "lineno": 20, "endline": 30, "complexity": 8, "rank": "B" } ] }""" findings = runner._parse_radon_output(radon_output, diff, Path("/work")) assert findings == [] def test_parse_radon_output_various_ranks(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=100, new_start=1, new_count=100, lines=[]) ], ) ] ) radon_output = """{ "/work/test.py": [ { "type": "function", "name": "moderate_func", "lineno": 10, "endline": 30, "complexity": 15, "rank": "C" }, { "type": "function", "name": "very_complex_func", "lineno": 50, "endline": 80, "complexity": 45, "rank": "F" } ] }""" findings = runner._parse_radon_output(radon_output, diff, Path("/work")) assert len(findings) == 2 assert findings[0].severity == Severity.MEDIUM # C rank assert findings[1].severity == Severity.CRITICAL # F rank def test_convert_to_finding_with_url(self) -> None: runner = StaticAnalysisRunner() static_finding = StaticFinding( tool="ruff", file="test.py", line=10, code="E501", message="Line too long", severity=Severity.LOW, extra={"url": "https://docs.astral.sh/ruff/rules/E501"}, ) finding = runner.convert_to_finding(static_finding) assert "https://docs.astral.sh/ruff/rules/E501" in finding.references def test_convert_to_finding_without_url(self) -> None: runner = StaticAnalysisRunner() static_finding = StaticFinding( tool="mypy", file="test.py", line=10, code="mypy-error", message="Type error", severity=Severity.MEDIUM, ) finding = runner.convert_to_finding(static_finding) assert finding.references == [] def test_finding_with_fix(self) -> None: runner = StaticAnalysisRunner() static_finding = StaticFinding( tool="ruff", file="test.py", line=10, code="E501", message="Line too long", severity=Severity.LOW, fix_available=True, ) finding = runner.convert_to_finding(static_finding) assert finding.suggestion is not None class TestDiffParserAdvanced: def test_parse_binary_file(self) -> None: diff = """diff --git a/image.png b/image.png new file mode 100644 Binary files /dev/null and b/image.png differ """ parser = DiffParser() result = parser.parse(diff) assert len(result.files) == 1 assert result.files[0].is_binary is True assert result.files[0].is_new is True def test_parse_hunk_no_newline(self) -> None: diff = """diff --git a/test.py b/test.py --- a/test.py +++ b/test.py @@ -1,2 +1,2 @@ line1 -line2 +line2 modified \\ No newline at end of file """ parser = DiffParser() result = parser.parse(diff) assert len(result.files) == 1 # The \\ line should be skipped assert len(result.files[0].hunks[0].lines) == 3 def test_parse_hunk_with_function_header(self) -> None: diff = """diff --git a/test.py b/test.py --- a/test.py +++ b/test.py @@ -10,5 +10,6 @@ class MyClass: context +added """ parser = DiffParser() result = parser.parse(diff) assert result.files[0].hunks[0].header == "class MyClass:" def test_diff_hunk_get_added_lines(self) -> None: hunk = DiffHunk( old_start=1, old_count=3, new_start=1, new_count=4, lines=[ DiffLine(content="ctx", line_type=LineType.CONTEXT, old_line=1, new_line=1), DiffLine(content="removed", line_type=LineType.REMOVED, old_line=2), DiffLine(content="added", line_type=LineType.ADDED, new_line=2), DiffLine(content="ctx2", line_type=LineType.CONTEXT, old_line=3, new_line=3), ], ) added = hunk.get_added_lines() assert len(added) == 1 assert added[0].content == "added" def test_diff_hunk_get_removed_lines(self) -> None: hunk = DiffHunk( old_start=1, old_count=3, new_start=1, new_count=2, lines=[ DiffLine(content="ctx", line_type=LineType.CONTEXT, old_line=1, new_line=1), DiffLine(content="removed", line_type=LineType.REMOVED, old_line=2), DiffLine(content="ctx2", line_type=LineType.CONTEXT, old_line=3, new_line=2), ], ) removed = hunk.get_removed_lines() assert len(removed) == 1 assert removed[0].content == "removed" def test_diff_file_map_new_to_old(self) -> None: diff_file = DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk( old_start=1, old_count=2, new_start=1, new_count=2, lines=[ DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1), DiffLine(content="b", line_type=LineType.CONTEXT, old_line=2, new_line=2), ], ) ], ) assert diff_file.map_new_to_old(1) == 1 assert diff_file.map_new_to_old(2) == 2 assert diff_file.map_new_to_old(100) is None def test_diff_file_map_new_to_old_added_line(self) -> None: diff_file = DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk( old_start=1, old_count=1, new_start=1, new_count=2, lines=[ DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1), DiffLine(content="b", line_type=LineType.ADDED, new_line=2), ], ) ], ) # Added line has no old_line assert diff_file.map_new_to_old(2) is None def test_parsed_diff_get_added_files(self) -> None: parsed = ParsedDiff( files=[ DiffFile(old_path="a.py", new_path="a.py", is_new=True), DiffFile(old_path="b.py", new_path="b.py", is_new=False), ] ) assert parsed.get_added_files() == ["a.py"] def test_parsed_diff_get_deleted_files(self) -> None: parsed = ParsedDiff( files=[ DiffFile(old_path="a.py", new_path="a.py", is_deleted=True), DiffFile(old_path="b.py", new_path="b.py", is_deleted=False), ] ) assert parsed.get_deleted_files() == ["a.py"] def test_changed_range_empty(self) -> None: diff_file = DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk( old_start=1, old_count=2, new_start=1, new_count=1, lines=[ DiffLine(content="a", line_type=LineType.REMOVED, old_line=1), DiffLine(content="b", line_type=LineType.CONTEXT, old_line=2, new_line=1), ], ) ], ) assert diff_file.get_changed_line_range() is None def test_diff_file_path_deleted(self) -> None: diff_file = DiffFile( old_path="old.py", new_path="new.py", is_deleted=True, ) assert diff_file.path == "old.py" @pytest.mark.asyncio async def test_run_static_analysis_convenience() -> None: diff = ParsedDiff(files=[]) result = await run_static_analysis(diff, work_dir=None) assert result.findings == [] assert result.tools_run == [] class TestStaticAnalysisRunMethods: @pytest.mark.asyncio async def test_run_ruff_file_not_found(self, tmp_path: Path) -> None: from unittest.mock import patch runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) ], ) ] ) # Create the test file (tmp_path / "test.py").write_text("print('hello')") with ( patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()), pytest.raises(RuntimeError, match="ruff not found"), ): await runner._run_ruff(diff.files, tmp_path, diff) @pytest.mark.asyncio async def test_run_ruff_with_config(self, tmp_path: Path) -> None: from unittest.mock import AsyncMock, patch config = StaticAnalysisConfig(ruff_config="/path/to/ruff.toml") runner = StaticAnalysisRunner(config) diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) ], ) ] ) # Create the test file (tmp_path / "test.py").write_text("print('hello')") mock_proc = AsyncMock() mock_proc.communicate.return_value = (b"[]", b"") with patch("asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: await runner._run_ruff(diff.files, tmp_path, diff) # Verify config was passed call_args = mock_exec.call_args assert "--config" in call_args[0] assert "/path/to/ruff.toml" in call_args[0] @pytest.mark.asyncio async def test_run_ruff_returns_empty_for_no_files(self, tmp_path: Path) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="nonexistent.py", new_path="nonexistent.py", hunks=[ DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) ], ) ] ) result = await runner._run_ruff(diff.files, tmp_path, diff) assert result == [] @pytest.mark.asyncio async def test_run_mypy_file_not_found(self, tmp_path: Path) -> None: from unittest.mock import patch runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) ], ) ] ) (tmp_path / "test.py").write_text("print('hello')") with ( patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()), pytest.raises(RuntimeError, match="mypy not found"), ): await runner._run_mypy(diff.files, tmp_path, diff) @pytest.mark.asyncio async def test_run_mypy_with_config(self, tmp_path: Path) -> None: from unittest.mock import AsyncMock, patch config = StaticAnalysisConfig(mypy_config="/path/to/mypy.ini") runner = StaticAnalysisRunner(config) diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) ], ) ] ) (tmp_path / "test.py").write_text("print('hello')") mock_proc = AsyncMock() mock_proc.communicate.return_value = (b"", b"") with patch("asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: await runner._run_mypy(diff.files, tmp_path, diff) call_args = mock_exec.call_args assert "--config-file" in call_args[0] assert "/path/to/mypy.ini" in call_args[0] @pytest.mark.asyncio async def test_run_bandit_file_not_found(self, tmp_path: Path) -> None: from unittest.mock import patch runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) ], ) ] ) (tmp_path / "test.py").write_text("print('hello')") with ( patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()), pytest.raises(RuntimeError, match="bandit not found"), ): await runner._run_bandit(diff.files, tmp_path, diff) @pytest.mark.asyncio async def test_run_radon_file_not_found(self, tmp_path: Path) -> None: from unittest.mock import patch runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) ], ) ] ) (tmp_path / "test.py").write_text("print('hello')") with ( patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()), pytest.raises(RuntimeError, match="radon not found"), ): await runner._run_radon(diff.files, tmp_path, diff) @pytest.mark.asyncio async def test_run_with_tool_error(self, tmp_path: Path) -> None: from unittest.mock import patch runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) ], ) ] ) (tmp_path / "test.py").write_text("print('hello')") # Make ruff fail with RuntimeError with ( patch.object(runner, "_run_ruff", side_effect=RuntimeError("ruff not found")), patch.object(runner, "_run_mypy", return_value=[]), patch.object(runner, "_run_bandit", return_value=[]), patch.object(runner, "_run_radon", return_value=[]), ): result = await runner.run(diff, work_dir=tmp_path) assert "ruff" in result.tool_errors assert "not found" in result.tool_errors["ruff"] @pytest.mark.asyncio async def test_run_ruff_success_with_output(self, tmp_path: Path) -> None: from unittest.mock import AsyncMock, patch runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=10, new_start=1, new_count=10, lines=[]) ], ) ] ) (tmp_path / "test.py").write_text("x=1") ruff_output = f"""[ {{ "filename": "{tmp_path}/test.py", "location": {{"row": 5, "column": 1}}, "end_location": {{"row": 5, "column": 80}}, "code": "E501", "message": "Line too long" }} ]""" mock_proc = AsyncMock() mock_proc.communicate.return_value = (ruff_output.encode(), b"") with patch("asyncio.create_subprocess_exec", return_value=mock_proc): result = await runner._run_ruff(diff.files, tmp_path, diff) assert len(result) == 1 assert result[0].code == "E501" @pytest.mark.asyncio async def test_run_mypy_success_with_output(self, tmp_path: Path) -> None: from unittest.mock import AsyncMock, patch runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) ], ) ] ) (tmp_path / "test.py").write_text("x: int = 'string'") mypy_output = f"{tmp_path}/test.py:5:1: error: Type mismatch" mock_proc = AsyncMock() mock_proc.communicate.return_value = (mypy_output.encode(), b"") with patch("asyncio.create_subprocess_exec", return_value=mock_proc): result = await runner._run_mypy(diff.files, tmp_path, diff) assert len(result) == 1 assert result[0].severity == Severity.MEDIUM @pytest.mark.asyncio async def test_run_bandit_success_with_output(self, tmp_path: Path) -> None: from unittest.mock import AsyncMock, patch runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=20, new_start=1, new_count=20, lines=[]) ], ) ] ) (tmp_path / "test.py").write_text("import md5") bandit_output = f"""{{ "results": [ {{ "filename": "{tmp_path}/test.py", "line_number": 5, "test_id": "B303", "test_name": "blacklist", "issue_severity": "HIGH", "issue_confidence": "HIGH", "issue_text": "Use of insecure hash" }} ] }}""" mock_proc = AsyncMock() mock_proc.communicate.return_value = (bandit_output.encode(), b"") with patch("asyncio.create_subprocess_exec", return_value=mock_proc): result = await runner._run_bandit(diff.files, tmp_path, diff) assert len(result) == 1 assert result[0].severity == Severity.HIGH @pytest.mark.asyncio async def test_run_radon_success_with_output(self, tmp_path: Path) -> None: from unittest.mock import AsyncMock, patch runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=1, old_count=50, new_start=1, new_count=50, lines=[]) ], ) ] ) (tmp_path / "test.py").write_text("def func(): pass") radon_output = f"""{{ "{tmp_path}/test.py": [ {{ "type": "function", "name": "complex_func", "lineno": 10, "endline": 30, "complexity": 25, "rank": "D" }} ] }}""" mock_proc = AsyncMock() mock_proc.communicate.return_value = (radon_output.encode(), b"") with patch("asyncio.create_subprocess_exec", return_value=mock_proc): result = await runner._run_radon(diff.files, tmp_path, diff) assert len(result) == 1 assert result[0].severity == Severity.HIGH def test_mypy_filters_outside_diff(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=10, old_count=5, new_start=10, new_count=5, lines=[]) ], ) ] ) # Line 100 is outside the diff range (10-15) mypy_output = "/work/test.py:100:1: error: Type mismatch" findings = runner._parse_mypy_output(mypy_output, diff, Path("/work")) assert findings == [] def test_bandit_outside_diff(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=10, old_count=5, new_start=10, new_count=5, lines=[]) ], ) ] ) bandit_output = """{ "results": [ { "filename": "/work/test.py", "line_number": 100, "test_id": "B303", "test_name": "blacklist", "issue_severity": "HIGH", "issue_confidence": "HIGH", "issue_text": "Use of insecure hash" } ] }""" findings = runner._parse_bandit_output(bandit_output, diff, Path("/work")) assert findings == [] def test_radon_outside_diff(self) -> None: runner = StaticAnalysisRunner() diff = ParsedDiff( files=[ DiffFile( old_path="test.py", new_path="test.py", hunks=[ DiffHunk(old_start=10, old_count=5, new_start=10, new_count=5, lines=[]) ], ) ] ) radon_output = """{ "/work/test.py": [ { "type": "function", "name": "complex_func", "lineno": 100, "endline": 120, "complexity": 25, "rank": "D" } ] }""" findings = runner._parse_radon_output(radon_output, diff, Path("/work")) assert findings == []