Add results viewer dashboard page

This commit is contained in:
2025-10-22 13:22:56 +00:00
parent 6802e9ab15
commit 5ced80cd85
2 changed files with 241 additions and 2 deletions

View File

@@ -654,6 +654,203 @@ def test_execution_page() -> None:
st.table(results_data)
def results_viewer_page() -> None:
"""Results viewer page for browsing historical test results."""
st.header("Results Viewer")
st.markdown("Browse and analyze historical test results.")
repository: SQLiteRepository = st.session_state.repository
# Get all test runs
all_runs = repository.get_all_runs()
if not all_runs:
st.info("No test results available. Run a test in the Test Execution tab to generate results.")
return
# Summary statistics
st.subheader("Summary")
col1, col2, col3, col4 = st.columns(4)
status_counts: dict[str, int] = {}
for run in all_runs:
status_counts[run.status.value] = status_counts.get(run.status.value, 0) + 1
with col1:
st.metric("Total Runs", len(all_runs))
with col2:
st.metric("Passed", status_counts.get("passed", 0))
with col3:
st.metric("Failed", status_counts.get("failed", 0))
with col4:
st.metric("Errors", status_counts.get("error", 0))
st.divider()
# Filter controls
st.subheader("Filter Results")
col1, col2 = st.columns(2)
with col1:
# Filter by test name
test_names = list({run.test_name for run in all_runs})
selected_tests = st.multiselect(
"Test Name",
options=["All"] + test_names,
default=["All"],
)
with col2:
# Filter by status
statuses = list({run.status.value for run in all_runs})
selected_statuses = st.multiselect(
"Status",
options=["All"] + statuses,
default=["All"],
)
# Apply filters
filtered_runs = all_runs
if "All" not in selected_tests:
filtered_runs = [r for r in filtered_runs if r.test_name in selected_tests]
if "All" not in selected_statuses:
filtered_runs = [r for r in filtered_runs if r.status.value in selected_statuses]
st.markdown(f"**Showing {len(filtered_runs)} of {len(all_runs)} runs**")
# Test runs table
st.subheader("Test Runs")
if not filtered_runs:
st.info("No test runs match the selected filters.")
return
# Create table data
table_data = []
for run in filtered_runs:
status_icon = {"passed": "🟢", "failed": "🔴", "error": "🟡", "running": "🔵", "pending": "", "skipped": ""}.get(
run.status.value, ""
)
duration = "N/A"
if run.completed_at and run.started_at:
duration = f"{(run.completed_at - run.started_at).total_seconds():.1f}s"
table_data.append({
"Select": False,
"ID": run.id[:8],
"Test": run.test_name,
"Status": f"{status_icon} {run.status.value.upper()}",
"Started": run.started_at.strftime("%Y-%m-%d %H:%M:%S"),
"Duration": duration,
"Operator": run.operator or "N/A",
})
# Display table with selection
import pandas as pd
df = pd.DataFrame(table_data)
# Use data editor for row selection
edited_df = st.data_editor(
df,
use_container_width=True,
hide_index=True,
column_config={
"Select": st.column_config.CheckboxColumn(
"Select",
help="Select a test run to view details",
default=False,
)
},
disabled=["ID", "Test", "Status", "Started", "Duration", "Operator"],
)
# Get selected run
selected_rows = edited_df[edited_df["Select"]]
if not selected_rows.empty:
selected_id_short = selected_rows.iloc[0]["ID"]
# Find full UUID
selected_run = next((r for r in filtered_runs if r.id.startswith(selected_id_short)), None)
if selected_run:
st.divider()
st.subheader(f"Test Run Details: {selected_run.test_name}")
# Run metadata
col1, col2, col3 = st.columns(3)
with col1:
st.markdown(f"**Run ID:** `{selected_run.id}`")
st.markdown(f"**Test:** {selected_run.test_name}")
st.markdown(f"**Status:** {selected_run.status.value.upper()}")
with col2:
st.markdown(f"**Started:** {selected_run.started_at.strftime('%Y-%m-%d %H:%M:%S')}")
if selected_run.completed_at:
st.markdown(f"**Completed:** {selected_run.completed_at.strftime('%Y-%m-%d %H:%M:%S')}")
if selected_run.completed_at and selected_run.started_at:
duration_sec = (selected_run.completed_at - selected_run.started_at).total_seconds()
st.markdown(f"**Duration:** {duration_sec:.1f}s")
with col3:
st.markdown(f"**Operator:** {selected_run.operator or 'N/A'}")
if selected_run.description:
st.markdown(f"**Description:** {selected_run.description}")
# Configuration
if selected_run.config_json:
import json
with st.expander("Test Configuration"):
config = json.loads(selected_run.config_json)
st.json(config)
# Results
from uuid import UUID
run_uuid = UUID(selected_run.id)
results = repository.get_results(run_uuid)
if results:
st.markdown("### Test Results")
results_table = []
for result in results:
pass_status = "✅ PASS" if result.passed else "❌ FAIL" if result.passed is False else "N/A"
results_table.append({
"Parameter": result.parameter,
"Value": f"{result.value:.6f}",
"Unit": result.unit,
"Lower Limit": f"{result.lower_limit:.6f}" if result.lower_limit is not None else "N/A",
"Upper Limit": f"{result.upper_limit:.6f}" if result.upper_limit is not None else "N/A",
"Status": pass_status,
})
st.table(results_table)
# Measurements
try:
measurements = repository.get_measurements_dataframe(run_uuid)
if measurements is not None and not measurements.empty:
st.markdown("### Measurements")
# Show measurement count
st.caption(f"Total measurements: {len(measurements)}")
# Plot measurements by parameter
parameters = measurements["parameter"].unique()
for param in parameters:
param_data = measurements[measurements["parameter"] == param]
if not param_data.empty:
st.markdown(f"#### {param}")
# Create chart
import altair as alt
chart = alt.Chart(param_data).mark_line().encode(
x=alt.X("timestamp:Q", title="Time (s)"),
y=alt.Y("value:Q", title=f"Value ({param_data.iloc[0]['unit']})"),
tooltip=["timestamp", "value", "unit"]
).properties(
height=300
)
st.altair_chart(chart, use_container_width=True)
except Exception as e:
st.caption(f"No time-series measurement data available ({e})")
def main() -> None:
"""Main entry point for the Streamlit dashboard."""
st.set_page_config(
@@ -688,8 +885,7 @@ def main() -> None:
test_execution_page()
with tab3:
st.header("Results Viewer")
st.info("Results viewer coming soon in Task 17.3!")
results_viewer_page()
if __name__ == "__main__":

View File

@@ -70,6 +70,10 @@ class ITestRepository(ABC):
def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None:
"""Retrieve measurements as pandas DataFrame."""
@abstractmethod
def get_all_runs(self) -> list[TestRun]:
"""Retrieve all test runs, ordered by started_at descending."""
class SQLiteRepository(ITestRepository):
"""SQLite-based repository for test data.
@@ -357,3 +361,42 @@ class SQLiteRepository(ITestRepository):
return None
return pd.read_parquet(parquet_path)
def get_all_runs(self) -> list[TestRun]:
"""Retrieve all test runs, ordered by started_at descending.
Returns:
List of all TestRun objects, newest first.
"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT id, test_name, started_at, status, config_json,
description, completed_at, operator, notes, created_at
FROM test_runs
ORDER BY started_at DESC
""")
rows = cursor.fetchall()
return [
TestRun(
id=row["id"],
test_name=row["test_name"],
started_at=datetime.fromisoformat(row["started_at"]),
status=TestStatus(row["status"]),
config_json=row["config_json"],
description=row["description"],
completed_at=(
datetime.fromisoformat(row["completed_at"])
if row["completed_at"]
else None
),
operator=row["operator"],
notes=row["notes"],
created_at=datetime.fromisoformat(row["created_at"]),
)
for row in rows
]