Files
py-dvt-ate/src/py_dvt_ate/app/dashboard/app.py

949 lines
33 KiB
Python

"""Streamlit dashboard application for physics simulation visualisation.
This module provides an interactive dashboard for visualising the physics
simulation through the Hardware Abstraction Layer (HAL), demonstrating
thermal-electrical coupling in real-time using instrument interfaces.
"""
import asyncio
import atexit
import threading
import time
from collections import deque
from dataclasses import dataclass, field
import streamlit as st
from py_dvt_ate.data.repository import SQLiteRepository
from py_dvt_ate.framework.runner import TestRunner
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory, InstrumentSet
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
from py_dvt_ate.tests.thermal.tempco import TempCoTest
# History buffer size for charts
HISTORY_SIZE = 500
@dataclass
class SimulationHistory:
"""Stores time series data for visualisation."""
time: deque[float] = field(default_factory=lambda: deque(maxlen=HISTORY_SIZE))
chamber_temp: deque[float] = field(
default_factory=lambda: deque(maxlen=HISTORY_SIZE)
)
case_temp: deque[float] = field(default_factory=lambda: deque(maxlen=HISTORY_SIZE))
junction_temp: deque[float] = field(
default_factory=lambda: deque(maxlen=HISTORY_SIZE)
)
output_voltage: deque[float] = field(
default_factory=lambda: deque(maxlen=HISTORY_SIZE)
)
power_dissipation: deque[float] = field(
default_factory=lambda: deque(maxlen=HISTORY_SIZE)
)
def start_embedded_server() -> tuple[SimulationServer, threading.Thread]:
"""Start an embedded simulation server in a background thread.
Returns:
Tuple of (server instance, thread running the server).
"""
server = SimulationServer(
ServerConfig(
host="127.0.0.1",
chamber_port=5001,
psu_port=5002,
dmm_port=5003,
physics_rate_hz=100.0,
)
)
server_ready = threading.Event()
server_error: list[Exception] = [] # Use list to share error across threads
def run_server() -> None:
"""Run the async server in a new event loop."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(server.start())
# Signal that server is ready
server_ready.set()
# Keep the event loop running
loop.run_forever()
except Exception as e:
server_error.append(e) # Store error for main thread
server_ready.set() # Signal even on error
finally:
try:
loop.run_until_complete(server.stop())
except Exception:
pass
loop.close()
thread = threading.Thread(target=run_server, daemon=True)
thread.start()
# Wait for server to be fully started (up to 5 seconds)
if not server_ready.wait(timeout=5.0):
st.error("Server failed to start within timeout")
# Check if there was an error during startup
if server_error:
st.error(f"Server startup error: {server_error[0]}")
return server, thread
def init_session_state() -> None:
"""Initialise Streamlit session state."""
if "server" not in st.session_state:
with st.spinner("Starting simulation server..."):
st.session_state.server, st.session_state.server_thread = start_embedded_server()
# Verify server started correctly
if st.session_state.server.physics_engine is None:
st.error("Failed to start simulation server. Please refresh the page.")
st.stop()
# Register cleanup
def cleanup() -> None:
if hasattr(st.session_state, "server") and st.session_state.server is not None:
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(st.session_state.server.stop())
except Exception:
pass
loop.close()
atexit.register(cleanup)
if "instruments" not in st.session_state:
# Create instruments via HAL using factory
config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=5001,
psu_port=5002,
dmm_port=5003,
)
st.session_state.instruments = InstrumentFactory.create(config)
if "repository" not in st.session_state:
# Create test repository (temporary file for dashboard demo)
import os
import tempfile
tmpdir = tempfile.gettempdir()
db_path = os.path.join(tmpdir, "py_dvt_ate_dashboard.db")
st.session_state.repository = SQLiteRepository(db_path)
if "test_runner" not in st.session_state:
st.session_state.test_runner = TestRunner(st.session_state.repository)
if "history" not in st.session_state:
st.session_state.history = SimulationHistory()
if "running" not in st.session_state:
st.session_state.running = True # Start charts by default
if "last_update" not in st.session_state:
st.session_state.last_update = time.time()
if "test_running" not in st.session_state:
st.session_state.test_running = False
if "test_run_id" not in st.session_state:
st.session_state.test_run_id = None
# Note: time_multiplier, temp_setpoint, input_voltage, output_enabled,
# load_current are managed by their respective widgets via keys
def step_simulation() -> None:
"""Advance the simulation based on elapsed real time and multiplier."""
server: SimulationServer = st.session_state.server
instruments: InstrumentSet = st.session_state.instruments
history: SimulationHistory = st.session_state.history
# The physics engine runs in the background server thread automatically.
# We just need to read current measurements via HAL and update history.
# Get physics engine for visualization (dashboard-specific access)
engine: PhysicsEngine | None = server.physics_engine
if engine is None:
return
# Update timestamp
st.session_state.last_update = time.time()
# Read measurements via HAL
try:
chamber_temp = instruments.chamber.get_temperature()
# For DMM, we need to measure the DUT output voltage
output_voltage = instruments.dmm.measure_dc_voltage()
# Get physics state for detailed visualization
# (In production, you'd only have what instruments can measure)
thermal = engine.get_thermal_state()
electrical = engine.get_electrical_state()
history.time.append(thermal.timestamp)
history.chamber_temp.append(chamber_temp)
history.case_temp.append(thermal.case_temperature)
history.junction_temp.append(thermal.junction_temperature)
history.output_voltage.append(output_voltage)
history.power_dissipation.append(electrical.power_dissipation)
except OSError:
# Ignore communication errors during updates
pass
def sync_instruments_from_session_state() -> None:
"""Sync instrument settings from session state via HAL (called by fragment)."""
instruments: InstrumentSet = st.session_state.instruments
try:
# Control thermal chamber via HAL
instruments.chamber.set_temperature(st.session_state.get("temp_setpoint", 25.0))
# Control power supply via HAL
input_voltage = st.session_state.get("input_voltage", 5.0)
output_enabled = st.session_state.get("output_enabled", False)
instruments.psu.set_voltage(1, input_voltage)
instruments.psu.enable_output(1, output_enabled)
# Note: Load current would typically be controlled by an electronic load
# instrument. For simulation, we set it directly on the physics engine.
# In production, you'd have a separate IElectronicLoad interface.
server: SimulationServer = st.session_state.server
engine: PhysicsEngine | None = server.physics_engine
if engine is not None:
load_current_a = st.session_state.get("load_current", 100.0) / 1000.0
engine.set_load_current(load_current_a)
except OSError:
# Ignore communication errors
pass
def display_controls() -> None:
"""Display simulation control panel in sidebar."""
st.sidebar.header("Simulation Controls")
st.sidebar.info("🔧 Physics engine and charts are running automatically. Use the button below to pause/resume chart updates if needed.")
# Pause/Resume button
if st.session_state.running:
if st.sidebar.button(
"⏸️ Pause Charts", type="primary", width="stretch"
):
st.session_state.running = False
else:
if st.sidebar.button(
"▶️ Start Charts", type="primary", width="stretch"
):
st.session_state.running = True
st.session_state.last_update = time.time()
# Reset button
if st.sidebar.button("Reset", width="stretch"):
# Stop server and restart
server: SimulationServer = st.session_state.server
loop = asyncio.new_event_loop()
loop.run_until_complete(server.stop())
loop.close()
# Restart server
st.session_state.server, st.session_state.server_thread = start_embedded_server()
# Reconnect instruments
config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=5001,
psu_port=5002,
dmm_port=5003,
)
st.session_state.instruments = InstrumentFactory.create(config)
# Reset history
st.session_state.history = SimulationHistory()
st.session_state.running = False
st.session_state.last_update = time.time()
st.sidebar.divider()
# Time multiplier
st.sidebar.subheader("Simulation Speed")
st.sidebar.select_slider(
"Time Multiplier",
options=[1, 2, 5, 10, 20, 50, 100],
value=10,
format_func=lambda x: f"{x}x",
key="time_multiplier",
)
st.sidebar.caption(
f"1 real second = {st.session_state.get('time_multiplier', 10)} simulation seconds"
)
st.sidebar.divider()
# Temperature setpoint
st.sidebar.subheader("Thermal Chamber")
st.sidebar.slider(
"Temperature Setpoint (C)",
min_value=-40.0,
max_value=125.0,
value=25.0,
step=5.0,
key="temp_setpoint",
)
st.sidebar.divider()
# Power supply controls
st.sidebar.subheader("Power Supply")
st.sidebar.slider(
"Input Voltage (V)",
min_value=0.0,
max_value=12.0,
value=5.0,
step=0.1,
key="input_voltage",
)
st.sidebar.toggle(
"Output Enabled",
value=False,
key="output_enabled",
)
st.sidebar.divider()
# Load controls
st.sidebar.subheader("Electronic Load")
st.sidebar.slider(
"Load Current (mA)",
min_value=0.0,
max_value=500.0,
value=100.0,
step=10.0,
key="load_current",
)
@st.fragment(run_every=0.1)
def simulation_display() -> None:
"""Fragment that displays and updates simulation state."""
# Check if server is initialized
if "server" not in st.session_state:
st.warning("⏳ Initializing simulation server...")
return
server: SimulationServer = st.session_state.server
# Get current state from physics engine (for visualization)
engine: PhysicsEngine | None = server.physics_engine
if engine is None:
st.error("❌ Physics engine not available. The server may not have started correctly. Try refreshing the page.")
return
# Check if server is running
if not server.is_running:
st.warning("⚠️ Server is not running. Try refreshing the page.")
return
instruments: InstrumentSet = st.session_state.instruments
history: SimulationHistory = st.session_state.history
# Sync instrument settings from UI controls via HAL
sync_instruments_from_session_state()
# Step simulation if running
if st.session_state.running:
step_simulation()
thermal = engine.get_thermal_state()
electrical = engine.get_electrical_state()
# Current state metrics
st.subheader("Current State")
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Chamber Temp", f"{thermal.chamber_temperature:.2f} C")
with col2:
st.metric("Case Temp", f"{thermal.case_temperature:.2f} C")
with col3:
st.metric("Junction Temp", f"{thermal.junction_temperature:.2f} C")
with col4:
st.metric("Output Voltage", f"{electrical.output_voltage:.4f} V")
col5, col6, col7, col8 = st.columns(4)
with col5:
st.metric("Input Voltage", f"{electrical.input_voltage:.2f} V")
with col6:
st.metric("Load Current", f"{electrical.load_current * 1000:.1f} mA")
with col7:
st.metric("Power Diss.", f"{electrical.power_dissipation * 1000:.2f} mW")
with col8:
status = "Running" if st.session_state.running else "Stopped"
st.metric(
"Sim Time",
f"{engine.simulation_time:.1f} s",
delta=f"{status} @ {st.session_state.get('time_multiplier', 10):.0f}x",
)
# Instrument Status Panels (HAL View)
st.subheader("Instrument Status (via HAL)")
st.caption("All readings below use the Hardware Abstraction Layer interfaces")
col1, col2, col3 = st.columns(3)
with col1:
st.markdown("#### Thermal Chamber")
try:
chamber_temp = instruments.chamber.get_temperature()
chamber_setpoint = instruments.chamber.get_setpoint()
chamber_stable = instruments.chamber.is_stable()
st.markdown(f"**Temperature:** {chamber_temp:.2f} °C")
st.markdown(f"**Setpoint:** {chamber_setpoint:.2f} °C")
st.markdown(f"**Stable:** {'Yes' if chamber_stable else 'No'}")
st.markdown("**Status:** 🟢 Connected")
except OSError as e:
st.markdown("**Status:** 🔴 Disconnected")
st.caption(f"Error: {e}")
with col2:
st.markdown("#### Power Supply")
try:
psu_voltage_setpoint = instruments.psu.get_voltage(1)
psu_voltage_measured = instruments.psu.measure_voltage(1)
psu_current = instruments.psu.measure_current(1)
psu_enabled = instruments.psu.is_output_enabled(1)
st.markdown(f"**Voltage Setpoint:** {psu_voltage_setpoint:.2f} V")
st.markdown(f"**Voltage Measured:** {psu_voltage_measured:.3f} V")
st.markdown(f"**Current:** {psu_current * 1000:.1f} mA")
st.markdown(f"**Output:** {'🟢 Enabled' if psu_enabled else '🔴 Disabled'}")
except OSError as e:
st.markdown("**Status:** 🔴 Disconnected")
st.caption(f"Error: {e}")
with col3:
st.markdown("#### Multimeter")
try:
dmm_voltage = instruments.dmm.measure_dc_voltage()
st.markdown(f"**DC Voltage:** {dmm_voltage:.4f} V")
st.markdown("**Mode:** DC Voltage")
st.markdown("**Range:** Auto")
st.markdown("**Status:** 🟢 Connected")
except OSError as e:
st.markdown("**Status:** 🔴 Disconnected")
st.caption(f"Error: {e}")
st.divider()
# Temperature chart
st.subheader("Temperature History")
if len(history.time) < 2:
st.info("Start the simulation to see temperature data")
else:
chart_data = {
"Time (s)": list(history.time),
"Chamber": list(history.chamber_temp),
"Case": list(history.case_temp),
"Junction": list(history.junction_temp),
}
st.line_chart(
chart_data,
x="Time (s)",
y=["Chamber", "Case", "Junction"],
color=["#1f77b4", "#ff7f0e", "#d62728"],
)
# Self-heating demonstration
st.subheader("Self-Heating Demonstration")
delta_t_jc = thermal.junction_temperature - thermal.case_temperature
delta_t_ca = thermal.case_temperature - thermal.chamber_temperature
col1, col2 = st.columns(2)
with col1:
st.markdown("#### Self-Heating Analysis")
st.markdown(
f"""
| Parameter | Value |
|-----------|-------|
| Junction-Case Rise (dT_jc) | **{delta_t_jc:.2f} C** |
| Case-Ambient Rise (dT_ca) | **{delta_t_ca:.2f} C** |
| Power Dissipation | {electrical.power_dissipation * 1000:.1f} mW |
| theta_jc (junction-case) | 15 C/W |
| theta_ca (case-ambient) | 5 C/W |
"""
)
st.markdown(
"""
**Thermal Coupling:** The junction temperature rises above the case
temperature due to power dissipation. This is governed by:
`T_junction = T_case + P_diss x theta_jc`
Try increasing the load current or input voltage to see
self-heating effects!
"""
)
with col2:
st.markdown("#### Power Dissipation")
if len(history.time) < 2:
st.info("Start the simulation to see power data")
else:
power_data = {
"Time (s)": list(history.time),
"Power (mW)": [p * 1000 for p in history.power_dissipation],
}
st.line_chart(
power_data,
x="Time (s)",
y="Power (mW)",
color="#2ca02c",
)
def test_execution_page() -> None:
"""Test execution page for running DVT tests."""
st.header("Test Execution")
st.markdown("Run DVT characterisation tests using the virtual lab bench.")
# Test selection
st.subheader("Select Test")
test_options = {
"TempCo (Temperature Coefficient)": TempCoTest(),
}
selected_test_name = st.selectbox(
"Available Tests",
options=list(test_options.keys()),
disabled=st.session_state.test_running,
)
selected_test = test_options[selected_test_name]
# Display test description
st.info(f"**{selected_test.name}**: {selected_test.description}")
# Test configuration
st.subheader("Test Configuration")
col1, col2 = st.columns(2)
with col1:
if selected_test.name == "tempco":
input_voltage = st.number_input(
"Input Voltage (V)",
min_value=0.0,
max_value=12.0,
value=5.0,
step=0.1,
disabled=st.session_state.test_running,
)
load_current = st.number_input(
"Load Current (mA)",
min_value=0.0,
max_value=500.0,
value=100.0,
step=10.0,
disabled=st.session_state.test_running,
)
settle_time = st.number_input(
"Settle Time (s)",
min_value=0.0,
max_value=60.0,
value=5.0,
step=1.0,
disabled=st.session_state.test_running,
)
with col2:
if selected_test.name == "tempco":
temp_min = st.number_input(
"Min Temperature (C)",
min_value=-40.0,
max_value=125.0,
value=-40.0,
step=5.0,
disabled=st.session_state.test_running,
)
temp_max = st.number_input(
"Max Temperature (C)",
min_value=-40.0,
max_value=125.0,
value=85.0,
step=5.0,
disabled=st.session_state.test_running,
)
temp_step = st.number_input(
"Temperature Step (C)",
min_value=1.0,
max_value=50.0,
value=25.0,
step=5.0,
disabled=st.session_state.test_running,
)
# Generate temperature points
if selected_test.name == "tempco":
import numpy as np
temperatures = list(np.arange(temp_min, temp_max + temp_step / 2, temp_step))
st.caption(f"Temperature points: {temperatures}")
test_config = {
"temperatures": temperatures,
"input_voltage": input_voltage,
"load_current": load_current / 1000.0, # Convert mA to A
"settle_time": settle_time,
}
else:
test_config = {}
# Run test button
st.subheader("Execution")
col1, col2 = st.columns([1, 3])
with col1:
if st.button(
"Run Test",
type="primary",
width="stretch",
disabled=st.session_state.test_running,
):
st.session_state.test_running = True
st.session_state.test_run_id = None
st.rerun()
with col2:
if st.session_state.test_running:
st.info("🔄 Test is running...")
elif st.session_state.test_run_id:
st.success("✅ Test completed!")
else:
st.caption("Click 'Run Test' to start")
# Execute test if running
if st.session_state.test_running:
with st.spinner("Running test..."):
try:
runner: TestRunner = st.session_state.test_runner
instruments: InstrumentSet = st.session_state.instruments
# Connect instruments before running test
try:
instruments.chamber.transport.connect() # type: ignore[attr-defined]
instruments.psu.transport.connect() # type: ignore[attr-defined]
instruments.dmm.transport.connect() # type: ignore[attr-defined]
except Exception as e:
st.error(f"Failed to connect to instruments: {e}")
st.session_state.test_running = False
st.stop()
# Run the test
run_id = runner.run_test(
test=selected_test,
instruments=instruments,
config=test_config,
operator="dashboard_user",
description=f"Dashboard execution: {selected_test_name}",
)
st.session_state.test_run_id = run_id
st.session_state.test_running = False
st.rerun()
except Exception as e:
st.error(f"Test execution failed: {e}")
st.session_state.test_running = False
# Display results if available
if st.session_state.test_run_id:
st.subheader("Test Results")
repository: SQLiteRepository = st.session_state.repository
run = repository.get_run(st.session_state.test_run_id)
col1, col2, col3 = st.columns(3)
with col1:
status_color = {"PASSED": "🟢", "FAILED": "🔴", "ERROR": "🟡"}.get(
run.status.value.upper(), ""
)
st.metric("Status", f"{status_color} {run.status.value.upper()}")
with col2:
if run.completed_at and run.started_at:
duration = (run.completed_at - run.started_at).total_seconds()
st.metric("Duration", f"{duration:.1f} s")
else:
st.metric("Duration", "N/A")
with col3:
results = repository.get_results(st.session_state.test_run_id)
st.metric("Results", len(results))
# Show error/warning message for non-PASSED tests
if run.status.value.upper() == "ERROR":
st.error("⚠️ Test encountered an error during execution. Check the detailed results below or the terminal output for error messages.")
elif run.status.value.upper() == "FAILED":
st.warning("⚠️ Test completed but one or more results failed to meet specifications.")
# Show detailed results
if results:
st.markdown("#### Detailed Results")
results_data = []
for result in results:
results_data.append({
"Parameter": result.parameter,
"Value": f"{result.value:.6f}",
"Unit": result.unit or "",
"Lower Limit": f"{result.lower_limit:.6f}" if result.lower_limit is not None else "N/A",
"Upper Limit": f"{result.upper_limit:.6f}" if result.upper_limit is not None else "N/A",
"Status": "✅ PASS" if result.passed else "❌ FAIL",
})
st.table(results_data)
def results_viewer_page() -> None:
"""Results viewer page for browsing historical test results."""
st.header("Results Viewer")
st.markdown("Browse and analyze historical test results.")
repository: SQLiteRepository = st.session_state.repository
# Get all test runs
all_runs = repository.get_all_runs()
if not all_runs:
st.info("No test results available. Run a test in the Test Execution tab to generate results.")
return
# Summary statistics
st.subheader("Summary")
col1, col2, col3, col4 = st.columns(4)
status_counts: dict[str, int] = {}
for run in all_runs:
status_counts[run.status.value] = status_counts.get(run.status.value, 0) + 1
with col1:
st.metric("Total Runs", len(all_runs))
with col2:
st.metric("Passed", status_counts.get("passed", 0))
with col3:
st.metric("Failed", status_counts.get("failed", 0))
with col4:
st.metric("Errors", status_counts.get("error", 0))
st.divider()
# Filter controls
st.subheader("Filter Results")
col1, col2 = st.columns(2)
with col1:
# Filter by test name
test_names = list({run.test_name for run in all_runs})
selected_tests = st.multiselect(
"Test Name",
options=["All"] + test_names,
default=["All"],
)
with col2:
# Filter by status
statuses = list({run.status.value for run in all_runs})
selected_statuses = st.multiselect(
"Status",
options=["All"] + statuses,
default=["All"],
)
# Apply filters
filtered_runs = all_runs
if "All" not in selected_tests:
filtered_runs = [r for r in filtered_runs if r.test_name in selected_tests]
if "All" not in selected_statuses:
filtered_runs = [r for r in filtered_runs if r.status.value in selected_statuses]
st.markdown(f"**Showing {len(filtered_runs)} of {len(all_runs)} runs**")
# Test runs table
st.subheader("Test Runs")
if not filtered_runs:
st.info("No test runs match the selected filters.")
return
# Create table data
table_data = []
for run in filtered_runs:
status_icon = {"passed": "🟢", "failed": "🔴", "error": "🟡", "running": "🔵", "pending": "", "skipped": ""}.get(
run.status.value, ""
)
duration = "N/A"
if run.completed_at and run.started_at:
duration = f"{(run.completed_at - run.started_at).total_seconds():.1f}s"
table_data.append({
"Select": False,
"ID": run.id[:8],
"Test": run.test_name,
"Status": f"{status_icon} {run.status.value.upper()}",
"Started": run.started_at.strftime("%Y-%m-%d %H:%M:%S"),
"Duration": duration,
"Operator": run.operator or "N/A",
})
# Display table with selection
import pandas as pd
df = pd.DataFrame(table_data)
# Use data editor for row selection
edited_df = st.data_editor(
df,
width="stretch",
hide_index=True,
column_config={
"Select": st.column_config.CheckboxColumn(
"Select",
help="Select a test run to view details",
default=False,
)
},
disabled=["ID", "Test", "Status", "Started", "Duration", "Operator"],
)
# Get selected run
selected_rows = edited_df[edited_df["Select"]]
if not selected_rows.empty:
selected_id_short = selected_rows.iloc[0]["ID"]
# Find full UUID
selected_run = next((r for r in filtered_runs if r.id.startswith(selected_id_short)), None)
if selected_run:
st.divider()
st.subheader(f"Test Run Details: {selected_run.test_name}")
# Run metadata
col1, col2, col3 = st.columns(3)
with col1:
st.markdown(f"**Run ID:** `{selected_run.id}`")
st.markdown(f"**Test:** {selected_run.test_name}")
st.markdown(f"**Status:** {selected_run.status.value.upper()}")
with col2:
st.markdown(f"**Started:** {selected_run.started_at.strftime('%Y-%m-%d %H:%M:%S')}")
if selected_run.completed_at:
st.markdown(f"**Completed:** {selected_run.completed_at.strftime('%Y-%m-%d %H:%M:%S')}")
if selected_run.completed_at and selected_run.started_at:
duration_sec = (selected_run.completed_at - selected_run.started_at).total_seconds()
st.markdown(f"**Duration:** {duration_sec:.1f}s")
with col3:
st.markdown(f"**Operator:** {selected_run.operator or 'N/A'}")
if selected_run.description:
st.markdown(f"**Description:** {selected_run.description}")
# Configuration
if selected_run.config_json:
import json
with st.expander("Test Configuration"):
config = json.loads(selected_run.config_json)
st.json(config)
# Results
from uuid import UUID
run_uuid = UUID(selected_run.id)
results = repository.get_results(run_uuid)
if results:
st.markdown("### Test Results")
results_table = []
for result in results:
pass_status = "✅ PASS" if result.passed else "❌ FAIL" if result.passed is False else "N/A"
results_table.append({
"Parameter": result.parameter,
"Value": f"{result.value:.6f}",
"Unit": result.unit,
"Lower Limit": f"{result.lower_limit:.6f}" if result.lower_limit is not None else "N/A",
"Upper Limit": f"{result.upper_limit:.6f}" if result.upper_limit is not None else "N/A",
"Status": pass_status,
})
st.table(results_table)
# Measurements
try:
measurements = repository.get_measurements_dataframe(run_uuid)
if measurements is not None and not measurements.empty:
st.markdown("### Measurements")
# Show measurement count
st.caption(f"Total measurements: {len(measurements)}")
# Plot measurements by parameter
parameters = measurements["parameter"].unique()
for param in parameters:
param_data = measurements[measurements["parameter"] == param]
if not param_data.empty:
st.markdown(f"#### {param}")
# Create chart
import altair as alt
chart = alt.Chart(param_data).mark_line().encode(
x=alt.X("timestamp:Q", title="Time (s)"),
y=alt.Y("value:Q", title=f"Value ({param_data.iloc[0]['unit']})"),
tooltip=["timestamp", "value", "unit"]
).properties(
height=300
)
st.altair_chart(chart, width="stretch")
except Exception as e:
st.caption(f"No time-series measurement data available ({e})")
def main() -> None:
"""Main entry point for the Streamlit dashboard."""
st.set_page_config(
page_title="py-dvt-ate Virtual Lab Bench",
page_icon="🔬",
layout="wide",
)
st.title("py-dvt-ate Virtual Lab Bench")
st.markdown(
"""
Interactive demonstration of the Hardware Abstraction Layer (HAL)
controlling a simulated lab bench with thermal chamber, power supply,
and multimeter, showing coupled thermal-electrical behaviour of an
LDO voltage regulator.
"""
)
init_session_state()
# Sidebar controls (static - doesn't need fragment)
display_controls()
# Create tabs for different views
tab1, tab2, tab3 = st.tabs(["🔬 Lab Bench", "🧪 Test Execution", "📊 Results Viewer"])
with tab1:
# Dynamic simulation display (uses fragment for smooth updates)
simulation_display()
with tab2:
test_execution_page()
with tab3:
results_viewer_page()
if __name__ == "__main__":
main()