add review comment formatter
This commit is contained in:
249
src/arbiter/integrations/formatters.py
Normal file
249
src/arbiter/integrations/formatters.py
Normal file
@@ -0,0 +1,249 @@
|
||||
"""Formatters for review comments on code hosting platforms."""
|
||||
|
||||
from collections import defaultdict
|
||||
from typing import TYPE_CHECKING, ClassVar
|
||||
|
||||
from arbiter.models import Finding, Severity, Verdict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from arbiter.deliberation import Conflict, DeliberationResult
|
||||
|
||||
# HTML comment marker for identifying Arbiter review comments
|
||||
ARBITER_MARKER = "<!-- arbiter-review -->"
|
||||
|
||||
|
||||
def has_arbiter_marker(text: str) -> bool:
|
||||
return ARBITER_MARKER in text
|
||||
|
||||
|
||||
class ReviewCommentFormatter:
|
||||
"""Formats DeliberationResult as Markdown for PR comments."""
|
||||
|
||||
# GitHub comment limit
|
||||
MAX_COMMENT_LENGTH = 65535
|
||||
|
||||
# Verdict icons
|
||||
VERDICT_ICONS: ClassVar[dict[Verdict, str]] = {
|
||||
Verdict.APPROVE: "\u2705", # green checkmark
|
||||
Verdict.REQUEST_CHANGES: "\u274c", # red X
|
||||
Verdict.COMMENT: "\U0001f4ac", # speech bubble
|
||||
}
|
||||
|
||||
# Severity icons
|
||||
SEVERITY_ICONS: ClassVar[dict[Severity, str]] = {
|
||||
Severity.CRITICAL: "\U0001f6a8", # rotating light
|
||||
Severity.HIGH: "\U0001f534", # red circle
|
||||
Severity.MEDIUM: "\U0001f7e0", # orange circle
|
||||
Severity.LOW: "\U0001f7e1", # yellow circle
|
||||
Severity.INFO: "\U0001f535", # blue circle
|
||||
}
|
||||
|
||||
def __init__(self, include_cost: bool = False, max_findings: int = 50) -> None:
|
||||
self.include_cost = include_cost
|
||||
self.max_findings = max_findings
|
||||
|
||||
def format(self, result: "DeliberationResult") -> str:
|
||||
"""Format a DeliberationResult as Markdown.
|
||||
|
||||
Args:
|
||||
result: Deliberation result to format.
|
||||
|
||||
Returns:
|
||||
Markdown-formatted comment body.
|
||||
"""
|
||||
sections: list[str] = []
|
||||
|
||||
# Header with verdict
|
||||
sections.append(self._format_header(result))
|
||||
|
||||
# Summary statistics
|
||||
sections.append(self._format_summary(result))
|
||||
|
||||
# Findings by severity
|
||||
if result.findings:
|
||||
sections.append(self._format_findings(result.findings))
|
||||
|
||||
# Conflicts section
|
||||
if result.conflicts:
|
||||
sections.append(self._format_conflicts(result.conflicts))
|
||||
|
||||
# Cost footer (optional)
|
||||
if self.include_cost:
|
||||
sections.append(self._format_cost(result))
|
||||
|
||||
# Footer
|
||||
sections.append(self._format_footer())
|
||||
|
||||
# Add marker at the end (invisible in rendered markdown)
|
||||
sections.append(ARBITER_MARKER)
|
||||
|
||||
comment = "\n\n".join(sections)
|
||||
|
||||
# Truncate if needed
|
||||
if len(comment) > self.MAX_COMMENT_LENGTH:
|
||||
comment = self._truncate(comment)
|
||||
|
||||
return comment
|
||||
|
||||
def _format_header(self, result: "DeliberationResult") -> str:
|
||||
"""Format the verdict header."""
|
||||
icon = self.VERDICT_ICONS.get(result.verdict, "")
|
||||
verdict_text = result.verdict.value.replace("_", " ").title()
|
||||
|
||||
header = f"## {icon} Arbiter Review: {verdict_text}\n\n"
|
||||
header += f"**Confidence:** {result.verdict_confidence:.0%}\n\n"
|
||||
|
||||
if result.verdict_reasoning:
|
||||
header += f"> {result.verdict_reasoning}"
|
||||
|
||||
return header
|
||||
|
||||
def _format_summary(self, result: "DeliberationResult") -> str:
|
||||
"""Format summary statistics."""
|
||||
lines: list[str] = ["### Summary"]
|
||||
|
||||
# Count by severity
|
||||
counts: dict[Severity, int] = defaultdict(int)
|
||||
for finding in result.findings:
|
||||
counts[finding.severity] += 1
|
||||
|
||||
# Format counts table
|
||||
if result.total_findings > 0:
|
||||
lines.append("")
|
||||
lines.append("| Severity | Count |")
|
||||
lines.append("|----------|-------|")
|
||||
|
||||
for severity in [
|
||||
Severity.CRITICAL,
|
||||
Severity.HIGH,
|
||||
Severity.MEDIUM,
|
||||
Severity.LOW,
|
||||
Severity.INFO,
|
||||
]:
|
||||
if counts[severity] > 0:
|
||||
icon = self.SEVERITY_ICONS.get(severity, "")
|
||||
lines.append(f"| {icon} {severity.value.title()} | {counts[severity]} |")
|
||||
|
||||
lines.append("")
|
||||
lines.append(f"**Total findings:** {result.total_findings}")
|
||||
else:
|
||||
lines.append("\nNo issues found.")
|
||||
|
||||
if result.conflicts:
|
||||
lines.append(f"\n**Conflicts detected:** {len(result.conflicts)}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _format_findings(self, findings: list[Finding]) -> str:
|
||||
"""Format findings grouped by severity."""
|
||||
lines: list[str] = ["### Findings"]
|
||||
|
||||
# Group by severity
|
||||
by_severity: dict[Severity, list[Finding]] = defaultdict(list)
|
||||
for finding in findings:
|
||||
by_severity[finding.severity].append(finding)
|
||||
|
||||
# Display in severity order
|
||||
displayed = 0
|
||||
for severity in [
|
||||
Severity.CRITICAL,
|
||||
Severity.HIGH,
|
||||
Severity.MEDIUM,
|
||||
Severity.LOW,
|
||||
Severity.INFO,
|
||||
]:
|
||||
severity_findings = by_severity[severity]
|
||||
if not severity_findings:
|
||||
continue
|
||||
|
||||
icon = self.SEVERITY_ICONS.get(severity, "")
|
||||
lines.append(f"\n#### {icon} {severity.value.title()}")
|
||||
|
||||
for finding in severity_findings:
|
||||
if displayed >= self.max_findings:
|
||||
remaining = len(findings) - displayed
|
||||
lines.append(f"\n*...and {remaining} more findings*")
|
||||
return "\n".join(lines)
|
||||
|
||||
lines.append(self._format_finding(finding))
|
||||
displayed += 1
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _format_finding(self, finding: Finding) -> str:
|
||||
"""Format a single finding."""
|
||||
lines: list[str] = []
|
||||
|
||||
# Location and title
|
||||
location = f"`{finding.file}:{finding.line_start}"
|
||||
if finding.line_end != finding.line_start:
|
||||
location += f"-{finding.line_end}"
|
||||
location += "`"
|
||||
|
||||
lines.append(f"\n**{finding.title}** ({location})")
|
||||
lines.append(
|
||||
f"\n_{finding.agent.value.title()} agent_ | Confidence: {finding.confidence:.0%}"
|
||||
)
|
||||
|
||||
# Description
|
||||
lines.append(f"\n{finding.description}")
|
||||
|
||||
# Suggestion (if present)
|
||||
if finding.suggestion:
|
||||
lines.append(f"\n> **Suggestion:** {finding.suggestion}")
|
||||
|
||||
# References (if present)
|
||||
if finding.references:
|
||||
refs = " | ".join(f"[{i + 1}]({ref})" for i, ref in enumerate(finding.references[:3]))
|
||||
lines.append(f"\n**References:** {refs}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _format_conflicts(self, conflicts: list["Conflict"]) -> str:
|
||||
"""Format the conflicts section."""
|
||||
lines: list[str] = ["### Agent Conflicts"]
|
||||
lines.append("\nThe following conflicts were detected between agents:")
|
||||
|
||||
for conflict in conflicts[:10]: # Limit to 10 conflicts
|
||||
lines.append(
|
||||
f"\n- **{conflict.nature.value.replace('_', ' ').title()}**: {conflict.description}"
|
||||
)
|
||||
if conflict.resolution:
|
||||
lines.append(f" - *Resolution:* {conflict.resolution}")
|
||||
|
||||
if len(conflicts) > 10:
|
||||
lines.append(f"\n*...and {len(conflicts) - 10} more conflicts*")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _format_cost(self, result: "DeliberationResult") -> str:
|
||||
lines: list[str] = ["---"]
|
||||
lines.append("<details>")
|
||||
lines.append("<summary>Review cost</summary>")
|
||||
lines.append("")
|
||||
lines.append(f"- Tokens used: {result.tokens_used:,}")
|
||||
lines.append(f"- Cost: ${result.cost_usd:.4f}")
|
||||
lines.append("</details>")
|
||||
return "\n".join(lines)
|
||||
|
||||
def _format_footer(self) -> str:
|
||||
return (
|
||||
"---\n"
|
||||
"*Generated by [Arbiter](https://github.com/kschappell/arbiter) "
|
||||
"- Multi-agent code review*"
|
||||
)
|
||||
|
||||
def _truncate(self, comment: str) -> str:
|
||||
"""Truncate comment to fit within GitHub's limit."""
|
||||
# Leave room for truncation message
|
||||
truncation_msg = "\n\n*[Comment truncated due to length limit]*"
|
||||
max_len = self.MAX_COMMENT_LENGTH - len(truncation_msg)
|
||||
|
||||
# Try to truncate at a paragraph boundary
|
||||
truncated = comment[:max_len]
|
||||
last_double_newline = truncated.rfind("\n\n")
|
||||
|
||||
if last_double_newline > max_len // 2:
|
||||
truncated = truncated[:last_double_newline]
|
||||
|
||||
return truncated + truncation_msg
|
||||
Reference in New Issue
Block a user