"""Streamlit dashboard application for physics simulation visualisation. This module provides an interactive dashboard for visualising the physics simulation through the Hardware Abstraction Layer (HAL), demonstrating thermal-electrical coupling in real-time using instrument interfaces. """ import asyncio import atexit import threading import time from collections import deque from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import dataclass, field from typing import Any from uuid import UUID import streamlit as st from py_dvt_ate.data.repository import SQLiteRepository from py_dvt_ate.framework.runner import TestRunner from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory, InstrumentSet from py_dvt_ate.simulation.physics.engine import PhysicsEngine from py_dvt_ate.simulation.server import ServerConfig, SimulationServer from py_dvt_ate.tests.thermal.tempco import TempCoTest # Thread pool for background test execution _test_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="test_runner") # History buffer size for charts HISTORY_SIZE = 500 @dataclass class SimulationHistory: """Stores time series data for visualisation.""" time: deque[float] = field(default_factory=lambda: deque(maxlen=HISTORY_SIZE)) chamber_temp: deque[float] = field( default_factory=lambda: deque(maxlen=HISTORY_SIZE) ) case_temp: deque[float] = field(default_factory=lambda: deque(maxlen=HISTORY_SIZE)) junction_temp: deque[float] = field( default_factory=lambda: deque(maxlen=HISTORY_SIZE) ) output_voltage: deque[float] = field( default_factory=lambda: deque(maxlen=HISTORY_SIZE) ) power_dissipation: deque[float] = field( default_factory=lambda: deque(maxlen=HISTORY_SIZE) ) @dataclass class TestProgress: """Tracks live progress of a running test.""" phase: str = "Initialising" current_temp: float | None = None target_temp: float | None = None temp_index: int = 0 total_temps: int = 0 message: str = "" started_at: float = field(default_factory=time.time) @property def elapsed(self) -> float: """Return elapsed time in seconds.""" return time.time() - self.started_at def start_embedded_server() -> tuple[SimulationServer, threading.Thread]: """Start an embedded simulation server in a background thread. Returns: Tuple of (server instance, thread running the server). """ server = SimulationServer( ServerConfig( host="127.0.0.1", chamber_port=5001, psu_port=5002, dmm_port=5003, physics_rate_hz=100.0, ) ) server_ready = threading.Event() server_error: list[Exception] = [] # Use list to share error across threads def run_server() -> None: """Run the async server in a new event loop.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(server.start()) # Signal that server is ready server_ready.set() # Keep the event loop running loop.run_forever() except Exception as e: server_error.append(e) # Store error for main thread server_ready.set() # Signal even on error finally: try: loop.run_until_complete(server.stop()) except Exception: pass loop.close() thread = threading.Thread(target=run_server, daemon=True) thread.start() # Wait for server to be fully started (up to 5 seconds) if not server_ready.wait(timeout=5.0): st.error("Server failed to start within timeout") # Check if there was an error during startup if server_error: st.error(f"Server startup error: {server_error[0]}") return server, thread def init_session_state() -> None: """Initialise Streamlit session state.""" if "server" not in st.session_state: with st.spinner("Starting simulation server..."): st.session_state.server, st.session_state.server_thread = start_embedded_server() # Verify server started correctly if st.session_state.server.physics_engine is None: st.error("Failed to start simulation server. Please refresh the page.") st.stop() # Register cleanup def cleanup() -> None: if hasattr(st.session_state, "server") and st.session_state.server is not None: loop = asyncio.new_event_loop() try: loop.run_until_complete(st.session_state.server.stop()) except Exception: pass loop.close() atexit.register(cleanup) if "instruments" not in st.session_state: # Create instruments via HAL using factory config = InstrumentConfig( backend="simulator", simulator_host="127.0.0.1", chamber_port=5001, psu_port=5002, dmm_port=5003, ) st.session_state.instruments = InstrumentFactory.create(config) # Connect instruments for dashboard use try: st.session_state.instruments.chamber.transport.connect() st.session_state.instruments.psu.transport.connect() st.session_state.instruments.dmm.transport.connect() except Exception as e: st.error(f"Failed to connect to instruments: {e}") st.stop() if "repository" not in st.session_state: # Create test repository (temporary file for dashboard demo) import os import tempfile tmpdir = tempfile.gettempdir() db_path = os.path.join(tmpdir, "py_dvt_ate_dashboard.db") st.session_state.repository = SQLiteRepository(db_path) if "test_runner" not in st.session_state: st.session_state.test_runner = TestRunner(st.session_state.repository) if "history" not in st.session_state: st.session_state.history = SimulationHistory() if "last_update" not in st.session_state: st.session_state.last_update = time.time() if "test_running" not in st.session_state: st.session_state.test_running = False if "test_run_id" not in st.session_state: st.session_state.test_run_id = None if "test_future" not in st.session_state: st.session_state.test_future = None if "test_progress" not in st.session_state: st.session_state.test_progress = None def step_simulation() -> None: """Advance the simulation based on elapsed real time and multiplier.""" server: SimulationServer = st.session_state.server instruments: InstrumentSet = st.session_state.instruments history: SimulationHistory = st.session_state.history # Get physics engine for visualization (dashboard-specific access) engine: PhysicsEngine | None = server.physics_engine if engine is None: return # Update timestamp st.session_state.last_update = time.time() # Read measurements - use physics engine directly when test is running # to avoid race conditions with instrument TCP connections try: thermal = engine.get_thermal_state() electrical = engine.get_electrical_state() if st.session_state.test_running: # During test, use physics engine values directly (no HAL access) chamber_temp = thermal.chamber_temperature output_voltage = electrical.output_voltage else: # When idle, read via HAL to demonstrate instrument communication chamber_temp = instruments.chamber.get_temperature() output_voltage = instruments.dmm.measure_dc_voltage() history.time.append(thermal.timestamp) history.chamber_temp.append(chamber_temp) history.case_temp.append(thermal.case_temperature) history.junction_temp.append(thermal.junction_temperature) history.output_voltage.append(output_voltage) history.power_dissipation.append(electrical.power_dissipation) except (OSError, ValueError): # Ignore communication errors during updates pass def run_test_in_background( runner: TestRunner, test: Any, instruments: InstrumentSet, config: dict[str, Any], description: str, progress: TestProgress, ) -> UUID: """Run a test in a background thread, updating progress as it runs.""" temperatures = config.get("temperatures", []) progress.total_temps = len(temperatures) progress.phase = "Starting" progress.message = "Configuring instruments..." class ProgressTrackingChamber: """Wrapper that tracks chamber setpoint changes for progress.""" def __init__(self, chamber: Any) -> None: # Use object.__setattr__ to avoid triggering __getattr__ object.__setattr__(self, "_chamber", chamber) object.__setattr__(self, "_progress", progress) object.__setattr__(self, "_temps", temperatures) def set_temperature(self, temp: float) -> None: self._progress.target_temp = temp self._progress.phase = "Ramping" if temp in self._temps: self._progress.temp_index = self._temps.index(temp) + 1 self._progress.message = f"Ramping to {temp:.1f}C" self._chamber.set_temperature(temp) def get_temperature(self) -> float: temp: float = self._chamber.get_temperature() self._progress.current_temp = temp return temp def is_stable(self) -> bool: stable: bool = self._chamber.is_stable() if stable: self._progress.phase = "Measuring" if self._progress.current_temp is not None: self._progress.message = f"Stable at {self._progress.current_temp:.1f}C, measuring..." else: self._progress.phase = "Stabilising" if self._progress.target_temp is not None: self._progress.message = f"Waiting for stability at {self._progress.target_temp:.1f}C" return stable def __getattr__(self, name: str) -> Any: # Delegate all other attributes to the wrapped chamber return getattr(self._chamber, name) class WrappedInstruments: def __init__(self, orig: InstrumentSet) -> None: self.chamber = ProgressTrackingChamber(orig.chamber) self.psu = orig.psu self.dmm = orig.dmm wrapped = WrappedInstruments(instruments) run_id = runner.run_test( test=test, instruments=wrapped, # type: ignore config=config, operator="dashboard_user", description=description, ) progress.phase = "Complete" progress.message = "Test finished" return run_id def display_controls() -> None: """Display simulation control panel in sidebar.""" st.sidebar.header("Simulation Controls") test_running = st.session_state.get("test_running", False) # Show test status warning if test_running: st.sidebar.warning("Test in progress - instrument controls disabled") else: st.sidebar.info("Physics engine runs automatically. Use controls to adjust.") # Time multiplier (always available - affects simulation speed) st.sidebar.subheader("Simulation Speed") time_multiplier = st.sidebar.select_slider( "Time Multiplier", options=[1, 2, 5, 10, 20, 50, 100], value=st.session_state.get("time_multiplier", 10), format_func=lambda x: f"{x}x", key="time_multiplier_slider", ) st.sidebar.caption(f"1 real second = {time_multiplier} simulation seconds") # Apply time multiplier to server immediately if "server" in st.session_state: st.session_state.server.time_scale = time_multiplier st.session_state.time_multiplier = time_multiplier st.sidebar.divider() # Instrument controls (disabled during test) st.sidebar.subheader("Instrument Controls") if test_running: st.sidebar.caption("Controls disabled while test is running") with st.sidebar.form("parameter_controls"): st.markdown("**Thermal Chamber**") temp_setpoint = st.slider( "Temperature Setpoint (C)", min_value=-40.0, max_value=125.0, value=st.session_state.get("temp_setpoint", 25.0), step=5.0, disabled=test_running, ) st.divider() st.markdown("**Power Supply**") input_voltage = st.slider( "Input Voltage (V)", min_value=0.0, max_value=12.0, value=st.session_state.get("input_voltage", 5.0), step=0.1, disabled=test_running, ) output_enabled = st.toggle( "Output Enabled", value=st.session_state.get("output_enabled", False), disabled=test_running, ) st.divider() st.markdown("**Electronic Load**") load_current = st.slider( "Load Current (mA)", min_value=0.0, max_value=500.0, value=st.session_state.get("load_current", 100.0), step=10.0, disabled=test_running, ) submitted = st.form_submit_button( "Apply Changes", type="primary", disabled=test_running, ) if submitted and not test_running: st.session_state.temp_setpoint = temp_setpoint st.session_state.input_voltage = input_voltage st.session_state.output_enabled = output_enabled st.session_state.load_current = load_current st.rerun() @st.fragment(run_every=0.1) def simulation_display() -> None: """Fragment that displays and updates simulation state.""" if "server" not in st.session_state: st.warning("Initializing simulation server...") return server: SimulationServer = st.session_state.server engine: PhysicsEngine | None = server.physics_engine if engine is None: st.error("Physics engine not available. Try refreshing the page.") return if not server.is_running: st.warning("Server is not running. Try refreshing the page.") return instruments: InstrumentSet = st.session_state.instruments history: SimulationHistory = st.session_state.history # Apply control settings to instruments (only when not running a test) if not st.session_state.test_running: try: temp_setpoint = st.session_state.get("temp_setpoint", 25.0) input_voltage = st.session_state.get("input_voltage", 5.0) output_enabled = st.session_state.get("output_enabled", False) load_current_ma = st.session_state.get("load_current", 100.0) instruments.chamber.set_temperature(temp_setpoint) instruments.psu.set_voltage(1, input_voltage) instruments.psu.enable_output(1, output_enabled) engine.set_load_current(load_current_ma / 1000.0) except Exception: pass # Always update simulation data step_simulation() thermal = engine.get_thermal_state() electrical = engine.get_electrical_state() # Show test progress banner if test is running if st.session_state.test_running and st.session_state.test_progress: progress: TestProgress = st.session_state.test_progress if progress.target_temp is not None and progress.current_temp is not None: st.info( f"**Test Running:** {progress.phase} | " f"Step {progress.temp_index}/{progress.total_temps} | " f"Target: {progress.target_temp:.1f}C | " f"Current: {progress.current_temp:.1f}C | " f"Elapsed: {progress.elapsed:.0f}s" ) else: st.info(f"**Test Running:** {progress.phase} | {progress.message}") st.subheader("Current State") col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Chamber Temp", f"{thermal.chamber_temperature:.2f} C") with col2: st.metric("Case Temp", f"{thermal.case_temperature:.2f} C") with col3: st.metric("Junction Temp", f"{thermal.junction_temperature:.2f} C") with col4: st.metric("Output Voltage", f"{electrical.output_voltage:.4f} V") col5, col6, col7, col8 = st.columns(4) with col5: st.metric("Input Voltage", f"{electrical.input_voltage:.2f} V") with col6: st.metric("Load Current", f"{electrical.load_current * 1000:.1f} mA") with col7: st.metric("Power Diss.", f"{electrical.power_dissipation * 1000:.2f} mW") with col8: status = "Test Running" if st.session_state.test_running else "Running" st.metric("Sim Time", f"{engine.simulation_time:.1f} s", delta=status) st.subheader("Instrument Status") if st.session_state.test_running: st.caption("Reading from physics engine (test in progress)") else: st.caption("All readings below use the Hardware Abstraction Layer interfaces") col1, col2, col3 = st.columns(3) with col1: st.markdown("#### Thermal Chamber") if st.session_state.test_running: # Use physics engine during test to avoid race conditions st.markdown(f"**Temperature:** {thermal.chamber_temperature:.2f} C") st.markdown(f"**Setpoint:** {thermal.chamber_temperature:.2f} C") st.markdown("**Stable:** (test running)") st.markdown("**Status:** Test in progress") else: try: chamber_temp = instruments.chamber.get_temperature() chamber_setpoint = instruments.chamber.get_setpoint() chamber_stable = instruments.chamber.is_stable() st.markdown(f"**Temperature:** {chamber_temp:.2f} C") st.markdown(f"**Setpoint:** {chamber_setpoint:.2f} C") st.markdown(f"**Stable:** {'Yes' if chamber_stable else 'No'}") st.markdown("**Status:** Connected") except (OSError, ValueError) as e: st.markdown("**Status:** Disconnected") st.caption(f"Error: {e}") with col2: st.markdown("#### Power Supply") if st.session_state.test_running: st.markdown(f"**Voltage:** {electrical.input_voltage:.2f} V") st.markdown(f"**Current:** {electrical.load_current * 1000:.1f} mA") st.markdown("**Output:** (test running)") st.markdown("**Status:** Test in progress") else: try: psu_voltage_setpoint = instruments.psu.get_voltage(1) psu_voltage_measured = instruments.psu.measure_voltage(1) psu_current = instruments.psu.measure_current(1) psu_enabled = instruments.psu.is_output_enabled(1) st.markdown(f"**Voltage Setpoint:** {psu_voltage_setpoint:.2f} V") st.markdown(f"**Voltage Measured:** {psu_voltage_measured:.3f} V") st.markdown(f"**Current:** {psu_current * 1000:.1f} mA") st.markdown(f"**Output:** {'Enabled' if psu_enabled else 'Disabled'}") except (OSError, ValueError) as e: st.markdown("**Status:** Disconnected") st.caption(f"Error: {e}") with col3: st.markdown("#### Multimeter") if st.session_state.test_running: st.markdown(f"**DC Voltage:** {electrical.output_voltage:.4f} V") st.markdown("**Mode:** DC Voltage") st.markdown("**Status:** Test in progress") else: try: dmm_voltage = instruments.dmm.measure_dc_voltage() st.markdown(f"**DC Voltage:** {dmm_voltage:.4f} V") st.markdown("**Mode:** DC Voltage") st.markdown("**Range:** Auto") st.markdown("**Status:** Connected") except (OSError, ValueError) as e: st.markdown("**Status:** Disconnected") st.caption(f"Error: {e}") st.divider() st.subheader("Temperature History") if len(history.time) < 2: st.info("Start the simulation to see temperature data") else: chart_data = { "Time (s)": list(history.time), "Chamber": list(history.chamber_temp), "Case": list(history.case_temp), "Junction": list(history.junction_temp), } st.line_chart(chart_data, x="Time (s)", y=["Chamber", "Case", "Junction"], color=["#1f77b4", "#ff7f0e", "#d62728"]) st.subheader("Self-Heating Demonstration") delta_t_jc = thermal.junction_temperature - thermal.case_temperature delta_t_ca = thermal.case_temperature - thermal.chamber_temperature col1, col2 = st.columns(2) with col1: st.markdown("#### Self-Heating Analysis") st.markdown(f""" | Parameter | Value | |-----------|-------| | Junction-Case Rise (dT_jc) | **{delta_t_jc:.2f} C** | | Case-Ambient Rise (dT_ca) | **{delta_t_ca:.2f} C** | | Power Dissipation | {electrical.power_dissipation * 1000:.1f} mW | | theta_jc (junction-case) | 15 C/W | | theta_ca (case-ambient) | 5 C/W | """) st.markdown(""" **Thermal Coupling:** The junction temperature rises above the case temperature due to power dissipation. This is governed by: `T_junction = T_case + P_diss x theta_jc` Try increasing the load current or input voltage to see self-heating effects! """) with col2: st.markdown("#### Power Dissipation") if len(history.time) < 2: st.info("Start the simulation to see power data") else: power_data = { "Time (s)": list(history.time), "Power (mW)": [p * 1000 for p in history.power_dissipation], } st.line_chart(power_data, x="Time (s)", y="Power (mW)", color="#2ca02c") @st.fragment(run_every=0.5) def test_progress_display() -> None: """Fragment that displays live test progress.""" if not st.session_state.test_running: return future: Future[UUID] | None = st.session_state.test_future if future is not None and future.done(): try: run_id = future.result() st.session_state.test_run_id = run_id except Exception as e: st.error(f"Test failed: {e}") finally: st.session_state.test_running = False st.session_state.test_future = None st.session_state.test_progress = None st.rerun() progress: TestProgress | None = st.session_state.test_progress if progress is None: st.info("Starting test...") return if progress.total_temps > 0: pct = progress.temp_index / progress.total_temps st.progress(pct, text=f"Step {progress.temp_index} of {progress.total_temps}") col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Phase", progress.phase) with col2: st.metric("Target Temp", f"{progress.target_temp:.1f} C" if progress.target_temp else "N/A") with col3: st.metric("Current Temp", f"{progress.current_temp:.1f} C" if progress.current_temp else "N/A") with col4: st.metric("Elapsed", f"{progress.elapsed:.0f} s") st.caption(progress.message) def test_execution_page() -> None: """Test execution page for running DVT tests.""" st.header("Test Execution") st.markdown("Run DVT characterisation tests using the virtual lab bench.") st.subheader("Select Test") test_options = {"TempCo (Temperature Coefficient)": TempCoTest()} selected_test_name = st.selectbox( "Available Tests", options=list(test_options.keys()), disabled=st.session_state.test_running, ) selected_test = test_options[selected_test_name] st.info(f"**{selected_test.name}**: {selected_test.description}") st.subheader("Test Configuration") col1, col2 = st.columns(2) with col1: if selected_test.name == "tempco": input_voltage = st.number_input( "Input Voltage (V)", min_value=0.0, max_value=12.0, value=5.0, step=0.1, disabled=st.session_state.test_running, ) load_current = st.number_input( "Load Current (mA)", min_value=0.0, max_value=500.0, value=100.0, step=10.0, disabled=st.session_state.test_running, ) settle_time = st.number_input( "Settle Time (s)", min_value=0.0, max_value=60.0, value=5.0, step=1.0, disabled=st.session_state.test_running, ) with col2: if selected_test.name == "tempco": temp_min = st.number_input( "Min Temperature (C)", min_value=-40.0, max_value=125.0, value=-40.0, step=5.0, disabled=st.session_state.test_running, ) temp_max = st.number_input( "Max Temperature (C)", min_value=-40.0, max_value=125.0, value=85.0, step=5.0, disabled=st.session_state.test_running, ) temp_step = st.number_input( "Temperature Step (C)", min_value=1.0, max_value=50.0, value=25.0, step=5.0, disabled=st.session_state.test_running, ) if selected_test.name == "tempco": import numpy as np temperatures = list(np.arange(temp_min, temp_max + temp_step / 2, temp_step)) st.caption(f"Temperature points: {temperatures}") test_config = { "temperatures": temperatures, "input_voltage": input_voltage, "load_current": load_current / 1000.0, "settle_time": settle_time, } else: test_config = {} st.subheader("Execution") col1, col2 = st.columns([1, 3]) with col1: if st.button("Run Test", type="primary", disabled=st.session_state.test_running): runner: TestRunner = st.session_state.test_runner instruments: InstrumentSet = st.session_state.instruments try: _ = instruments.chamber.get_temperature() except Exception as e: st.error(f"Instruments not available: {e}") st.stop() progress = TestProgress() st.session_state.test_progress = progress st.session_state.test_running = True st.session_state.test_run_id = None future = _test_executor.submit( run_test_in_background, runner=runner, test=selected_test, instruments=instruments, config=test_config, description=f"Dashboard execution: {selected_test_name}", progress=progress, ) st.session_state.test_future = future st.rerun() with col2: if st.session_state.test_running: st.warning("Test is running in background - check progress below and Lab Bench tab for live updates") elif st.session_state.test_run_id: st.success("Test completed!") else: st.caption("Click 'Run Test' to start") if st.session_state.test_running: st.divider() st.subheader("Live Progress") test_progress_display() if st.session_state.test_run_id and not st.session_state.test_running: st.subheader("Test Results") repository: SQLiteRepository = st.session_state.repository run = repository.get_run(st.session_state.test_run_id) col1, col2, col3 = st.columns(3) with col1: st.metric("Status", f"{run.status.value.upper()}") with col2: if run.completed_at and run.started_at: duration = (run.completed_at - run.started_at).total_seconds() st.metric("Duration", f"{duration:.1f} s") else: st.metric("Duration", "N/A") with col3: results = repository.get_results(st.session_state.test_run_id) st.metric("Results", len(results)) if run.status.value.upper() == "ERROR": st.error("Test encountered an error during execution.") elif run.status.value.upper() == "FAILED": st.warning("Test completed but one or more results failed to meet specifications.") if results: st.markdown("#### Detailed Results") results_data = [] for result in results: results_data.append({ "Parameter": result.parameter, "Value": f"{result.value:.6f}", "Unit": result.unit or "", "Lower Limit": f"{result.lower_limit:.6f}" if result.lower_limit is not None else "N/A", "Upper Limit": f"{result.upper_limit:.6f}" if result.upper_limit is not None else "N/A", "Status": "PASS" if result.passed else "FAIL", }) st.table(results_data) def results_viewer_page() -> None: """Results viewer page for browsing historical test results.""" st.header("Results Viewer") st.markdown("Browse and analyze historical test results.") repository: SQLiteRepository = st.session_state.repository all_runs = repository.get_all_runs() if not all_runs: st.info("No test results available. Run a test in the Test Execution tab to generate results.") return st.subheader("Summary") col1, col2, col3, col4 = st.columns(4) status_counts: dict[str, int] = {} for run in all_runs: status_counts[run.status.value] = status_counts.get(run.status.value, 0) + 1 with col1: st.metric("Total Runs", len(all_runs)) with col2: st.metric("Passed", status_counts.get("passed", 0)) with col3: st.metric("Failed", status_counts.get("failed", 0)) with col4: st.metric("Errors", status_counts.get("error", 0)) st.divider() st.subheader("Filter Results") col1, col2 = st.columns(2) with col1: test_names = list({run.test_name for run in all_runs}) selected_tests = st.multiselect("Test Name", options=["All"] + test_names, default=["All"]) with col2: statuses = list({run.status.value for run in all_runs}) selected_statuses = st.multiselect("Status", options=["All"] + statuses, default=["All"]) filtered_runs = all_runs if "All" not in selected_tests: filtered_runs = [r for r in filtered_runs if r.test_name in selected_tests] if "All" not in selected_statuses: filtered_runs = [r for r in filtered_runs if r.status.value in selected_statuses] st.markdown(f"**Showing {len(filtered_runs)} of {len(all_runs)} runs**") st.subheader("Test Runs") if not filtered_runs: st.info("No test runs match the selected filters.") return table_data = [] for run in filtered_runs: status_icon = {"passed": "PASS", "failed": "FAIL", "error": "ERR"}.get(run.status.value, "?") duration = "N/A" if run.completed_at and run.started_at: duration = f"{(run.completed_at - run.started_at).total_seconds():.1f}s" table_data.append({ "Select": False, "ID": run.id[:8], "Test": run.test_name, "Status": f"{status_icon}", "Started": run.started_at.strftime("%Y-%m-%d %H:%M:%S"), "Duration": duration, "Operator": run.operator or "N/A", }) import pandas as pd df = pd.DataFrame(table_data) edited_df = st.data_editor( df, width="stretch", hide_index=True, column_config={ "Select": st.column_config.CheckboxColumn("Select", help="Select a test run to view details", default=False) }, disabled=["ID", "Test", "Status", "Started", "Duration", "Operator"], ) selected_rows = edited_df[edited_df["Select"]] if not selected_rows.empty: selected_id_short = selected_rows.iloc[0]["ID"] selected_run = next((r for r in filtered_runs if r.id.startswith(selected_id_short)), None) if selected_run: st.divider() st.subheader(f"Test Run Details: {selected_run.test_name}") col1, col2, col3 = st.columns(3) with col1: st.markdown(f"**Run ID:** `{selected_run.id}`") st.markdown(f"**Test:** {selected_run.test_name}") st.markdown(f"**Status:** {selected_run.status.value.upper()}") with col2: st.markdown(f"**Started:** {selected_run.started_at.strftime('%Y-%m-%d %H:%M:%S')}") if selected_run.completed_at: st.markdown(f"**Completed:** {selected_run.completed_at.strftime('%Y-%m-%d %H:%M:%S')}") if selected_run.completed_at and selected_run.started_at: duration_sec = (selected_run.completed_at - selected_run.started_at).total_seconds() st.markdown(f"**Duration:** {duration_sec:.1f}s") with col3: st.markdown(f"**Operator:** {selected_run.operator or 'N/A'}") if selected_run.description: st.markdown(f"**Description:** {selected_run.description}") if selected_run.config_json: import json with st.expander("Test Configuration"): config = json.loads(selected_run.config_json) st.json(config) run_uuid = UUID(selected_run.id) results = repository.get_results(run_uuid) if results: st.markdown("### Test Results") results_table = [] for result in results: pass_status = "PASS" if result.passed else "FAIL" if result.passed is False else "N/A" results_table.append({ "Parameter": result.parameter, "Value": f"{result.value:.6f}", "Unit": result.unit, "Lower Limit": f"{result.lower_limit:.6f}" if result.lower_limit is not None else "N/A", "Upper Limit": f"{result.upper_limit:.6f}" if result.upper_limit is not None else "N/A", "Status": pass_status, }) st.table(results_table) try: measurements = repository.get_measurements_dataframe(run_uuid) if measurements is not None and not measurements.empty: st.markdown("### Measurements") st.caption(f"Total measurements: {len(measurements)}") parameters = measurements["parameter"].unique() for param in parameters: param_data = measurements[measurements["parameter"] == param] if not param_data.empty: st.markdown(f"#### {param}") import altair as alt chart = alt.Chart(param_data).mark_line().encode( x=alt.X("timestamp:Q", title="Time (s)"), y=alt.Y("value:Q", title=f"Value ({param_data.iloc[0]['unit']})"), tooltip=["timestamp", "value", "unit"] ).properties(height=300) st.altair_chart(chart, width="stretch") except Exception as e: st.caption(f"No time-series measurement data available ({e})") def main() -> None: """Main entry point for the Streamlit dashboard.""" st.set_page_config( page_title="py-dvt-ate Virtual Lab Bench", page_icon="🔬", layout="wide", ) st.title("py-dvt-ate Virtual Lab Bench") st.markdown(""" Interactive demonstration of the Hardware Abstraction Layer (HAL) controlling a simulated lab bench with thermal chamber, power supply, and multimeter, showing coupled thermal-electrical behaviour of an LDO voltage regulator. """) init_session_state() display_controls() tab1, tab2, tab3 = st.tabs(["Lab Bench", "Test Execution", "Results Viewer"]) with tab1: simulation_display() with tab2: test_execution_page() with tab3: results_viewer_page() if __name__ == "__main__": main()