"""Tests for the diff parser and static analysis modules."""
import json
from pathlib import Path
from unittest.mock import AsyncMock, patch

import pytest

from arbiter.analysis.diff import DiffFile, DiffHunk, DiffLine, DiffParser, LineType, ParsedDiff
from arbiter.analysis.static import (
    StaticAnalysisConfig,
    StaticAnalysisRunner,
    StaticFinding,
    run_static_analysis,
)
from arbiter.models import Severity
class TestDiffParser:
    """Exercise ``DiffParser.parse`` on small hand-written unified diffs.

    Diff fixtures are spelled out one line per string so the significant
    leading character of each hunk line (space, ``+`` or ``-``) is visible.
    """

    def test_parse_empty(self) -> None:
        """An empty input produces no files and an empty ``raw_diff``."""
        parsed = DiffParser().parse("")

        assert parsed.files == []
        assert parsed.raw_diff == ""

    def test_parse_single_file(self) -> None:
        """A one-file diff yields one file entry holding one hunk."""
        diff_text = (
            "diff --git a/test.py b/test.py\n"
            "index 1234567..abcdefg 100644\n"
            "--- a/test.py\n"
            "+++ b/test.py\n"
            "@@ -1,3 +1,4 @@\n"
            " line1\n"
            "+added\n"
            " line2\n"
            " line3\n"
        )
        parsed = DiffParser().parse(diff_text)

        assert len(parsed.files) == 1
        only_file = parsed.files[0]
        assert only_file.old_path == "test.py"
        assert only_file.new_path == "test.py"
        assert len(only_file.hunks) == 1

    def test_parse_hunk_header(self) -> None:
        """Start/count numbers and the trailing header text are extracted."""
        diff_text = (
            "diff --git a/test.py b/test.py\n"
            "index 1234567..abcdefg 100644\n"
            "--- a/test.py\n"
            "+++ b/test.py\n"
            "@@ -10,5 +10,6 @@ def function():\n"
            " context\n"
            "+added\n"
            " more context\n"
        )
        first_hunk = DiffParser().parse(diff_text).files[0].hunks[0]

        assert first_hunk.old_start == 10
        assert first_hunk.old_count == 5
        assert first_hunk.new_start == 10
        assert first_hunk.new_count == 6
        assert first_hunk.header == "def function():"

    def test_parse_line_types(self) -> None:
        """Each hunk line is classified as context, removed or added."""
        diff_text = (
            "diff --git a/test.py b/test.py\n"
            "--- a/test.py\n"
            "+++ b/test.py\n"
            "@@ -1,4 +1,4 @@\n"
            " context\n"
            "-removed\n"
            "+added\n"
            " more context\n"
        )
        hunk_lines = DiffParser().parse(diff_text).files[0].hunks[0].lines

        expected_types = (
            LineType.CONTEXT,
            LineType.REMOVED,
            LineType.ADDED,
            LineType.CONTEXT,
        )
        for position, expected in enumerate(expected_types):
            assert hunk_lines[position].line_type == expected

    def test_parse_line_numbers(self) -> None:
        """Old/new line numbers advance independently through a hunk."""
        diff_text = (
            "diff --git a/test.py b/test.py\n"
            "--- a/test.py\n"
            "+++ b/test.py\n"
            "@@ -5,4 +5,5 @@\n"
            " context\n"
            "-removed\n"
            "+added1\n"
            "+added2\n"
            " final\n"
        )
        context, removed, first_added, second_added = (
            DiffParser().parse(diff_text).files[0].hunks[0].lines[:4]
        )

        # The context line sits at 5 on both sides.
        assert context.old_line == 5
        assert context.new_line == 5
        # The removed line only exists on the old side, at 6.
        assert removed.old_line == 6
        assert removed.new_line is None
        # The two added lines only exist on the new side, at 6 and 7.
        assert first_added.old_line is None
        assert first_added.new_line == 6
        assert second_added.old_line is None
        assert second_added.new_line == 7

    def test_parse_multi_file(self) -> None:
        """Two ``diff --git`` sections produce two file entries, in order."""
        diff_text = (
            "diff --git a/a.py b/a.py\n"
            "--- a/a.py\n"
            "+++ b/a.py\n"
            "@@ -1,2 +1,3 @@\n"
            " line1\n"
            "+new\n"
            " line2\n"
            "diff --git a/b.py b/b.py\n"
            "--- a/b.py\n"
            "+++ b/b.py\n"
            "@@ -1 +1,2 @@\n"
            " only\n"
            "+added\n"
        )
        parsed = DiffParser().parse(diff_text)

        assert len(parsed.files) == 2
        assert parsed.files[0].path == "a.py"
        assert parsed.files[1].path == "b.py"

    def test_parse_new_file(self) -> None:
        """A ``new file mode`` section is flagged as new."""
        diff_text = (
            "diff --git a/new.py b/new.py\n"
            "new file mode 100644\n"
            "--- /dev/null\n"
            "+++ b/new.py\n"
            "@@ -0,0 +1,3 @@\n"
            "+line1\n"
            "+line2\n"
            "+line3\n"
        )
        created = DiffParser().parse(diff_text).files[0]

        assert created.is_new is True
        assert created.path == "new.py"

    def test_parse_deleted_file(self) -> None:
        """A ``deleted file mode`` section is flagged as deleted."""
        diff_text = (
            "diff --git a/old.py b/old.py\n"
            "deleted file mode 100644\n"
            "--- a/old.py\n"
            "+++ /dev/null\n"
            "@@ -1,2 +0,0 @@\n"
            "-line1\n"
            "-line2\n"
        )
        deleted = DiffParser().parse(diff_text).files[0]

        assert deleted.is_deleted is True
        assert deleted.path == "old.py"
class TestDiffFile:
    """Checks for the line-number helpers exposed by ``DiffFile``."""

    def test_get_added_line_numbers(self) -> None:
        """Only the ADDED lines' new-side numbers are reported."""
        hunk_lines = [
            DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1),
            DiffLine(content="b", line_type=LineType.ADDED, new_line=2),
            DiffLine(content="c", line_type=LineType.ADDED, new_line=3),
            DiffLine(content="d", line_type=LineType.CONTEXT, old_line=2, new_line=4),
        ]
        hunk = DiffHunk(old_start=1, old_count=2, new_start=1, new_count=4, lines=hunk_lines)
        diff_file = DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])

        assert diff_file.get_added_line_numbers() == [2, 3]

    def test_get_changed_line_range(self) -> None:
        """The range spans the first through the last added line."""
        hunk_lines = [
            DiffLine(content="a", line_type=LineType.ADDED, new_line=10),
            DiffLine(content="b", line_type=LineType.ADDED, new_line=11),
            DiffLine(content="c", line_type=LineType.ADDED, new_line=14),
        ]
        hunk = DiffHunk(old_start=10, old_count=3, new_start=10, new_count=5, lines=hunk_lines)
        diff_file = DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])

        assert diff_file.get_changed_line_range() == (10, 14)

    def test_line_in_diff(self) -> None:
        """Lines 10 and 15 are inside the hunk; 9 and 16 are outside."""
        empty_hunk = DiffHunk(old_start=10, old_count=5, new_start=10, new_count=6, lines=[])
        diff_file = DiffFile(old_path="test.py", new_path="test.py", hunks=[empty_hunk])

        # Same probes, in the same order, as individual assertions would make.
        expectations = ((10, True), (15, True), (16, False), (9, False))
        for line_number, expected in expectations:
            assert diff_file.line_in_diff(line_number) is expected
class TestParsedDiff:
    """Checks for file lookup and listing on ``ParsedDiff``."""

    def test_get_file(self) -> None:
        """A known path is found; an unknown path gives ``None``."""
        tracked = [
            DiffFile(old_path="a.py", new_path="a.py"),
            DiffFile(old_path="b.py", new_path="b.py"),
        ]
        parsed = ParsedDiff(files=tracked)

        assert parsed.get_file("a.py") is not None
        assert parsed.get_file("c.py") is None

    def test_get_changed_files(self) -> None:
        """Changed files are listed by path in their original order."""
        tracked = [
            DiffFile(old_path="a.py", new_path="a.py"),
            DiffFile(old_path="b.py", new_path="b.py"),
        ]
        parsed = ParsedDiff(files=tracked)

        assert parsed.get_changed_files() == ["a.py", "b.py"]
class TestStaticFinding:
    """Construction check for the ``StaticFinding`` record."""

    def test_create_finding(self) -> None:
        """Fields passed to the constructor are readable afterwards."""
        fields = {
            "tool": "ruff",
            "file": "test.py",
            "line": 10,
            "code": "E501",
            "message": "Line too long",
            "severity": Severity.LOW,
        }
        finding = StaticFinding(**fields)

        assert finding.tool == "ruff"
        assert finding.severity == Severity.LOW
class TestStaticAnalysisRunner:
    """Unit tests for ``StaticAnalysisRunner`` output parsing and conversion.

    The parser tests feed canned tool output (paths rooted at ``/work``) to the
    runner's private ``_parse_*_output`` methods together with a diff built by
    ``_single_file_diff``, so no external tool is executed.
    """

    @staticmethod
    def _single_file_diff(line_count: int, *, start: int = 1) -> ParsedDiff:
        """Build a diff holding one ``test.py`` file with one empty hunk.

        Args:
            line_count: Value used for both ``old_count`` and ``new_count``.
            start: Value used for both ``old_start`` and ``new_start``.

        Returns:
            A ``ParsedDiff`` whose only file is ``test.py``.
        """
        hunk = DiffHunk(
            old_start=start,
            old_count=line_count,
            new_start=start,
            new_count=line_count,
            lines=[],
        )
        return ParsedDiff(
            files=[DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])]
        )

    def test_ruff_severity_mapping(self) -> None:
        """Representative ruff rule codes map to the expected severities."""
        runner = StaticAnalysisRunner()

        assert runner._ruff_code_to_severity("S101") == Severity.HIGH
        assert runner._ruff_code_to_severity("E501") == Severity.MEDIUM
        assert runner._ruff_code_to_severity("W503") == Severity.LOW
        assert runner._ruff_code_to_severity("D100") == Severity.INFO

    @pytest.mark.asyncio
    async def test_run_without_work_dir(self) -> None:
        """Running with ``work_dir=None`` reports no findings and no tools."""
        runner = StaticAnalysisRunner()

        result = await runner.run(ParsedDiff(files=[]), work_dir=None)

        assert result.findings == []
        assert result.tools_run == []

    @pytest.mark.asyncio
    async def test_run_no_python_files(self, tmp_path: Path) -> None:
        """A diff touching only ``test.txt`` produces no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[DiffFile(old_path="test.txt", new_path="test.txt")])

        # tmp_path replaces a hard-coded /tmp so the test is portable and isolated.
        result = await runner.run(diff, work_dir=tmp_path)

        assert result.findings == []

    def test_parse_ruff_output(self) -> None:
        """One ruff JSON record inside the diff becomes one finding."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(10)

        ruff_output = """[
    {
        "filename": "/work/test.py",
        "location": {"row": 5, "column": 1},
        "end_location": {"row": 5, "column": 80},
        "code": "E501",
        "message": "Line too long (100 > 79)",
        "fix": null,
        "url": "https://docs.astral.sh/ruff/rules/E501"
    }
]"""

        findings = runner._parse_ruff_output(ruff_output, diff, Path("/work"))

        assert len(findings) == 1
        assert findings[0].code == "E501"
        assert findings[0].line == 5
        assert findings[0].severity == Severity.MEDIUM

    def test_parse_mypy_output(self) -> None:
        """An ``error`` line maps to MEDIUM and a ``note`` line to INFO."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(20)

        mypy_output = """/work/test.py:10:5: error: Incompatible return value type
/work/test.py:15:1: note: See documentation"""

        findings = runner._parse_mypy_output(mypy_output, diff, Path("/work"))

        # Both lines are in the diff range (1-20), so both are included
        assert len(findings) == 2
        assert findings[0].line == 10
        assert findings[0].severity == Severity.MEDIUM
        assert findings[1].line == 15
        assert findings[1].severity == Severity.INFO  # note maps to INFO

    def test_parse_bandit_output(self) -> None:
        """A HIGH-severity bandit result becomes a HIGH finding."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(20)

        bandit_output = """{
    "results": [
        {
            "filename": "/work/test.py",
            "line_number": 5,
            "test_id": "B303",
            "test_name": "blacklist",
            "issue_severity": "HIGH",
            "issue_confidence": "HIGH",
            "issue_text": "Use of insecure MD5 hash function"
        }
    ]
}"""

        findings = runner._parse_bandit_output(bandit_output, diff, Path("/work"))

        assert len(findings) == 1
        assert findings[0].code == "B303"
        assert findings[0].severity == Severity.HIGH

    def test_parse_radon_output(self) -> None:
        """A rank-D function is reported as code ``CCD`` with HIGH severity."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(50)

        radon_output = """{
    "/work/test.py": [
        {
            "type": "function",
            "name": "complex_func",
            "lineno": 10,
            "endline": 30,
            "complexity": 25,
            "rank": "D"
        }
    ]
}"""

        findings = runner._parse_radon_output(radon_output, diff, Path("/work"))

        assert len(findings) == 1
        assert findings[0].code == "CCD"
        assert findings[0].severity == Severity.HIGH
        assert "complexity" in findings[0].message.lower()

    def test_convert_to_finding(self) -> None:
        """File, line and severity carry over; the title names the tool."""
        runner = StaticAnalysisRunner()
        static_finding = StaticFinding(
            tool="ruff",
            file="test.py",
            line=10,
            code="E501",
            message="Line too long",
            severity=Severity.LOW,
        )

        finding = runner.convert_to_finding(static_finding)

        assert finding.file == "test.py"
        assert finding.line_start == 10
        assert finding.severity == Severity.LOW
        assert "ruff" in finding.title.lower()

    @pytest.mark.asyncio
    async def test_config_disables_tools(self, tmp_path: Path) -> None:
        """With every tool disabled in the config, no tool is reported as run."""
        config = StaticAnalysisConfig(
            ruff_enabled=False,
            mypy_enabled=False,
            bandit_enabled=False,
            radon_enabled=False,
        )
        runner = StaticAnalysisRunner(config)
        diff = ParsedDiff(files=[DiffFile(old_path="test.py", new_path="test.py")])

        # tmp_path replaces a hard-coded /tmp so the test is portable and isolated.
        result = await runner.run(diff, work_dir=tmp_path)

        assert result.tools_run == []

    def test_ruff_code_to_severity_empty(self) -> None:
        """An empty rule code falls back to MEDIUM."""
        runner = StaticAnalysisRunner()

        assert runner._ruff_code_to_severity("") == Severity.MEDIUM

    def test_ruff_code_to_severity_unknown(self) -> None:
        """An unrecognised rule code falls back to MEDIUM."""
        runner = StaticAnalysisRunner()

        assert runner._ruff_code_to_severity("XYZ999") == Severity.MEDIUM

    def test_parse_ruff_output_empty(self) -> None:
        """Empty and whitespace-only ruff output both give no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])

        assert runner._parse_ruff_output("", diff, Path("/work")) == []
        assert runner._parse_ruff_output("   ", diff, Path("/work")) == []

    def test_parse_ruff_output_invalid_json(self) -> None:
        """Non-JSON ruff output gives no findings instead of raising."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])

        assert runner._parse_ruff_output("not json", diff, Path("/work")) == []

    def test_ruff_filters_outside_diff(self) -> None:
        """A ruff record on a line outside every hunk is dropped."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(5, start=10)

        # Finding at line 100 is outside the diff
        ruff_output = """[
    {
        "filename": "/work/test.py",
        "location": {"row": 100, "column": 1},
        "end_location": {"row": 100, "column": 80},
        "code": "E501",
        "message": "Line too long"
    }
]"""

        assert runner._parse_ruff_output(ruff_output, diff, Path("/work")) == []

    def test_parse_mypy_output_empty(self) -> None:
        """Empty mypy output gives no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])

        assert runner._parse_mypy_output("", diff, Path("/work")) == []

    def test_parse_mypy_output_invalid_format(self) -> None:
        """A line that is not in mypy's report format is ignored."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])

        assert runner._parse_mypy_output("not a valid line", diff, Path("/work")) == []

    def test_parse_mypy_output_warning_severity(self) -> None:
        """A mypy ``warning`` line maps to LOW severity."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(20)

        mypy_output = "/work/test.py:5:1: warning: Deprecated function call"
        findings = runner._parse_mypy_output(mypy_output, diff, Path("/work"))

        assert len(findings) == 1
        assert findings[0].severity == Severity.LOW

    def test_parse_bandit_output_empty(self) -> None:
        """Empty bandit output gives no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])

        assert runner._parse_bandit_output("", diff, Path("/work")) == []

    def test_parse_bandit_output_invalid_json(self) -> None:
        """Non-JSON bandit output gives no findings instead of raising."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])

        assert runner._parse_bandit_output("not json", diff, Path("/work")) == []

    def test_parse_bandit_output_low_severity(self) -> None:
        """A LOW-severity bandit result becomes a LOW finding."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(20)

        bandit_output = """{
    "results": [
        {
            "filename": "/work/test.py",
            "line_number": 5,
            "test_id": "B105",
            "test_name": "hardcoded_password",
            "issue_severity": "LOW",
            "issue_confidence": "MEDIUM",
            "issue_text": "Possible hardcoded password"
        }
    ]
}"""

        findings = runner._parse_bandit_output(bandit_output, diff, Path("/work"))

        assert len(findings) == 1
        assert findings[0].severity == Severity.LOW

    def test_parse_radon_output_empty(self) -> None:
        """Empty radon output gives no findings."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])

        assert runner._parse_radon_output("", diff, Path("/work")) == []

    def test_parse_radon_output_invalid_json(self) -> None:
        """Non-JSON radon output gives no findings instead of raising."""
        runner = StaticAnalysisRunner()
        diff = ParsedDiff(files=[])

        assert runner._parse_radon_output("not json", diff, Path("/work")) == []

    def test_radon_low_complexity_skip(self) -> None:
        """Functions ranked A or B produce no findings."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(50)

        radon_output = """{
    "/work/test.py": [
        {
            "type": "function",
            "name": "simple_func",
            "lineno": 10,
            "endline": 15,
            "complexity": 3,
            "rank": "A"
        },
        {
            "type": "function",
            "name": "ok_func",
            "lineno": 20,
            "endline": 30,
            "complexity": 8,
            "rank": "B"
        }
    ]
}"""

        assert runner._parse_radon_output(radon_output, diff, Path("/work")) == []

    def test_parse_radon_output_various_ranks(self) -> None:
        """Rank C maps to MEDIUM and rank F to CRITICAL."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(100)

        radon_output = """{
    "/work/test.py": [
        {
            "type": "function",
            "name": "moderate_func",
            "lineno": 10,
            "endline": 30,
            "complexity": 15,
            "rank": "C"
        },
        {
            "type": "function",
            "name": "very_complex_func",
            "lineno": 50,
            "endline": 80,
            "complexity": 45,
            "rank": "F"
        }
    ]
}"""

        findings = runner._parse_radon_output(radon_output, diff, Path("/work"))

        assert len(findings) == 2
        assert findings[0].severity == Severity.MEDIUM  # C rank
        assert findings[1].severity == Severity.CRITICAL  # F rank

    def test_convert_to_finding_with_url(self) -> None:
        """A ``url`` entry in ``extra`` ends up in the finding's references."""
        runner = StaticAnalysisRunner()
        static_finding = StaticFinding(
            tool="ruff",
            file="test.py",
            line=10,
            code="E501",
            message="Line too long",
            severity=Severity.LOW,
            extra={"url": "https://docs.astral.sh/ruff/rules/E501"},
        )

        finding = runner.convert_to_finding(static_finding)

        assert "https://docs.astral.sh/ruff/rules/E501" in finding.references

    def test_convert_to_finding_without_url(self) -> None:
        """Without a URL the converted finding has no references."""
        runner = StaticAnalysisRunner()
        static_finding = StaticFinding(
            tool="mypy",
            file="test.py",
            line=10,
            code="mypy-error",
            message="Type error",
            severity=Severity.MEDIUM,
        )

        finding = runner.convert_to_finding(static_finding)

        assert finding.references == []

    def test_finding_with_fix(self) -> None:
        """``fix_available=True`` results in a non-``None`` suggestion."""
        runner = StaticAnalysisRunner()
        static_finding = StaticFinding(
            tool="ruff",
            file="test.py",
            line=10,
            code="E501",
            message="Line too long",
            severity=Severity.LOW,
            fix_available=True,
        )

        finding = runner.convert_to_finding(static_finding)

        assert finding.suggestion is not None
class TestDiffParserAdvanced:
    """Less common parser inputs plus the hunk/file/diff helper methods."""

    def test_parse_binary_file(self) -> None:
        """A ``Binary files ... differ`` section is flagged binary and new."""
        diff_text = (
            "diff --git a/image.png b/image.png\n"
            "new file mode 100644\n"
            "Binary files /dev/null and b/image.png differ\n"
        )
        parsed = DiffParser().parse(diff_text)

        assert len(parsed.files) == 1
        binary_file = parsed.files[0]
        assert binary_file.is_binary is True
        assert binary_file.is_new is True

    def test_parse_hunk_no_newline(self) -> None:
        """The ``\\ No newline at end of file`` marker is not a hunk line."""
        diff_text = (
            "diff --git a/test.py b/test.py\n"
            "--- a/test.py\n"
            "+++ b/test.py\n"
            "@@ -1,2 +1,2 @@\n"
            " line1\n"
            "-line2\n"
            "+line2 modified\n"
            "\\ No newline at end of file\n"
        )
        parsed = DiffParser().parse(diff_text)

        assert len(parsed.files) == 1
        # Only the context, removed and added lines remain; the marker is skipped.
        assert len(parsed.files[0].hunks[0].lines) == 3

    def test_parse_hunk_with_function_header(self) -> None:
        """Text after the closing ``@@`` is stored as the hunk header."""
        diff_text = (
            "diff --git a/test.py b/test.py\n"
            "--- a/test.py\n"
            "+++ b/test.py\n"
            "@@ -10,5 +10,6 @@ class MyClass:\n"
            " context\n"
            "+added\n"
        )
        first_hunk = DiffParser().parse(diff_text).files[0].hunks[0]

        assert first_hunk.header == "class MyClass:"

    def test_diff_hunk_get_added_lines(self) -> None:
        """``get_added_lines`` returns just the ADDED entries."""
        hunk_lines = [
            DiffLine(content="ctx", line_type=LineType.CONTEXT, old_line=1, new_line=1),
            DiffLine(content="removed", line_type=LineType.REMOVED, old_line=2),
            DiffLine(content="added", line_type=LineType.ADDED, new_line=2),
            DiffLine(content="ctx2", line_type=LineType.CONTEXT, old_line=3, new_line=3),
        ]
        hunk = DiffHunk(old_start=1, old_count=3, new_start=1, new_count=4, lines=hunk_lines)

        added_lines = hunk.get_added_lines()

        assert len(added_lines) == 1
        assert added_lines[0].content == "added"

    def test_diff_hunk_get_removed_lines(self) -> None:
        """``get_removed_lines`` returns just the REMOVED entries."""
        hunk_lines = [
            DiffLine(content="ctx", line_type=LineType.CONTEXT, old_line=1, new_line=1),
            DiffLine(content="removed", line_type=LineType.REMOVED, old_line=2),
            DiffLine(content="ctx2", line_type=LineType.CONTEXT, old_line=3, new_line=2),
        ]
        hunk = DiffHunk(old_start=1, old_count=3, new_start=1, new_count=2, lines=hunk_lines)

        removed_lines = hunk.get_removed_lines()

        assert len(removed_lines) == 1
        assert removed_lines[0].content == "removed"

    def test_diff_file_map_new_to_old(self) -> None:
        """Context lines map new to old; an unknown line maps to ``None``."""
        hunk_lines = [
            DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1),
            DiffLine(content="b", line_type=LineType.CONTEXT, old_line=2, new_line=2),
        ]
        hunk = DiffHunk(old_start=1, old_count=2, new_start=1, new_count=2, lines=hunk_lines)
        diff_file = DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])

        assert diff_file.map_new_to_old(1) == 1
        assert diff_file.map_new_to_old(2) == 2
        assert diff_file.map_new_to_old(100) is None

    def test_diff_file_map_new_to_old_added_line(self) -> None:
        """An added line has no old-side counterpart."""
        hunk_lines = [
            DiffLine(content="a", line_type=LineType.CONTEXT, old_line=1, new_line=1),
            DiffLine(content="b", line_type=LineType.ADDED, new_line=2),
        ]
        hunk = DiffHunk(old_start=1, old_count=1, new_start=1, new_count=2, lines=hunk_lines)
        diff_file = DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])

        # New line 2 was constructed without an old_line.
        assert diff_file.map_new_to_old(2) is None

    def test_parsed_diff_get_added_files(self) -> None:
        """Only files flagged ``is_new`` are listed as added."""
        tracked = [
            DiffFile(old_path="a.py", new_path="a.py", is_new=True),
            DiffFile(old_path="b.py", new_path="b.py", is_new=False),
        ]

        assert ParsedDiff(files=tracked).get_added_files() == ["a.py"]

    def test_parsed_diff_get_deleted_files(self) -> None:
        """Only files flagged ``is_deleted`` are listed as deleted."""
        tracked = [
            DiffFile(old_path="a.py", new_path="a.py", is_deleted=True),
            DiffFile(old_path="b.py", new_path="b.py", is_deleted=False),
        ]

        assert ParsedDiff(files=tracked).get_deleted_files() == ["a.py"]

    def test_changed_range_empty(self) -> None:
        """With no added lines there is no changed range."""
        hunk_lines = [
            DiffLine(content="a", line_type=LineType.REMOVED, old_line=1),
            DiffLine(content="b", line_type=LineType.CONTEXT, old_line=2, new_line=1),
        ]
        hunk = DiffHunk(old_start=1, old_count=2, new_start=1, new_count=1, lines=hunk_lines)
        diff_file = DiffFile(old_path="test.py", new_path="test.py", hunks=[hunk])

        assert diff_file.get_changed_line_range() is None

    def test_diff_file_path_deleted(self) -> None:
        """A deleted file reports its old path as ``path``."""
        diff_file = DiffFile(old_path="old.py", new_path="new.py", is_deleted=True)

        assert diff_file.path == "old.py"
@pytest.mark.asyncio
async def test_run_static_analysis_convenience() -> None:
    """``run_static_analysis`` on an empty diff with no work dir reports nothing."""
    empty_diff = ParsedDiff(files=[])

    outcome = await run_static_analysis(empty_diff, work_dir=None)

    assert outcome.findings == []
    assert outcome.tools_run == []
class TestStaticAnalysisRunMethods:
    """Tests for the per-tool ``_run_*`` coroutines and diff-range filtering.

    Every ``_run_*`` test patches ``asyncio.create_subprocess_exec`` so no real
    tool is launched. Tool output that embeds a ``tmp_path`` location is built
    with ``json.dumps`` so the path is escaped correctly on every platform.
    """

    # Patch target for the subprocess spawner that the ``_run_*`` tests replace.
    _EXEC_TARGET = "asyncio.create_subprocess_exec"

    @staticmethod
    def _single_file_diff(
        line_count: int, *, start: int = 1, path: str = "test.py"
    ) -> ParsedDiff:
        """Build a diff holding one file with one empty hunk.

        Args:
            line_count: Value used for both ``old_count`` and ``new_count``.
            start: Value used for both ``old_start`` and ``new_start``.
            path: Value used for both ``old_path`` and ``new_path``.

        Returns:
            A ``ParsedDiff`` containing exactly that file.
        """
        hunk = DiffHunk(
            old_start=start,
            old_count=line_count,
            new_start=start,
            new_count=line_count,
            lines=[],
        )
        return ParsedDiff(files=[DiffFile(old_path=path, new_path=path, hunks=[hunk])])

    @staticmethod
    def _fake_process(stdout: str) -> AsyncMock:
        """Return a mock process whose ``communicate()`` yields *stdout* and empty stderr."""
        process = AsyncMock()
        process.communicate.return_value = (stdout.encode(), b"")
        return process

    @pytest.mark.asyncio
    async def test_run_ruff_file_not_found(self, tmp_path: Path) -> None:
        """A missing ruff executable surfaces as ``RuntimeError``."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(10)
        # Create the test file
        (tmp_path / "test.py").write_text("print('hello')")

        with (
            patch(self._EXEC_TARGET, side_effect=FileNotFoundError()),
            pytest.raises(RuntimeError, match="ruff not found"),
        ):
            await runner._run_ruff(diff.files, tmp_path, diff)

    @pytest.mark.asyncio
    async def test_run_ruff_with_config(self, tmp_path: Path) -> None:
        """A configured ruff config path is passed on the command line."""
        config = StaticAnalysisConfig(ruff_config="/path/to/ruff.toml")
        runner = StaticAnalysisRunner(config)
        diff = self._single_file_diff(10)
        # Create the test file
        (tmp_path / "test.py").write_text("print('hello')")

        with patch(self._EXEC_TARGET, return_value=self._fake_process("[]")) as mock_exec:
            await runner._run_ruff(diff.files, tmp_path, diff)

        # Verify config was passed
        positional_args = mock_exec.call_args[0]
        assert "--config" in positional_args
        assert "/path/to/ruff.toml" in positional_args

    @pytest.mark.asyncio
    async def test_run_ruff_returns_empty_for_no_files(self, tmp_path: Path) -> None:
        """When the diffed file is absent from the work dir, ruff yields nothing."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(10, path="nonexistent.py")

        result = await runner._run_ruff(diff.files, tmp_path, diff)

        assert result == []

    @pytest.mark.asyncio
    async def test_run_mypy_file_not_found(self, tmp_path: Path) -> None:
        """A missing mypy executable surfaces as ``RuntimeError``."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(10)
        (tmp_path / "test.py").write_text("print('hello')")

        with (
            patch(self._EXEC_TARGET, side_effect=FileNotFoundError()),
            pytest.raises(RuntimeError, match="mypy not found"),
        ):
            await runner._run_mypy(diff.files, tmp_path, diff)

    @pytest.mark.asyncio
    async def test_run_mypy_with_config(self, tmp_path: Path) -> None:
        """A configured mypy config path is passed on the command line."""
        config = StaticAnalysisConfig(mypy_config="/path/to/mypy.ini")
        runner = StaticAnalysisRunner(config)
        diff = self._single_file_diff(10)
        (tmp_path / "test.py").write_text("print('hello')")

        with patch(self._EXEC_TARGET, return_value=self._fake_process("")) as mock_exec:
            await runner._run_mypy(diff.files, tmp_path, diff)

        positional_args = mock_exec.call_args[0]
        assert "--config-file" in positional_args
        assert "/path/to/mypy.ini" in positional_args

    @pytest.mark.asyncio
    async def test_run_bandit_file_not_found(self, tmp_path: Path) -> None:
        """A missing bandit executable surfaces as ``RuntimeError``."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(10)
        (tmp_path / "test.py").write_text("print('hello')")

        with (
            patch(self._EXEC_TARGET, side_effect=FileNotFoundError()),
            pytest.raises(RuntimeError, match="bandit not found"),
        ):
            await runner._run_bandit(diff.files, tmp_path, diff)

    @pytest.mark.asyncio
    async def test_run_radon_file_not_found(self, tmp_path: Path) -> None:
        """A missing radon executable surfaces as ``RuntimeError``."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(10)
        (tmp_path / "test.py").write_text("print('hello')")

        with (
            patch(self._EXEC_TARGET, side_effect=FileNotFoundError()),
            pytest.raises(RuntimeError, match="radon not found"),
        ):
            await runner._run_radon(diff.files, tmp_path, diff)

    @pytest.mark.asyncio
    async def test_run_with_tool_error(self, tmp_path: Path) -> None:
        """A tool that raises is recorded in ``tool_errors`` under its name."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(10)
        (tmp_path / "test.py").write_text("print('hello')")

        # Make ruff fail with RuntimeError
        with (
            patch.object(runner, "_run_ruff", side_effect=RuntimeError("ruff not found")),
            patch.object(runner, "_run_mypy", return_value=[]),
            patch.object(runner, "_run_bandit", return_value=[]),
            patch.object(runner, "_run_radon", return_value=[]),
        ):
            result = await runner.run(diff, work_dir=tmp_path)

        assert "ruff" in result.tool_errors
        assert "not found" in result.tool_errors["ruff"]

    @pytest.mark.asyncio
    async def test_run_ruff_success_with_output(self, tmp_path: Path) -> None:
        """Ruff JSON from the (mocked) process is parsed into one finding."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(10)
        (tmp_path / "test.py").write_text("x=1")

        # json.dumps escapes the path, so the payload stays valid JSON even
        # when tmp_path contains backslashes.
        ruff_output = json.dumps(
            [
                {
                    "filename": str(tmp_path / "test.py"),
                    "location": {"row": 5, "column": 1},
                    "end_location": {"row": 5, "column": 80},
                    "code": "E501",
                    "message": "Line too long",
                }
            ]
        )

        with patch(self._EXEC_TARGET, return_value=self._fake_process(ruff_output)):
            result = await runner._run_ruff(diff.files, tmp_path, diff)

        assert len(result) == 1
        assert result[0].code == "E501"

    @pytest.mark.asyncio
    async def test_run_mypy_success_with_output(self, tmp_path: Path) -> None:
        """A mypy ``error`` line from the (mocked) process becomes MEDIUM."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(20)
        (tmp_path / "test.py").write_text("x: int = 'string'")

        mypy_output = f"{tmp_path}/test.py:5:1: error: Type mismatch"

        with patch(self._EXEC_TARGET, return_value=self._fake_process(mypy_output)):
            result = await runner._run_mypy(diff.files, tmp_path, diff)

        assert len(result) == 1
        assert result[0].severity == Severity.MEDIUM

    @pytest.mark.asyncio
    async def test_run_bandit_success_with_output(self, tmp_path: Path) -> None:
        """Bandit JSON from the (mocked) process is parsed into one HIGH finding."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(20)
        (tmp_path / "test.py").write_text("import md5")

        # json.dumps escapes the path, so the payload stays valid JSON even
        # when tmp_path contains backslashes.
        bandit_output = json.dumps(
            {
                "results": [
                    {
                        "filename": str(tmp_path / "test.py"),
                        "line_number": 5,
                        "test_id": "B303",
                        "test_name": "blacklist",
                        "issue_severity": "HIGH",
                        "issue_confidence": "HIGH",
                        "issue_text": "Use of insecure hash",
                    }
                ]
            }
        )

        with patch(self._EXEC_TARGET, return_value=self._fake_process(bandit_output)):
            result = await runner._run_bandit(diff.files, tmp_path, diff)

        assert len(result) == 1
        assert result[0].severity == Severity.HIGH

    @pytest.mark.asyncio
    async def test_run_radon_success_with_output(self, tmp_path: Path) -> None:
        """Radon JSON from the (mocked) process is parsed into one HIGH finding."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(50)
        (tmp_path / "test.py").write_text("def func(): pass")

        # json.dumps escapes the path key, so the payload stays valid JSON even
        # when tmp_path contains backslashes.
        radon_output = json.dumps(
            {
                str(tmp_path / "test.py"): [
                    {
                        "type": "function",
                        "name": "complex_func",
                        "lineno": 10,
                        "endline": 30,
                        "complexity": 25,
                        "rank": "D",
                    }
                ]
            }
        )

        with patch(self._EXEC_TARGET, return_value=self._fake_process(radon_output)):
            result = await runner._run_radon(diff.files, tmp_path, diff)

        assert len(result) == 1
        assert result[0].severity == Severity.HIGH

    def test_mypy_filters_outside_diff(self) -> None:
        """A mypy report on a line outside every hunk is dropped."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(5, start=10)

        # Line 100 is outside the diff range (10-15)
        mypy_output = "/work/test.py:100:1: error: Type mismatch"

        assert runner._parse_mypy_output(mypy_output, diff, Path("/work")) == []

    def test_bandit_outside_diff(self) -> None:
        """A bandit result on a line outside every hunk is dropped."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(5, start=10)

        bandit_output = """{
    "results": [
        {
            "filename": "/work/test.py",
            "line_number": 100,
            "test_id": "B303",
            "test_name": "blacklist",
            "issue_severity": "HIGH",
            "issue_confidence": "HIGH",
            "issue_text": "Use of insecure hash"
        }
    ]
}"""

        assert runner._parse_bandit_output(bandit_output, diff, Path("/work")) == []

    def test_radon_outside_diff(self) -> None:
        """A radon block starting outside every hunk is dropped."""
        runner = StaticAnalysisRunner()
        diff = self._single_file_diff(5, start=10)

        radon_output = """{
    "/work/test.py": [
        {
            "type": "function",
            "name": "complex_func",
            "lineno": 100,
            "endline": 120,
            "complexity": 25,
            "rank": "D"
        }
    ]
}"""

        assert runner._parse_radon_output(radon_output, diff, Path("/work")) == []