From 9cf42112a6945dbbdf600f3fa0e2eeb0ebc5d43c Mon Sep 17 00:00:00 2001 From: Kai Chappell Date: Wed, 22 Oct 2025 13:22:56 +0000 Subject: [PATCH] Add results viewer dashboard page --- src/py_dvt_ate/app/dashboard/app.py | 200 +++++++++++++++++++++++++++- src/py_dvt_ate/data/repository.py | 43 ++++++ 2 files changed, 241 insertions(+), 2 deletions(-) diff --git a/src/py_dvt_ate/app/dashboard/app.py b/src/py_dvt_ate/app/dashboard/app.py index 8ed4810..e31b291 100644 --- a/src/py_dvt_ate/app/dashboard/app.py +++ b/src/py_dvt_ate/app/dashboard/app.py @@ -654,6 +654,203 @@ def test_execution_page() -> None: st.table(results_data) +def results_viewer_page() -> None: + """Results viewer page for browsing historical test results.""" + st.header("Results Viewer") + st.markdown("Browse and analyze historical test results.") + + repository: SQLiteRepository = st.session_state.repository + + # Get all test runs + all_runs = repository.get_all_runs() + + if not all_runs: + st.info("No test results available. Run a test in the Test Execution tab to generate results.") + return + + # Summary statistics + st.subheader("Summary") + col1, col2, col3, col4 = st.columns(4) + + status_counts: dict[str, int] = {} + for run in all_runs: + status_counts[run.status.value] = status_counts.get(run.status.value, 0) + 1 + + with col1: + st.metric("Total Runs", len(all_runs)) + with col2: + st.metric("Passed", status_counts.get("passed", 0)) + with col3: + st.metric("Failed", status_counts.get("failed", 0)) + with col4: + st.metric("Errors", status_counts.get("error", 0)) + + st.divider() + + # Filter controls + st.subheader("Filter Results") + col1, col2 = st.columns(2) + + with col1: + # Filter by test name + test_names = list({run.test_name for run in all_runs}) + selected_tests = st.multiselect( + "Test Name", + options=["All"] + test_names, + default=["All"], + ) + + with col2: + # Filter by status + statuses = list({run.status.value for run in all_runs}) + selected_statuses = st.multiselect( + "Status", + options=["All"] + statuses, + default=["All"], + ) + + # Apply filters + filtered_runs = all_runs + if "All" not in selected_tests: + filtered_runs = [r for r in filtered_runs if r.test_name in selected_tests] + if "All" not in selected_statuses: + filtered_runs = [r for r in filtered_runs if r.status.value in selected_statuses] + + st.markdown(f"**Showing {len(filtered_runs)} of {len(all_runs)} runs**") + + # Test runs table + st.subheader("Test Runs") + + if not filtered_runs: + st.info("No test runs match the selected filters.") + return + + # Create table data + table_data = [] + for run in filtered_runs: + status_icon = {"passed": "🟢", "failed": "🔴", "error": "🟡", "running": "🔵", "pending": "⚪", "skipped": "⚫"}.get( + run.status.value, "⚪" + ) + + duration = "N/A" + if run.completed_at and run.started_at: + duration = f"{(run.completed_at - run.started_at).total_seconds():.1f}s" + + table_data.append({ + "Select": False, + "ID": run.id[:8], + "Test": run.test_name, + "Status": f"{status_icon} {run.status.value.upper()}", + "Started": run.started_at.strftime("%Y-%m-%d %H:%M:%S"), + "Duration": duration, + "Operator": run.operator or "N/A", + }) + + # Display table with selection + import pandas as pd + df = pd.DataFrame(table_data) + + # Use data editor for row selection + edited_df = st.data_editor( + df, + use_container_width=True, + hide_index=True, + column_config={ + "Select": st.column_config.CheckboxColumn( + "Select", + help="Select a test run to view details", + default=False, + ) + }, + disabled=["ID", "Test", "Status", "Started", "Duration", "Operator"], + ) + + # Get selected run + selected_rows = edited_df[edited_df["Select"]] + if not selected_rows.empty: + selected_id_short = selected_rows.iloc[0]["ID"] + # Find full UUID + selected_run = next((r for r in filtered_runs if r.id.startswith(selected_id_short)), None) + + if selected_run: + st.divider() + st.subheader(f"Test Run Details: {selected_run.test_name}") + + # Run metadata + col1, col2, col3 = st.columns(3) + with col1: + st.markdown(f"**Run ID:** `{selected_run.id}`") + st.markdown(f"**Test:** {selected_run.test_name}") + st.markdown(f"**Status:** {selected_run.status.value.upper()}") + with col2: + st.markdown(f"**Started:** {selected_run.started_at.strftime('%Y-%m-%d %H:%M:%S')}") + if selected_run.completed_at: + st.markdown(f"**Completed:** {selected_run.completed_at.strftime('%Y-%m-%d %H:%M:%S')}") + if selected_run.completed_at and selected_run.started_at: + duration_sec = (selected_run.completed_at - selected_run.started_at).total_seconds() + st.markdown(f"**Duration:** {duration_sec:.1f}s") + with col3: + st.markdown(f"**Operator:** {selected_run.operator or 'N/A'}") + if selected_run.description: + st.markdown(f"**Description:** {selected_run.description}") + + # Configuration + if selected_run.config_json: + import json + with st.expander("Test Configuration"): + config = json.loads(selected_run.config_json) + st.json(config) + + # Results + from uuid import UUID + run_uuid = UUID(selected_run.id) + results = repository.get_results(run_uuid) + if results: + st.markdown("### Test Results") + results_table = [] + for result in results: + pass_status = "✅ PASS" if result.passed else "❌ FAIL" if result.passed is False else "N/A" + results_table.append({ + "Parameter": result.parameter, + "Value": f"{result.value:.6f}", + "Unit": result.unit, + "Lower Limit": f"{result.lower_limit:.6f}" if result.lower_limit is not None else "N/A", + "Upper Limit": f"{result.upper_limit:.6f}" if result.upper_limit is not None else "N/A", + "Status": pass_status, + }) + st.table(results_table) + + # Measurements + try: + measurements = repository.get_measurements_dataframe(run_uuid) + if measurements is not None and not measurements.empty: + st.markdown("### Measurements") + + # Show measurement count + st.caption(f"Total measurements: {len(measurements)}") + + # Plot measurements by parameter + parameters = measurements["parameter"].unique() + + for param in parameters: + param_data = measurements[measurements["parameter"] == param] + if not param_data.empty: + st.markdown(f"#### {param}") + + # Create chart + import altair as alt + chart = alt.Chart(param_data).mark_line().encode( + x=alt.X("timestamp:Q", title="Time (s)"), + y=alt.Y("value:Q", title=f"Value ({param_data.iloc[0]['unit']})"), + tooltip=["timestamp", "value", "unit"] + ).properties( + height=300 + ) + st.altair_chart(chart, use_container_width=True) + except Exception as e: + st.caption(f"No time-series measurement data available ({e})") + + def main() -> None: """Main entry point for the Streamlit dashboard.""" st.set_page_config( @@ -688,8 +885,7 @@ def main() -> None: test_execution_page() with tab3: - st.header("Results Viewer") - st.info("Results viewer coming soon in Task 17.3!") + results_viewer_page() if __name__ == "__main__": diff --git a/src/py_dvt_ate/data/repository.py b/src/py_dvt_ate/data/repository.py index 874e6bc..d5012ae 100644 --- a/src/py_dvt_ate/data/repository.py +++ b/src/py_dvt_ate/data/repository.py @@ -70,6 +70,10 @@ class ITestRepository(ABC): def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None: """Retrieve measurements as pandas DataFrame.""" + @abstractmethod + def get_all_runs(self) -> list[TestRun]: + """Retrieve all test runs, ordered by started_at descending.""" + class SQLiteRepository(ITestRepository): """SQLite-based repository for test data. @@ -357,3 +361,42 @@ class SQLiteRepository(ITestRepository): return None return pd.read_parquet(parquet_path) + + def get_all_runs(self) -> list[TestRun]: + """Retrieve all test runs, ordered by started_at descending. + + Returns: + List of all TestRun objects, newest first. + """ + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + cursor.execute(""" + SELECT id, test_name, started_at, status, config_json, + description, completed_at, operator, notes, created_at + FROM test_runs + ORDER BY started_at DESC + """) + + rows = cursor.fetchall() + + return [ + TestRun( + id=row["id"], + test_name=row["test_name"], + started_at=datetime.fromisoformat(row["started_at"]), + status=TestStatus(row["status"]), + config_json=row["config_json"], + description=row["description"], + completed_at=( + datetime.fromisoformat(row["completed_at"]) + if row["completed_at"] + else None + ), + operator=row["operator"], + notes=row["notes"], + created_at=datetime.fromisoformat(row["created_at"]), + ) + for row in rows + ]