# Source listing: arbiter/tests/test_static_analysis.py (Python, 1345 lines, 42 KiB)

"""Tests for static analysis module."""
from pathlib import Path
import pytest
from arbiter.analysis.diff import DiffFile, DiffHunk, DiffLine, DiffParser, LineType, ParsedDiff
from arbiter.analysis.static import (
StaticAnalysisConfig,
StaticAnalysisRunner,
StaticFinding,
run_static_analysis,
)
from arbiter.models import Severity
class TestDiffParser:
    """Exercise ``DiffParser.parse`` on small hand-written git diffs.

    NOTE(review): unified-diff context lines conventionally begin with a
    single space. The diff literals below are reproduced exactly as found;
    confirm they still carry the prefix the parser expects.
    """

    def test_parse_empty(self) -> None:
        """An empty input yields no files and an empty raw diff."""
        parsed = DiffParser().parse("")

        assert parsed.files == []
        assert parsed.raw_diff == ""

    def test_parse_single_file(self) -> None:
        """A one-file diff yields one file entry holding one hunk."""
        diff_text = """diff --git a/test.py b/test.py
index 1234567..abcdefg 100644
--- a/test.py
+++ b/test.py
@@ -1,3 +1,4 @@
line1
+added
line2
line3
"""
        parsed_files = DiffParser().parse(diff_text).files

        assert len(parsed_files) == 1
        only_file = parsed_files[0]
        assert only_file.old_path == "test.py"
        assert only_file.new_path == "test.py"
        assert len(only_file.hunks) == 1

    def test_parse_hunk_header(self) -> None:
        """The ``@@`` line supplies the hunk's ranges and trailing header text."""
        diff_text = """diff --git a/test.py b/test.py
index 1234567..abcdefg 100644
--- a/test.py
+++ b/test.py
@@ -10,5 +10,6 @@ def function():
context
+added
more context
"""
        first_hunk = DiffParser().parse(diff_text).files[0].hunks[0]

        assert first_hunk.old_start == 10
        assert first_hunk.old_count == 5
        assert first_hunk.new_start == 10
        assert first_hunk.new_count == 6
        assert first_hunk.header == "def function():"

    def test_parse_line_types(self) -> None:
        """Each hunk line is classified as context, removed or added."""
        diff_text = """diff --git a/test.py b/test.py
--- a/test.py
+++ b/test.py
@@ -1,4 +1,4 @@
context
-removed
+added
more context
"""
        hunk_lines = DiffParser().parse(diff_text).files[0].hunks[0].lines

        assert hunk_lines[0].line_type == LineType.CONTEXT
        assert hunk_lines[1].line_type == LineType.REMOVED
        assert hunk_lines[2].line_type == LineType.ADDED
        assert hunk_lines[3].line_type == LineType.CONTEXT

    def test_parse_line_numbers(self) -> None:
        """Old/new line numbers are tracked per line, with ``None`` on the absent side."""
        diff_text = """diff --git a/test.py b/test.py
--- a/test.py
+++ b/test.py
@@ -5,4 +5,5 @@
context
-removed
+added1
+added2
final
"""
        hunk_lines = DiffParser().parse(diff_text).files[0].hunks[0].lines

        # The context line sits at 5 on both sides.
        context_line = hunk_lines[0]
        assert context_line.old_line == 5
        assert context_line.new_line == 5

        # The removed line only exists on the old side, at 6.
        removed_line = hunk_lines[1]
        assert removed_line.old_line == 6
        assert removed_line.new_line is None

        # The two added lines only exist on the new side, at 6 and 7.
        first_added = hunk_lines[2]
        assert first_added.old_line is None
        assert first_added.new_line == 6
        second_added = hunk_lines[3]
        assert second_added.old_line is None
        assert second_added.new_line == 7

    def test_parse_multi_file(self) -> None:
        """Two ``diff --git`` sections yield two file entries in input order."""
        diff_text = """diff --git a/a.py b/a.py
--- a/a.py
+++ b/a.py
@@ -1,2 +1,3 @@
line1
+new
line2
diff --git a/b.py b/b.py
--- a/b.py
+++ b/b.py
@@ -1 +1,2 @@
only
+added
"""
        parsed_files = DiffParser().parse(diff_text).files

        assert len(parsed_files) == 2
        assert parsed_files[0].path == "a.py"
        assert parsed_files[1].path == "b.py"

    def test_parse_new_file(self) -> None:
        """A ``new file mode`` diff is flagged as new and reports the new path."""
        diff_text = """diff --git a/new.py b/new.py
new file mode 100644
--- /dev/null
+++ b/new.py
@@ -0,0 +1,3 @@
+line1
+line2
+line3
"""
        created = DiffParser().parse(diff_text).files[0]

        assert created.is_new is True
        assert created.path == "new.py"

    def test_parse_deleted_file(self) -> None:
        """A ``deleted file mode`` diff is flagged as deleted and reports the old path."""
        diff_text = """diff --git a/old.py b/old.py
deleted file mode 100644
--- a/old.py
+++ /dev/null
@@ -1,2 +0,0 @@
-line1
-line2
"""
        removed = DiffParser().parse(diff_text).files[0]

        assert removed.is_deleted is True
        assert removed.path == "old.py"
class TestDiffFile:
    """Line-number helpers exposed by ``DiffFile``."""

    def test_get_added_line_numbers(self) -> None:
        """Only the new-side numbers of ADDED lines are returned."""
        hunk_lines = [
            DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1),
            DiffLine(content="b", line_type=LineType.ADDED, new_line=2),
            DiffLine(content="c", line_type=LineType.ADDED, new_line=3),
            DiffLine(content="d", line_type=LineType.CONTEXT, old_line=2, new_line=4),
        ]
        hunk = DiffHunk(
            old_start=1,
            old_count=2,
            new_start=1,
            new_count=4,
            lines=hunk_lines,
        )
        subject = DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])

        assert subject.get_added_line_numbers() == [2, 3]

    def test_get_changed_line_range(self) -> None:
        """The range spans the lowest to the highest added line number."""
        hunk_lines = [
            DiffLine(content="a", line_type=LineType.ADDED, new_line=10),
            DiffLine(content="b", line_type=LineType.ADDED, new_line=11),
            DiffLine(content="c", line_type=LineType.ADDED, new_line=14),
        ]
        hunk = DiffHunk(
            old_start=10,
            old_count=3,
            new_start=10,
            new_count=5,
            lines=hunk_lines,
        )
        subject = DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])

        assert subject.get_changed_line_range() == (10, 14)

    def test_line_in_diff(self) -> None:
        """Lines 10 and 15 count as inside the hunk; 9 and 16 do not."""
        empty_hunk = DiffHunk(old_start=10, old_count=5, new_start=10, new_count=6, lines=[])
        subject = DiffFile(old_path="test.py", new_path="test.py", hunks=[empty_hunk])

        # Same probe order as before: both inside edges, then both outside edges.
        for line_no, expected in ((10, True), (15, True), (16, False), (9, False)):
            assert subject.line_in_diff(line_no) is expected
class TestParsedDiff:
    """File lookup helpers on ``ParsedDiff``."""

    def test_get_file(self) -> None:
        """A known path resolves to an entry; an unknown path gives ``None``."""
        entries = [
            DiffFile(old_path="a.py", new_path="a.py"),
            DiffFile(old_path="b.py", new_path="b.py"),
        ]
        subject = ParsedDiff(files=entries)

        assert subject.get_file("a.py") is not None
        assert subject.get_file("c.py") is None

    def test_get_changed_files(self) -> None:
        """Changed file paths are listed in the order the files were given."""
        entries = [
            DiffFile(old_path="a.py", new_path="a.py"),
            DiffFile(old_path="b.py", new_path="b.py"),
        ]
        subject = ParsedDiff(files=entries)

        assert subject.get_changed_files() == ["a.py", "b.py"]
class TestStaticFinding:
    """Construction of ``StaticFinding`` records."""

    def test_create_finding(self) -> None:
        """Values passed to the constructor are readable back as attributes."""
        fields = {
            "tool": "ruff",
            "file": "test.py",
            "line": 10,
            "code": "E501",
            "message": "Line too long",
            "severity": Severity.LOW,
        }
        record = StaticFinding(**fields)

        assert record.tool == "ruff"
        assert record.severity == Severity.LOW
class TestStaticAnalysisRunner:
    """Tests for ``StaticAnalysisRunner``: severity mapping, output parsers, conversion.

    The parser tests feed canned tool output to the private ``_parse_*``
    methods together with a diff built by :meth:`_single_hunk_diff`.
    """

    @staticmethod
    def _single_hunk_diff(start: int, count: int, path: str = "test.py") -> ParsedDiff:
        """Return a diff touching ``path`` with one hunk that carries no lines.

        Args:
            start: First line of the hunk, used for both the old and new side.
            count: Number of lines in the hunk, used for both sides.
            path: Old and new path of the single file in the diff.
        """
        hunk = DiffHunk(
            old_start=start,
            old_count=count,
            new_start=start,
            new_count=count,
            lines=[],
        )
        return ParsedDiff(files=[DiffFile(old_path=path, new_path=path, hunks=[hunk])])

    def test_ruff_severity_mapping(self) -> None:
        """One sample ruff code per expected severity level."""
        runner = StaticAnalysisRunner()
        assert runner._ruff_code_to_severity("S101") == Severity.HIGH
        assert runner._ruff_code_to_severity("E501") == Severity.MEDIUM
        assert runner._ruff_code_to_severity("W503") == Severity.LOW
        assert runner._ruff_code_to_severity("D100") == Severity.INFO

    @pytest.mark.asyncio
    async def test_run_without_work_dir(self) -> None:
        """With ``work_dir=None`` the run reports no findings and no tools."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])
        result = await runner.run(diff, work_dir=None)
        assert result.findings == []
        assert result.tools_run == []

    @pytest.mark.asyncio
    async def test_run_no_python_files(self, tmp_path: Path) -> None:
        """A diff that only touches ``test.txt`` produces no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[DiffFile(old_path="test.txt", new_path="test.txt")])
        # tmp_path replaces the shared, hard-coded /tmp so the test stays isolated.
        result = await runner.run(diff, work_dir=tmp_path)
        assert result.findings == []

    def test_parse_ruff_output(self) -> None:
        """A ruff JSON entry on a line inside the hunk becomes one finding."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=10)
        ruff_output = """[
{
"filename": "/work/test.py",
"location": {"row": 5, "column": 1},
"end_location": {"row": 5, "column": 80},
"code": "E501",
"message": "Line too long (100 > 79)",
"fix": null,
"url": "https://docs.astral.sh/ruff/rules/E501"
}
]"""
        findings = runner._parse_ruff_output(ruff_output, diff, Path("/work"))
        assert len(findings) == 1
        assert findings[0].code == "E501"
        assert findings[0].line == 5
        assert findings[0].severity == Severity.MEDIUM

    def test_parse_mypy_output(self) -> None:
        """``error`` lines map to MEDIUM and ``note`` lines to INFO."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=20)
        mypy_output = """/work/test.py:10:5: error: Incompatible return value type
/work/test.py:15:1: note: See documentation"""
        findings = runner._parse_mypy_output(mypy_output, diff, Path("/work"))
        # Both reported lines fall inside the hunk (1-20), so both are kept.
        assert len(findings) == 2
        assert findings[0].line == 10
        assert findings[0].severity == Severity.MEDIUM
        assert findings[1].line == 15
        assert findings[1].severity == Severity.INFO  # note maps to INFO

    def test_parse_bandit_output(self) -> None:
        """A HIGH-severity bandit result becomes a HIGH finding with its test id."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=20)
        bandit_output = """{
"results": [
{
"filename": "/work/test.py",
"line_number": 5,
"test_id": "B303",
"test_name": "blacklist",
"issue_severity": "HIGH",
"issue_confidence": "HIGH",
"issue_text": "Use of insecure MD5 hash function"
}
]
}"""
        findings = runner._parse_bandit_output(bandit_output, diff, Path("/work"))
        assert len(findings) == 1
        assert findings[0].code == "B303"
        assert findings[0].severity == Severity.HIGH

    def test_parse_radon_output(self) -> None:
        """A rank-D function is reported as code ``CCD`` with HIGH severity."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=50)
        radon_output = """{
"/work/test.py": [
{
"type": "function",
"name": "complex_func",
"lineno": 10,
"endline": 30,
"complexity": 25,
"rank": "D"
}
]
}"""
        findings = runner._parse_radon_output(radon_output, diff, Path("/work"))
        assert len(findings) == 1
        assert findings[0].code == "CCD"
        assert findings[0].severity == Severity.HIGH
        assert "complexity" in findings[0].message.lower()

    def test_convert_to_finding(self) -> None:
        """File, line, severity and tool name carry over to the converted finding."""
        runner = StaticAnalysisRunner()
        static_finding = StaticFinding(
            tool="ruff",
            file="test.py",
            line=10,
            code="E501",
            message="Line too long",
            severity=Severity.LOW,
        )
        finding = runner.convert_to_finding(static_finding)
        assert finding.file == "test.py"
        assert finding.line_start == 10
        assert finding.severity == Severity.LOW
        assert "ruff" in finding.title.lower()

    @pytest.mark.asyncio
    async def test_config_disables_tools(self, tmp_path: Path) -> None:
        """With every tool disabled in the config, no tool is recorded as run."""
        config = StaticAnalysisConfig(
            ruff_enabled=False,
            mypy_enabled=False,
            bandit_enabled=False,
            radon_enabled=False,
        )
        runner = StaticAnalysisRunner(config)
        diff = ParsedDiff(files=[DiffFile(old_path="test.py", new_path="test.py")])
        # tmp_path replaces the shared, hard-coded /tmp so the test stays isolated.
        result = await runner.run(diff, work_dir=tmp_path)
        assert result.tools_run == []

    def test_ruff_code_to_severity_empty(self) -> None:
        """An empty ruff code maps to MEDIUM."""
        runner = StaticAnalysisRunner()
        assert runner._ruff_code_to_severity("") == Severity.MEDIUM

    def test_ruff_code_to_severity_unknown(self) -> None:
        """An unrecognised ruff code maps to MEDIUM."""
        runner = StaticAnalysisRunner()
        assert runner._ruff_code_to_severity("XYZ999") == Severity.MEDIUM

    def test_parse_ruff_output_empty(self) -> None:
        """Empty and whitespace-only ruff output both yield no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])
        findings = runner._parse_ruff_output("", diff, Path("/work"))
        assert findings == []
        findings = runner._parse_ruff_output(" ", diff, Path("/work"))
        assert findings == []

    def test_parse_ruff_output_invalid_json(self) -> None:
        """Non-JSON ruff output yields no findings instead of raising."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])
        findings = runner._parse_ruff_output("not json", diff, Path("/work"))
        assert findings == []

    def test_ruff_filters_outside_diff(self) -> None:
        """A ruff finding on line 100 is dropped when the hunk starts at 10 with 5 lines."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=10, count=5)
        # The reported row (100) lies outside the hunk.
        ruff_output = """[
{
"filename": "/work/test.py",
"location": {"row": 100, "column": 1},
"end_location": {"row": 100, "column": 80},
"code": "E501",
"message": "Line too long"
}
]"""
        findings = runner._parse_ruff_output(ruff_output, diff, Path("/work"))
        assert findings == []

    def test_parse_mypy_output_empty(self) -> None:
        """Empty mypy output yields no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])
        findings = runner._parse_mypy_output("", diff, Path("/work"))
        assert findings == []

    def test_parse_mypy_output_invalid_format(self) -> None:
        """A line that is not in mypy's report format yields no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])
        findings = runner._parse_mypy_output("not a valid line", diff, Path("/work"))
        assert findings == []

    def test_parse_mypy_output_warning_severity(self) -> None:
        """A mypy ``warning`` line maps to LOW severity."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=20)
        mypy_output = "/work/test.py:5:1: warning: Deprecated function call"
        findings = runner._parse_mypy_output(mypy_output, diff, Path("/work"))
        assert len(findings) == 1
        assert findings[0].severity == Severity.LOW

    def test_parse_bandit_output_empty(self) -> None:
        """Empty bandit output yields no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])
        findings = runner._parse_bandit_output("", diff, Path("/work"))
        assert findings == []

    def test_parse_bandit_output_invalid_json(self) -> None:
        """Non-JSON bandit output yields no findings instead of raising."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])
        findings = runner._parse_bandit_output("not json", diff, Path("/work"))
        assert findings == []

    def test_parse_bandit_output_low_severity(self) -> None:
        """A LOW-severity bandit result becomes a LOW finding."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=20)
        bandit_output = """{
"results": [
{
"filename": "/work/test.py",
"line_number": 5,
"test_id": "B105",
"test_name": "hardcoded_password",
"issue_severity": "LOW",
"issue_confidence": "MEDIUM",
"issue_text": "Possible hardcoded password"
}
]
}"""
        findings = runner._parse_bandit_output(bandit_output, diff, Path("/work"))
        assert len(findings) == 1
        assert findings[0].severity == Severity.LOW

    def test_parse_radon_output_empty(self) -> None:
        """Empty radon output yields no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])
        findings = runner._parse_radon_output("", diff, Path("/work"))
        assert findings == []

    def test_parse_radon_output_invalid_json(self) -> None:
        """Non-JSON radon output yields no findings instead of raising."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])
        findings = runner._parse_radon_output("not json", diff, Path("/work"))
        assert findings == []

    def test_radon_low_complexity_skip(self) -> None:
        """Functions ranked A or B are not reported."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=50)
        radon_output = """{
"/work/test.py": [
{
"type": "function",
"name": "simple_func",
"lineno": 10,
"endline": 15,
"complexity": 3,
"rank": "A"
},
{
"type": "function",
"name": "ok_func",
"lineno": 20,
"endline": 30,
"complexity": 8,
"rank": "B"
}
]
}"""
        findings = runner._parse_radon_output(radon_output, diff, Path("/work"))
        assert findings == []

    def test_parse_radon_output_various_ranks(self) -> None:
        """Rank C maps to MEDIUM and rank F to CRITICAL."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=100)
        radon_output = """{
"/work/test.py": [
{
"type": "function",
"name": "moderate_func",
"lineno": 10,
"endline": 30,
"complexity": 15,
"rank": "C"
},
{
"type": "function",
"name": "very_complex_func",
"lineno": 50,
"endline": 80,
"complexity": 45,
"rank": "F"
}
]
}"""
        findings = runner._parse_radon_output(radon_output, diff, Path("/work"))
        assert len(findings) == 2
        assert findings[0].severity == Severity.MEDIUM  # C rank
        assert findings[1].severity == Severity.CRITICAL  # F rank

    def test_convert_to_finding_with_url(self) -> None:
        """A ``url`` entry in ``extra`` is surfaced in the finding's references."""
        runner = StaticAnalysisRunner()
        static_finding = StaticFinding(
            tool="ruff",
            file="test.py",
            line=10,
            code="E501",
            message="Line too long",
            severity=Severity.LOW,
            extra={"url": "https://docs.astral.sh/ruff/rules/E501"},
        )
        finding = runner.convert_to_finding(static_finding)
        assert "https://docs.astral.sh/ruff/rules/E501" in finding.references

    def test_convert_to_finding_without_url(self) -> None:
        """Without a URL the converted finding has an empty reference list."""
        runner = StaticAnalysisRunner()
        static_finding = StaticFinding(
            tool="mypy",
            file="test.py",
            line=10,
            code="mypy-error",
            message="Type error",
            severity=Severity.MEDIUM,
        )
        finding = runner.convert_to_finding(static_finding)
        assert finding.references == []

    def test_finding_with_fix(self) -> None:
        """``fix_available=True`` results in a non-``None`` suggestion."""
        runner = StaticAnalysisRunner()
        static_finding = StaticFinding(
            tool="ruff",
            file="test.py",
            line=10,
            code="E501",
            message="Line too long",
            severity=Severity.LOW,
            fix_available=True,
        )
        finding = runner.convert_to_finding(static_finding)
        assert finding.suggestion is not None
class TestDiffParserAdvanced:
    """Less common diff shapes plus the hunk/file/diff helper methods."""

    def test_parse_binary_file(self) -> None:
        """A ``Binary files ... differ`` section is flagged binary and new."""
        diff_text = """diff --git a/image.png b/image.png
new file mode 100644
Binary files /dev/null and b/image.png differ
"""
        parsed_files = DiffParser().parse(diff_text).files

        assert len(parsed_files) == 1
        binary_entry = parsed_files[0]
        assert binary_entry.is_binary is True
        assert binary_entry.is_new is True

    def test_parse_hunk_no_newline(self) -> None:
        """The "No newline at end of file" marker is not counted as a hunk line."""
        diff_text = """diff --git a/test.py b/test.py
--- a/test.py
+++ b/test.py
@@ -1,2 +1,2 @@
line1
-line2
+line2 modified
\\ No newline at end of file
"""
        parsed_files = DiffParser().parse(diff_text).files

        assert len(parsed_files) == 1
        # Only the three real lines remain; the backslash marker line is skipped.
        assert len(parsed_files[0].hunks[0].lines) == 3

    def test_parse_hunk_with_function_header(self) -> None:
        """Text after the second ``@@`` is stored as the hunk header."""
        diff_text = """diff --git a/test.py b/test.py
--- a/test.py
+++ b/test.py
@@ -10,5 +10,6 @@ class MyClass:
context
+added
"""
        first_hunk = DiffParser().parse(diff_text).files[0].hunks[0]

        assert first_hunk.header == "class MyClass:"

    def test_diff_hunk_get_added_lines(self) -> None:
        """``get_added_lines`` returns just the ADDED entries."""
        hunk_lines = [
            DiffLine(content="ctx", line_type=LineType.CONTEXT, old_line=1, new_line=1),
            DiffLine(content="removed", line_type=LineType.REMOVED, old_line=2),
            DiffLine(content="added", line_type=LineType.ADDED, new_line=2),
            DiffLine(content="ctx2", line_type=LineType.CONTEXT, old_line=3, new_line=3),
        ]
        subject = DiffHunk(
            old_start=1,
            old_count=3,
            new_start=1,
            new_count=4,
            lines=hunk_lines,
        )

        added_only = subject.get_added_lines()

        assert len(added_only) == 1
        assert added_only[0].content == "added"

    def test_diff_hunk_get_removed_lines(self) -> None:
        """``get_removed_lines`` returns just the REMOVED entries."""
        hunk_lines = [
            DiffLine(content="ctx", line_type=LineType.CONTEXT, old_line=1, new_line=1),
            DiffLine(content="removed", line_type=LineType.REMOVED, old_line=2),
            DiffLine(content="ctx2", line_type=LineType.CONTEXT, old_line=3, new_line=2),
        ]
        subject = DiffHunk(
            old_start=1,
            old_count=3,
            new_start=1,
            new_count=2,
            lines=hunk_lines,
        )

        removed_only = subject.get_removed_lines()

        assert len(removed_only) == 1
        assert removed_only[0].content == "removed"

    def test_diff_file_map_new_to_old(self) -> None:
        """Context lines map new to old; a line outside the hunk maps to ``None``."""
        hunk_lines = [
            DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1),
            DiffLine(content="b", line_type=LineType.CONTEXT, old_line=2, new_line=2),
        ]
        hunk = DiffHunk(
            old_start=1,
            old_count=2,
            new_start=1,
            new_count=2,
            lines=hunk_lines,
        )
        subject = DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])

        assert subject.map_new_to_old(1) == 1
        assert subject.map_new_to_old(2) == 2
        assert subject.map_new_to_old(100) is None

    def test_diff_file_map_new_to_old_added_line(self) -> None:
        """An added line has no old-side counterpart, so it maps to ``None``."""
        hunk_lines = [
            DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1),
            DiffLine(content="b", line_type=LineType.ADDED, new_line=2),
        ]
        hunk = DiffHunk(
            old_start=1,
            old_count=1,
            new_start=1,
            new_count=2,
            lines=hunk_lines,
        )
        subject = DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])

        assert subject.map_new_to_old(2) is None

    def test_parsed_diff_get_added_files(self) -> None:
        """Only files flagged ``is_new`` are listed as added."""
        entries = [
            DiffFile(old_path="a.py", new_path="a.py", is_new=True),
            DiffFile(old_path="b.py", new_path="b.py", is_new=False),
        ]

        assert ParsedDiff(files=entries).get_added_files() == ["a.py"]

    def test_parsed_diff_get_deleted_files(self) -> None:
        """Only files flagged ``is_deleted`` are listed as deleted."""
        entries = [
            DiffFile(old_path="a.py", new_path="a.py", is_deleted=True),
            DiffFile(old_path="b.py", new_path="b.py", is_deleted=False),
        ]

        assert ParsedDiff(files=entries).get_deleted_files() == ["a.py"]

    def test_changed_range_empty(self) -> None:
        """With no added lines the changed range is ``None``."""
        hunk_lines = [
            DiffLine(content="a", line_type=LineType.REMOVED, old_line=1),
            DiffLine(content="b", line_type=LineType.CONTEXT, old_line=2, new_line=1),
        ]
        hunk = DiffHunk(
            old_start=1,
            old_count=2,
            new_start=1,
            new_count=1,
            lines=hunk_lines,
        )
        subject = DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])

        assert subject.get_changed_line_range() is None

    def test_diff_file_path_deleted(self) -> None:
        """For a deleted file ``path`` reports the old path."""
        subject = DiffFile(old_path="old.py", new_path="new.py", is_deleted=True)

        assert subject.path == "old.py"
@pytest.mark.asyncio
async def test_run_static_analysis_convenience() -> None:
    """The module-level wrapper returns an empty result for an empty diff and no work dir."""
    empty_diff = ParsedDiff(files=[])

    outcome = await run_static_analysis(empty_diff, work_dir=None)

    assert outcome.findings == []
    assert outcome.tools_run == []
class TestStaticAnalysisRunMethods:
    """Tests for the per-tool ``_run_*`` methods with the subprocess layer mocked.

    ``asyncio.create_subprocess_exec`` is patched in every test that reaches
    it, so none of the real tools (ruff, mypy, bandit, radon) is executed.
    """

    @staticmethod
    def _single_hunk_diff(start: int, count: int, path: str = "test.py") -> ParsedDiff:
        """Return a diff touching ``path`` with one hunk that carries no lines.

        Args:
            start: First line of the hunk, used for both the old and new side.
            count: Number of lines in the hunk, used for both sides.
            path: Old and new path of the single file in the diff.
        """
        hunk = DiffHunk(
            old_start=start,
            old_count=count,
            new_start=start,
            new_count=count,
            lines=[],
        )
        return ParsedDiff(files=[DiffFile(old_path=path, new_path=path, hunks=[hunk])])

    @pytest.mark.asyncio
    async def test_run_ruff_file_not_found(self, tmp_path: Path) -> None:
        """A missing ruff executable surfaces as ``RuntimeError("ruff not found")``."""
        from unittest.mock import patch

        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=10)
        # The analysed file is created on disk so the runner reaches the subprocess call.
        (tmp_path / "test.py").write_text("print('hello')")
        with (
            patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()),
            pytest.raises(RuntimeError, match="ruff not found"),
        ):
            await runner._run_ruff(diff.files, tmp_path, diff)

    @pytest.mark.asyncio
    async def test_run_ruff_with_config(self, tmp_path: Path) -> None:
        """A configured ruff config path is passed as ``--config <path>``."""
        from unittest.mock import AsyncMock, patch

        config = StaticAnalysisConfig(ruff_config="/path/to/ruff.toml")
        runner = StaticAnalysisRunner(config)
        diff = self._single_hunk_diff(start=1, count=10)
        (tmp_path / "test.py").write_text("print('hello')")
        mock_proc = AsyncMock()
        mock_proc.communicate.return_value = (b"[]", b"")
        with patch("asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec:
            await runner._run_ruff(diff.files, tmp_path, diff)
        # call_args[0] holds the positional argv handed to the subprocess call.
        call_args = mock_exec.call_args
        assert "--config" in call_args[0]
        assert "/path/to/ruff.toml" in call_args[0]

    @pytest.mark.asyncio
    async def test_run_ruff_returns_empty_for_no_files(self, tmp_path: Path) -> None:
        """When the diffed file does not exist in the work dir the result is empty."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=10, path="nonexistent.py")
        result = await runner._run_ruff(diff.files, tmp_path, diff)
        assert result == []

    @pytest.mark.asyncio
    async def test_run_mypy_file_not_found(self, tmp_path: Path) -> None:
        """A missing mypy executable surfaces as ``RuntimeError("mypy not found")``."""
        from unittest.mock import patch

        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=10)
        (tmp_path / "test.py").write_text("print('hello')")
        with (
            patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()),
            pytest.raises(RuntimeError, match="mypy not found"),
        ):
            await runner._run_mypy(diff.files, tmp_path, diff)

    @pytest.mark.asyncio
    async def test_run_mypy_with_config(self, tmp_path: Path) -> None:
        """A configured mypy config path is passed as ``--config-file <path>``."""
        from unittest.mock import AsyncMock, patch

        config = StaticAnalysisConfig(mypy_config="/path/to/mypy.ini")
        runner = StaticAnalysisRunner(config)
        diff = self._single_hunk_diff(start=1, count=10)
        (tmp_path / "test.py").write_text("print('hello')")
        mock_proc = AsyncMock()
        mock_proc.communicate.return_value = (b"", b"")
        with patch("asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec:
            await runner._run_mypy(diff.files, tmp_path, diff)
        call_args = mock_exec.call_args
        assert "--config-file" in call_args[0]
        assert "/path/to/mypy.ini" in call_args[0]

    @pytest.mark.asyncio
    async def test_run_bandit_file_not_found(self, tmp_path: Path) -> None:
        """A missing bandit executable surfaces as ``RuntimeError("bandit not found")``."""
        from unittest.mock import patch

        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=10)
        (tmp_path / "test.py").write_text("print('hello')")
        with (
            patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()),
            pytest.raises(RuntimeError, match="bandit not found"),
        ):
            await runner._run_bandit(diff.files, tmp_path, diff)

    @pytest.mark.asyncio
    async def test_run_radon_file_not_found(self, tmp_path: Path) -> None:
        """A missing radon executable surfaces as ``RuntimeError("radon not found")``."""
        from unittest.mock import patch

        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=10)
        (tmp_path / "test.py").write_text("print('hello')")
        with (
            patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()),
            pytest.raises(RuntimeError, match="radon not found"),
        ):
            await runner._run_radon(diff.files, tmp_path, diff)

    @pytest.mark.asyncio
    async def test_run_with_tool_error(self, tmp_path: Path) -> None:
        """A tool raising ``RuntimeError`` is recorded in ``tool_errors`` under its name."""
        from unittest.mock import patch

        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=10)
        (tmp_path / "test.py").write_text("print('hello')")
        # Only ruff fails; the other three tools are stubbed to return nothing.
        with (
            patch.object(runner, "_run_ruff", side_effect=RuntimeError("ruff not found")),
            patch.object(runner, "_run_mypy", return_value=[]),
            patch.object(runner, "_run_bandit", return_value=[]),
            patch.object(runner, "_run_radon", return_value=[]),
        ):
            result = await runner.run(diff, work_dir=tmp_path)
        assert "ruff" in result.tool_errors
        assert "not found" in result.tool_errors["ruff"]

    @pytest.mark.asyncio
    async def test_run_ruff_success_with_output(self, tmp_path: Path) -> None:
        """Ruff JSON on stdout is parsed into a finding."""
        import json
        from unittest.mock import AsyncMock, patch

        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=10)
        (tmp_path / "test.py").write_text("x=1")
        # json.dumps escapes the path; interpolating it into a JSON template
        # breaks when tmp_path contains backslashes or quotes.
        ruff_output = json.dumps(
            [
                {
                    "filename": str(tmp_path / "test.py"),
                    "location": {"row": 5, "column": 1},
                    "end_location": {"row": 5, "column": 80},
                    "code": "E501",
                    "message": "Line too long",
                }
            ]
        )
        mock_proc = AsyncMock()
        mock_proc.communicate.return_value = (ruff_output.encode(), b"")
        with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
            result = await runner._run_ruff(diff.files, tmp_path, diff)
        assert len(result) == 1
        assert result[0].code == "E501"

    @pytest.mark.asyncio
    async def test_run_mypy_success_with_output(self, tmp_path: Path) -> None:
        """A mypy ``error`` line on stdout is parsed into a MEDIUM finding."""
        from unittest.mock import AsyncMock, patch

        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=20)
        (tmp_path / "test.py").write_text("x: int = 'string'")
        mypy_output = f"{tmp_path}/test.py:5:1: error: Type mismatch"
        mock_proc = AsyncMock()
        mock_proc.communicate.return_value = (mypy_output.encode(), b"")
        with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
            result = await runner._run_mypy(diff.files, tmp_path, diff)
        assert len(result) == 1
        assert result[0].severity == Severity.MEDIUM

    @pytest.mark.asyncio
    async def test_run_bandit_success_with_output(self, tmp_path: Path) -> None:
        """Bandit JSON on stdout is parsed into a HIGH finding."""
        import json
        from unittest.mock import AsyncMock, patch

        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=20)
        (tmp_path / "test.py").write_text("import md5")
        # Built with json.dumps so the path is escaped correctly on every platform.
        bandit_output = json.dumps(
            {
                "results": [
                    {
                        "filename": str(tmp_path / "test.py"),
                        "line_number": 5,
                        "test_id": "B303",
                        "test_name": "blacklist",
                        "issue_severity": "HIGH",
                        "issue_confidence": "HIGH",
                        "issue_text": "Use of insecure hash",
                    }
                ]
            }
        )
        mock_proc = AsyncMock()
        mock_proc.communicate.return_value = (bandit_output.encode(), b"")
        with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
            result = await runner._run_bandit(diff.files, tmp_path, diff)
        assert len(result) == 1
        assert result[0].severity == Severity.HIGH

    @pytest.mark.asyncio
    async def test_run_radon_success_with_output(self, tmp_path: Path) -> None:
        """Radon JSON on stdout is parsed into a HIGH finding for a rank-D function."""
        import json
        from unittest.mock import AsyncMock, patch

        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=1, count=50)
        (tmp_path / "test.py").write_text("def func(): pass")
        # Built with json.dumps so the path used as the key is escaped correctly.
        radon_output = json.dumps(
            {
                str(tmp_path / "test.py"): [
                    {
                        "type": "function",
                        "name": "complex_func",
                        "lineno": 10,
                        "endline": 30,
                        "complexity": 25,
                        "rank": "D",
                    }
                ]
            }
        )
        mock_proc = AsyncMock()
        mock_proc.communicate.return_value = (radon_output.encode(), b"")
        with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
            result = await runner._run_radon(diff.files, tmp_path, diff)
        assert len(result) == 1
        assert result[0].severity == Severity.HIGH

    def test_mypy_filters_outside_diff(self) -> None:
        """A mypy error on line 100 is dropped when the hunk starts at 10 with 5 lines."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=10, count=5)
        # The reported line (100) lies outside the hunk.
        mypy_output = "/work/test.py:100:1: error: Type mismatch"
        findings = runner._parse_mypy_output(mypy_output, diff, Path("/work"))
        assert findings == []

    def test_bandit_outside_diff(self) -> None:
        """A bandit result on line 100 is dropped when the hunk starts at 10 with 5 lines."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=10, count=5)
        bandit_output = """{
"results": [
{
"filename": "/work/test.py",
"line_number": 100,
"test_id": "B303",
"test_name": "blacklist",
"issue_severity": "HIGH",
"issue_confidence": "HIGH",
"issue_text": "Use of insecure hash"
}
]
}"""
        findings = runner._parse_bandit_output(bandit_output, diff, Path("/work"))
        assert findings == []

    def test_radon_outside_diff(self) -> None:
        """A radon entry starting at line 100 is dropped when the hunk starts at 10 with 5 lines."""
        runner = StaticAnalysisRunner()
        diff = self._single_hunk_diff(start=10, count=5)
        radon_output = """{
"/work/test.py": [
{
"type": "function",
"name": "complex_func",
"lineno": 100,
"endline": 120,
"complexity": 25,
"rank": "D"
}
]
}"""
        findings = runner._parse_radon_output(radon_output, diff, Path("/work"))
        assert findings == []