diff --git a/src/arbiter/integrations/formatters.py b/src/arbiter/integrations/formatters.py new file mode 100644 index 0000000..7a797b3 --- /dev/null +++ b/src/arbiter/integrations/formatters.py @@ -0,0 +1,249 @@ +"""Formatters for review comments on code hosting platforms.""" + +from collections import defaultdict +from typing import TYPE_CHECKING, ClassVar + +from arbiter.models import Finding, Severity, Verdict + +if TYPE_CHECKING: + from arbiter.deliberation import Conflict, DeliberationResult + +# HTML comment marker for identifying Arbiter review comments +ARBITER_MARKER = "" + + +def has_arbiter_marker(text: str) -> bool: + return ARBITER_MARKER in text + + +class ReviewCommentFormatter: + """Formats DeliberationResult as Markdown for PR comments.""" + + # GitHub comment limit + MAX_COMMENT_LENGTH = 65535 + + # Verdict icons + VERDICT_ICONS: ClassVar[dict[Verdict, str]] = { + Verdict.APPROVE: "\u2705", # green checkmark + Verdict.REQUEST_CHANGES: "\u274c", # red X + Verdict.COMMENT: "\U0001f4ac", # speech bubble + } + + # Severity icons + SEVERITY_ICONS: ClassVar[dict[Severity, str]] = { + Severity.CRITICAL: "\U0001f6a8", # rotating light + Severity.HIGH: "\U0001f534", # red circle + Severity.MEDIUM: "\U0001f7e0", # orange circle + Severity.LOW: "\U0001f7e1", # yellow circle + Severity.INFO: "\U0001f535", # blue circle + } + + def __init__(self, include_cost: bool = False, max_findings: int = 50) -> None: + self.include_cost = include_cost + self.max_findings = max_findings + + def format(self, result: "DeliberationResult") -> str: + """Format a DeliberationResult as Markdown. + + Args: + result: Deliberation result to format. + + Returns: + Markdown-formatted comment body. + """ + sections: list[str] = [] + + # Header with verdict + sections.append(self._format_header(result)) + + # Summary statistics + sections.append(self._format_summary(result)) + + # Findings by severity + if result.findings: + sections.append(self._format_findings(result.findings)) + + # Conflicts section + if result.conflicts: + sections.append(self._format_conflicts(result.conflicts)) + + # Cost footer (optional) + if self.include_cost: + sections.append(self._format_cost(result)) + + # Footer + sections.append(self._format_footer()) + + # Add marker at the end (invisible in rendered markdown) + sections.append(ARBITER_MARKER) + + comment = "\n\n".join(sections) + + # Truncate if needed + if len(comment) > self.MAX_COMMENT_LENGTH: + comment = self._truncate(comment) + + return comment + + def _format_header(self, result: "DeliberationResult") -> str: + """Format the verdict header.""" + icon = self.VERDICT_ICONS.get(result.verdict, "") + verdict_text = result.verdict.value.replace("_", " ").title() + + header = f"## {icon} Arbiter Review: {verdict_text}\n\n" + header += f"**Confidence:** {result.verdict_confidence:.0%}\n\n" + + if result.verdict_reasoning: + header += f"> {result.verdict_reasoning}" + + return header + + def _format_summary(self, result: "DeliberationResult") -> str: + """Format summary statistics.""" + lines: list[str] = ["### Summary"] + + # Count by severity + counts: dict[Severity, int] = defaultdict(int) + for finding in result.findings: + counts[finding.severity] += 1 + + # Format counts table + if result.total_findings > 0: + lines.append("") + lines.append("| Severity | Count |") + lines.append("|----------|-------|") + + for severity in [ + Severity.CRITICAL, + Severity.HIGH, + Severity.MEDIUM, + Severity.LOW, + Severity.INFO, + ]: + if counts[severity] > 0: + icon = self.SEVERITY_ICONS.get(severity, "") + lines.append(f"| {icon} {severity.value.title()} | {counts[severity]} |") + + lines.append("") + lines.append(f"**Total findings:** {result.total_findings}") + else: + lines.append("\nNo issues found.") + + if result.conflicts: + lines.append(f"\n**Conflicts detected:** {len(result.conflicts)}") + + return "\n".join(lines) + + def _format_findings(self, findings: list[Finding]) -> str: + """Format findings grouped by severity.""" + lines: list[str] = ["### Findings"] + + # Group by severity + by_severity: dict[Severity, list[Finding]] = defaultdict(list) + for finding in findings: + by_severity[finding.severity].append(finding) + + # Display in severity order + displayed = 0 + for severity in [ + Severity.CRITICAL, + Severity.HIGH, + Severity.MEDIUM, + Severity.LOW, + Severity.INFO, + ]: + severity_findings = by_severity[severity] + if not severity_findings: + continue + + icon = self.SEVERITY_ICONS.get(severity, "") + lines.append(f"\n#### {icon} {severity.value.title()}") + + for finding in severity_findings: + if displayed >= self.max_findings: + remaining = len(findings) - displayed + lines.append(f"\n*...and {remaining} more findings*") + return "\n".join(lines) + + lines.append(self._format_finding(finding)) + displayed += 1 + + return "\n".join(lines) + + def _format_finding(self, finding: Finding) -> str: + """Format a single finding.""" + lines: list[str] = [] + + # Location and title + location = f"`{finding.file}:{finding.line_start}" + if finding.line_end != finding.line_start: + location += f"-{finding.line_end}" + location += "`" + + lines.append(f"\n**{finding.title}** ({location})") + lines.append( + f"\n_{finding.agent.value.title()} agent_ | Confidence: {finding.confidence:.0%}" + ) + + # Description + lines.append(f"\n{finding.description}") + + # Suggestion (if present) + if finding.suggestion: + lines.append(f"\n> **Suggestion:** {finding.suggestion}") + + # References (if present) + if finding.references: + refs = " | ".join(f"[{i + 1}]({ref})" for i, ref in enumerate(finding.references[:3])) + lines.append(f"\n**References:** {refs}") + + return "\n".join(lines) + + def _format_conflicts(self, conflicts: list["Conflict"]) -> str: + """Format the conflicts section.""" + lines: list[str] = ["### Agent Conflicts"] + lines.append("\nThe following conflicts were detected between agents:") + + for conflict in conflicts[:10]: # Limit to 10 conflicts + lines.append( + f"\n- **{conflict.nature.value.replace('_', ' ').title()}**: {conflict.description}" + ) + if conflict.resolution: + lines.append(f" - *Resolution:* {conflict.resolution}") + + if len(conflicts) > 10: + lines.append(f"\n*...and {len(conflicts) - 10} more conflicts*") + + return "\n".join(lines) + + def _format_cost(self, result: "DeliberationResult") -> str: + lines: list[str] = ["---"] + lines.append("
") + lines.append("Review cost") + lines.append("") + lines.append(f"- Tokens used: {result.tokens_used:,}") + lines.append(f"- Cost: ${result.cost_usd:.4f}") + lines.append("
") + return "\n".join(lines) + + def _format_footer(self) -> str: + return ( + "---\n" + "*Generated by [Arbiter](https://github.com/kschappell/arbiter) " + "- Multi-agent code review*" + ) + + def _truncate(self, comment: str) -> str: + """Truncate comment to fit within GitHub's limit.""" + # Leave room for truncation message + truncation_msg = "\n\n*[Comment truncated due to length limit]*" + max_len = self.MAX_COMMENT_LENGTH - len(truncation_msg) + + # Try to truncate at a paragraph boundary + truncated = comment[:max_len] + last_double_newline = truncated.rfind("\n\n") + + if last_double_newline > max_len // 2: + truncated = truncated[:last_double_newline] + + return truncated + truncation_msg