Files
py-dvt-ate/src/py_dvt_ate/app/dashboard/app.py
Kai Chappell b7663d5a31 Add idle auto-shutdown for self-hosted deployment
- IDLE_TIMEOUT_MINUTES env var to configure shutdown after inactivity
- Background thread monitors activity and exits when timeout reached
- Activity tracked via simulation_display fragment (runs while page open)
- Set to 0 (default) to disable auto-shutdown
2026-01-29 21:08:17 +00:00

1045 lines
39 KiB
Python

"""Streamlit dashboard application for physics simulation visualisation.
This module provides an interactive dashboard for visualising the physics
simulation through the Hardware Abstraction Layer (HAL), demonstrating
thermal-electrical coupling in real-time using instrument interfaces.
"""
import asyncio
import atexit
import os
import sys
import threading
import time
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any
from uuid import UUID
import streamlit as st
from py_dvt_ate.data.repository import SQLiteRepository
from py_dvt_ate.framework.runner import TestRunner
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory, InstrumentSet
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
from py_dvt_ate.tests.thermal.tempco import TempCoTest
# Thread pool for background test execution
_test_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="test_runner")
# Idle shutdown configuration
# Set IDLE_TIMEOUT_MINUTES=0 to disable auto-shutdown
IDLE_TIMEOUT_MINUTES = int(os.environ.get("IDLE_TIMEOUT_MINUTES", "0"))
_last_activity_time: float = time.time()
_idle_monitor_started = False
def _update_activity() -> None:
    """Record the current wall-clock time as the latest activity."""
    global _last_activity_time
    now = time.time()
    _last_activity_time = now
def _idle_monitor() -> None:
    """Poll for inactivity forever and terminate the process on timeout.

    Intended as a thread target. Once a minute it compares the time since
    ``_last_activity_time`` with ``IDLE_TIMEOUT_MINUTES``. When the limit is
    reached it prints a message and calls ``os._exit(0)``, which ends the
    process immediately without running cleanup handlers.
    """
    poll_interval_s = 60  # Check every minute
    while True:
        time.sleep(poll_interval_s)
        if IDLE_TIMEOUT_MINUTES > 0:
            idle_seconds = time.time() - _last_activity_time
            if idle_seconds / 60 >= IDLE_TIMEOUT_MINUTES:
                print(f"Idle timeout reached ({IDLE_TIMEOUT_MINUTES} minutes). Shutting down.")
                os._exit(0)
def _start_idle_monitor() -> None:
    """Launch the idle monitor daemon thread, at most once.

    Does nothing when the timeout is not positive or when the monitor was
    already started (tracked by ``_idle_monitor_started``).
    """
    global _idle_monitor_started
    if IDLE_TIMEOUT_MINUTES <= 0 or _idle_monitor_started:
        return
    _idle_monitor_started = True
    monitor = threading.Thread(target=_idle_monitor, daemon=True)
    monitor.start()
    print(f"Idle auto-shutdown enabled: {IDLE_TIMEOUT_MINUTES} minutes")
# History buffer size for charts
HISTORY_SIZE = 500


def _new_history_series() -> deque[float]:
    """Return an empty buffer capped at HISTORY_SIZE entries.

    Once the buffer is full, appending a sample discards the oldest one.
    """
    return deque(maxlen=HISTORY_SIZE)


@dataclass
class SimulationHistory:
    """Rolling time series used by the dashboard charts.

    Each attribute is its own bounded deque, created fresh per instance.
    """

    # Sample timestamps (x-axis of the charts).
    time: deque[float] = field(default_factory=_new_history_series)
    # Chamber, case and junction temperature samples.
    chamber_temp: deque[float] = field(default_factory=_new_history_series)
    case_temp: deque[float] = field(default_factory=_new_history_series)
    junction_temp: deque[float] = field(default_factory=_new_history_series)
    # Electrical samples.
    output_voltage: deque[float] = field(default_factory=_new_history_series)
    power_dissipation: deque[float] = field(default_factory=_new_history_series)
@dataclass
class TestProgress:
"""Tracks live progress of a running test."""
phase: str = "Initialising"
current_temp: float | None = None
target_temp: float | None = None
temp_index: int = 0
total_temps: int = 0
message: str = ""
started_at: float = field(default_factory=time.time)
@property
def elapsed(self) -> float:
"""Return elapsed time in seconds."""
return time.time() - self.started_at
def start_embedded_server() -> tuple[SimulationServer, threading.Thread]:
    """Launch the simulation server on a daemon thread with its own event loop.

    The caller blocks for up to five seconds waiting for startup to finish.
    A timeout or a startup exception is reported with ``st.error``; the
    server and thread are returned in every case, so callers must check the
    server state themselves.

    Returns:
        Tuple of (server instance, thread running the server).
    """
    config = ServerConfig(
        host="127.0.0.1",
        chamber_port=5001,
        psu_port=5002,
        dmm_port=5003,
        physics_rate_hz=100.0,
    )
    server = SimulationServer(config)

    startup_done = threading.Event()
    # A list is used so the worker thread can hand an exception back.
    startup_failures: list[Exception] = []

    def _serve() -> None:
        """Start the server on a fresh event loop and keep that loop running."""
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        try:
            event_loop.run_until_complete(server.start())
            startup_done.set()  # startup succeeded
            event_loop.run_forever()
        except Exception as exc:
            startup_failures.append(exc)
            startup_done.set()  # release the waiting caller on failure too
        finally:
            # Best-effort shutdown; errors while stopping are ignored.
            try:
                event_loop.run_until_complete(server.stop())
            except Exception:
                pass
            event_loop.close()

    worker = threading.Thread(target=_serve, daemon=True)
    worker.start()

    started_in_time = startup_done.wait(timeout=5.0)
    if not started_in_time:
        st.error("Server failed to start within timeout")
    if startup_failures:
        st.error(f"Server startup error: {startup_failures[0]}")
    return server, worker
def init_session_state() -> None:
    """Populate ``st.session_state`` with everything the dashboard needs.

    Each entry is created only when it is missing, so existing objects are
    reused on later runs of the same session.

    Side effects:
        * Starts the idle monitor (when configured) and records activity.
        * Starts the embedded simulation server and registers an ``atexit``
          hook that stops it.
        * Creates and connects the simulator-backed instruments.
        * Calls ``st.stop()`` when the server or the instruments are
          unavailable.
    """
    # Start idle monitor for auto-shutdown
    _start_idle_monitor()
    _update_activity()

    state = st.session_state

    if "server" not in state:
        with st.spinner("Starting simulation server..."):
            state.server, state.server_thread = start_embedded_server()
        # Verify server started correctly
        if state.server.physics_engine is None:
            st.error("Failed to start simulation server. Please refresh the page.")
            st.stop()

        # Capture the server object itself: the hook runs at interpreter exit,
        # where this session's st.session_state may no longer be reachable.
        server = state.server

        def cleanup() -> None:
            """Stop the embedded server, ignoring errors during shutdown."""
            loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(server.stop())
            except Exception:
                pass
            finally:
                loop.close()

        atexit.register(cleanup)

    if "instruments" not in state:
        # Create instruments via HAL using factory
        config = InstrumentConfig(
            backend="simulator",
            simulator_host="127.0.0.1",
            chamber_port=5001,
            psu_port=5002,
            dmm_port=5003,
        )
        instruments = InstrumentFactory.create(config)
        # Connect instruments for dashboard use
        try:
            instruments.chamber.transport.connect()
            instruments.psu.transport.connect()
            instruments.dmm.transport.connect()
        except Exception as e:
            st.error(f"Failed to connect to instruments: {e}")
            st.stop()
        # Store only after a successful connect so a failed attempt is retried
        # on the next run instead of leaving unconnected instruments behind.
        state.instruments = instruments

    if "repository" not in state:
        # Create test repository (temporary file for dashboard demo)
        import tempfile

        db_path = os.path.join(tempfile.gettempdir(), "py_dvt_ate_dashboard.db")
        state.repository = SQLiteRepository(db_path)

    if "test_runner" not in state:
        state.test_runner = TestRunner(state.repository)
    if "history" not in state:
        state.history = SimulationHistory()
    if "last_update" not in state:
        state.last_update = time.time()

    # Test bookkeeping defaults.
    for key, default in (
        ("test_running", False),
        ("test_run_id", None),
        ("test_future", None),
        ("test_progress", None),
    ):
        if key not in state:
            state[key] = default
def step_simulation() -> None:
    """Sample the current simulation state into the chart history.

    Updates ``st.session_state.last_update`` and appends one point to each
    history series. While a test is running the values come straight from
    the physics engine; otherwise chamber temperature and output voltage are
    read through the instrument interfaces. ``OSError`` and ``ValueError``
    raised while sampling are ignored and that sample is skipped.
    """
    server: SimulationServer = st.session_state.server
    instruments: InstrumentSet = st.session_state.instruments
    history: SimulationHistory = st.session_state.history

    # Get physics engine for visualization (dashboard-specific access)
    engine: PhysicsEngine | None = server.physics_engine
    if engine is None:
        return

    st.session_state.last_update = time.time()

    try:
        thermal = engine.get_thermal_state()
        electrical = engine.get_electrical_state()

        if st.session_state.test_running:
            # No HAL access during a test, to avoid race conditions with the
            # instrument TCP connections.
            chamber_temp, output_voltage = (
                thermal.chamber_temperature,
                electrical.output_voltage,
            )
        else:
            # When idle, read via HAL to demonstrate instrument communication
            chamber_temp, output_voltage = (
                instruments.chamber.get_temperature(),
                instruments.dmm.measure_dc_voltage(),
            )

        sample = (
            (history.time, thermal.timestamp),
            (history.chamber_temp, chamber_temp),
            (history.case_temp, thermal.case_temperature),
            (history.junction_temp, thermal.junction_temperature),
            (history.output_voltage, output_voltage),
            (history.power_dissipation, electrical.power_dissipation),
        )
        for series, value in sample:
            series.append(value)
    except (OSError, ValueError):
        # Ignore communication errors during updates
        pass
def run_test_in_background(
    runner: TestRunner,
    test: Any,
    instruments: InstrumentSet,
    config: dict[str, Any],
    description: str,
    progress: TestProgress,
) -> UUID:
    """Execute a test through ``runner`` while keeping ``progress`` up to date.

    The chamber is wrapped in a proxy that records setpoint changes,
    temperature readings and stability checks on ``progress``; the PSU and
    DMM are passed through unchanged.

    Args:
        runner: Object whose ``run_test`` executes the test.
        test: Test instance handed to the runner.
        instruments: Instrument set whose chamber gets wrapped.
        config: Test configuration; ``config["temperatures"]`` (default
            empty) determines the number of progress steps.
        description: Description passed to the runner.
        progress: Shared progress object mutated as the test runs.

    Returns:
        The value returned by ``runner.run_test``.
    """
    setpoints = config.get("temperatures", [])
    progress.total_temps = len(setpoints)
    progress.phase, progress.message = "Starting", "Configuring instruments..."

    class ProgressTrackingChamber:
        """Chamber proxy that mirrors activity onto the progress object."""

        def __init__(self, chamber: Any) -> None:
            # The class defines no __setattr__, so plain assignment stores
            # directly in the instance dict.
            self._chamber = chamber
            self._progress = progress
            self._temps = setpoints

        def set_temperature(self, temp: float) -> None:
            tracker = self._progress
            tracker.target_temp = temp
            tracker.phase = "Ramping"
            if temp in self._temps:
                # 1-based step number for display.
                tracker.temp_index = self._temps.index(temp) + 1
            tracker.message = f"Ramping to {temp:.1f}C"
            self._chamber.set_temperature(temp)

        def get_temperature(self) -> float:
            reading: float = self._chamber.get_temperature()
            self._progress.current_temp = reading
            return reading

        def is_stable(self) -> bool:
            stable: bool = self._chamber.is_stable()
            tracker = self._progress
            if not stable:
                tracker.phase = "Stabilising"
                if tracker.target_temp is not None:
                    tracker.message = f"Waiting for stability at {tracker.target_temp:.1f}C"
                return stable
            tracker.phase = "Measuring"
            if tracker.current_temp is not None:
                tracker.message = f"Stable at {tracker.current_temp:.1f}C, measuring..."
            return stable

        def __getattr__(self, name: str) -> Any:
            # Only called when normal lookup fails: forward to the real chamber.
            return getattr(self._chamber, name)

    class WrappedInstruments:
        """Instrument set lookalike with the tracking chamber swapped in."""

        def __init__(self, orig: InstrumentSet) -> None:
            self.chamber = ProgressTrackingChamber(orig.chamber)
            self.psu = orig.psu
            self.dmm = orig.dmm

    run_id = runner.run_test(
        test=test,
        instruments=WrappedInstruments(instruments),  # type: ignore
        config=config,
        operator="dashboard_user",
        description=description,
    )
    progress.phase, progress.message = "Complete", "Test finished"
    return run_id
def display_controls() -> None:
    """Render the sidebar: speed selector plus the instrument parameter form.

    The chosen time multiplier is written to the server's ``time_scale`` and
    to session state on every run. The form values are copied into session
    state, followed by a rerun, only when the form is submitted and no test
    is running.
    """
    sidebar = st.sidebar
    state = st.session_state

    sidebar.header("Simulation Controls")
    locked = state.get("test_running", False)

    # Show test status
    if not locked:
        sidebar.info("Physics engine runs automatically. Use controls to adjust.")
    else:
        sidebar.warning("Test in progress - instrument controls disabled")

    # Time multiplier (always available - affects simulation speed)
    sidebar.subheader("Simulation Speed")
    speed = sidebar.select_slider(
        "Time Multiplier",
        options=[1, 2, 5, 10, 20, 50, 100],
        value=state.get("time_multiplier", 10),
        format_func=lambda x: f"{x}x",
        key="time_multiplier_slider",
    )
    sidebar.caption(f"1 real second = {speed} simulation seconds")

    # Apply time multiplier to server immediately
    if "server" in state:
        state.server.time_scale = speed
    state.time_multiplier = speed

    sidebar.divider()

    # Instrument controls (disabled during test)
    sidebar.subheader("Instrument Controls")
    if locked:
        sidebar.caption("Controls disabled while test is running")

    with sidebar.form("parameter_controls"):
        st.markdown("**Thermal Chamber**")
        new_setpoint = st.slider(
            "Temperature Setpoint (C)",
            min_value=-40.0,
            max_value=125.0,
            value=state.get("temp_setpoint", 25.0),
            step=5.0,
            disabled=locked,
        )
        st.divider()

        st.markdown("**Power Supply**")
        new_voltage = st.slider(
            "Input Voltage (V)",
            min_value=0.0,
            max_value=12.0,
            value=state.get("input_voltage", 5.0),
            step=0.1,
            disabled=locked,
        )
        new_output_enabled = st.toggle(
            "Output Enabled",
            value=state.get("output_enabled", False),
            disabled=locked,
        )
        st.divider()

        st.markdown("**Electronic Load**")
        new_load_ma = st.slider(
            "Load Current (mA)",
            min_value=0.0,
            max_value=500.0,
            value=state.get("load_current", 100.0),
            step=10.0,
            disabled=locked,
        )

        submitted = st.form_submit_button(
            "Apply Changes",
            type="primary",
            disabled=locked,
        )

    if locked or not submitted:
        return

    state.temp_setpoint = new_setpoint
    state.input_voltage = new_voltage
    state.output_enabled = new_output_enabled
    state.load_current = new_load_ma
    st.rerun()
def _sim_apply_controls(instruments: InstrumentSet, engine: PhysicsEngine) -> None:
    """Push the sidebar control values to the instruments and physics engine.

    Failures are reported with a warning instead of being silently dropped,
    so the user can tell that the displayed settings were not applied.
    """
    state = st.session_state
    try:
        instruments.chamber.set_temperature(state.get("temp_setpoint", 25.0))
        instruments.psu.set_voltage(1, state.get("input_voltage", 5.0))
        instruments.psu.enable_output(1, state.get("output_enabled", False))
        # The control value is in mA; the engine receives it divided by 1000.
        engine.set_load_current(state.get("load_current", 100.0) / 1000.0)
    except Exception as exc:
        st.warning(f"Could not apply control settings: {exc}")


def _sim_render_test_banner() -> None:
    """Show a one-line progress banner while a test is running."""
    progress: TestProgress | None = st.session_state.test_progress
    if not (st.session_state.test_running and progress):
        return
    if progress.target_temp is not None and progress.current_temp is not None:
        st.info(
            f"**Test Running:** {progress.phase} | "
            f"Step {progress.temp_index}/{progress.total_temps} | "
            f"Target: {progress.target_temp:.1f}C | "
            f"Current: {progress.current_temp:.1f}C | "
            f"Elapsed: {progress.elapsed:.0f}s"
        )
    else:
        st.info(f"**Test Running:** {progress.phase} | {progress.message}")


def _sim_render_current_state(engine: PhysicsEngine, thermal: Any, electrical: Any) -> None:
    """Render the two rows of headline metrics from the physics engine state."""
    st.subheader("Current State")
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Chamber Temp", f"{thermal.chamber_temperature:.2f} C")
    with col2:
        st.metric("Case Temp", f"{thermal.case_temperature:.2f} C")
    with col3:
        st.metric("Junction Temp", f"{thermal.junction_temperature:.2f} C")
    with col4:
        st.metric("Output Voltage", f"{electrical.output_voltage:.4f} V")

    col5, col6, col7, col8 = st.columns(4)
    with col5:
        st.metric("Input Voltage", f"{electrical.input_voltage:.2f} V")
    with col6:
        st.metric("Load Current", f"{electrical.load_current * 1000:.1f} mA")
    with col7:
        st.metric("Power Diss.", f"{electrical.power_dissipation * 1000:.2f} mW")
    with col8:
        status = "Test Running" if st.session_state.test_running else "Running"
        st.metric("Sim Time", f"{engine.simulation_time:.1f} s", delta=status)


def _sim_render_instrument_status(instruments: InstrumentSet, thermal: Any, electrical: Any) -> None:
    """Render the chamber / PSU / DMM status columns.

    While a test is running the values come from the physics engine state so
    the dashboard does not talk to the instruments concurrently with the
    test; otherwise every value is read through the instrument interfaces.
    """
    testing = st.session_state.test_running

    st.subheader("Instrument Status")
    if testing:
        st.caption("Reading from physics engine (test in progress)")
    else:
        st.caption("All readings below use the Hardware Abstraction Layer interfaces")

    col1, col2, col3 = st.columns(3)

    with col1:
        st.markdown("#### Thermal Chamber")
        if testing:
            # Show the setpoint the test is driving towards, not the measured
            # temperature, when the progress tracker knows it.
            progress = st.session_state.get("test_progress")
            target = progress.target_temp if progress is not None else None
            setpoint_text = f"{target:.2f} C" if target is not None else "N/A"
            st.markdown(f"**Temperature:** {thermal.chamber_temperature:.2f} C")
            st.markdown(f"**Setpoint:** {setpoint_text}")
            st.markdown("**Stable:** (test running)")
            st.markdown("**Status:** Test in progress")
        else:
            try:
                chamber_temp = instruments.chamber.get_temperature()
                chamber_setpoint = instruments.chamber.get_setpoint()
                chamber_stable = instruments.chamber.is_stable()
                st.markdown(f"**Temperature:** {chamber_temp:.2f} C")
                st.markdown(f"**Setpoint:** {chamber_setpoint:.2f} C")
                st.markdown(f"**Stable:** {'Yes' if chamber_stable else 'No'}")
                st.markdown("**Status:** Connected")
            except (OSError, ValueError) as e:
                st.markdown("**Status:** Disconnected")
                st.caption(f"Error: {e}")

    with col2:
        st.markdown("#### Power Supply")
        if testing:
            st.markdown(f"**Voltage:** {electrical.input_voltage:.2f} V")
            st.markdown(f"**Current:** {electrical.load_current * 1000:.1f} mA")
            st.markdown("**Output:** (test running)")
            st.markdown("**Status:** Test in progress")
        else:
            try:
                psu_voltage_setpoint = instruments.psu.get_voltage(1)
                psu_voltage_measured = instruments.psu.measure_voltage(1)
                psu_current = instruments.psu.measure_current(1)
                psu_enabled = instruments.psu.is_output_enabled(1)
                st.markdown(f"**Voltage Setpoint:** {psu_voltage_setpoint:.2f} V")
                st.markdown(f"**Voltage Measured:** {psu_voltage_measured:.3f} V")
                st.markdown(f"**Current:** {psu_current * 1000:.1f} mA")
                st.markdown(f"**Output:** {'Enabled' if psu_enabled else 'Disabled'}")
                # Same status line as the chamber and multimeter columns.
                st.markdown("**Status:** Connected")
            except (OSError, ValueError) as e:
                st.markdown("**Status:** Disconnected")
                st.caption(f"Error: {e}")

    with col3:
        st.markdown("#### Multimeter")
        if testing:
            st.markdown(f"**DC Voltage:** {electrical.output_voltage:.4f} V")
            st.markdown("**Mode:** DC Voltage")
            st.markdown("**Status:** Test in progress")
        else:
            try:
                dmm_voltage = instruments.dmm.measure_dc_voltage()
                st.markdown(f"**DC Voltage:** {dmm_voltage:.4f} V")
                st.markdown("**Mode:** DC Voltage")
                st.markdown("**Range:** Auto")
                st.markdown("**Status:** Connected")
            except (OSError, ValueError) as e:
                st.markdown("**Status:** Disconnected")
                st.caption(f"Error: {e}")


def _sim_render_history(history: SimulationHistory, thermal: Any, electrical: Any) -> None:
    """Render the temperature chart and the self-heating section."""
    has_series = len(history.time) >= 2  # a line needs at least two points

    st.subheader("Temperature History")
    if not has_series:
        st.info("Start the simulation to see temperature data")
    else:
        chart_data = {
            "Time (s)": list(history.time),
            "Chamber": list(history.chamber_temp),
            "Case": list(history.case_temp),
            "Junction": list(history.junction_temp),
        }
        st.line_chart(
            chart_data,
            x="Time (s)",
            y=["Chamber", "Case", "Junction"],
            color=["#1f77b4", "#ff7f0e", "#d62728"],
        )

    st.subheader("Self-Heating Demonstration")
    delta_t_jc = thermal.junction_temperature - thermal.case_temperature
    delta_t_ca = thermal.case_temperature - thermal.chamber_temperature

    col1, col2 = st.columns(2)
    with col1:
        st.markdown("#### Self-Heating Analysis")
        st.markdown(
            "\n".join(
                [
                    "| Parameter | Value |",
                    "|-----------|-------|",
                    f"| Junction-Case Rise (dT_jc) | **{delta_t_jc:.2f} C** |",
                    f"| Case-Ambient Rise (dT_ca) | **{delta_t_ca:.2f} C** |",
                    f"| Power Dissipation | {electrical.power_dissipation * 1000:.1f} mW |",
                    "| theta_jc (junction-case) | 15 C/W |",
                    "| theta_ca (case-ambient) | 5 C/W |",
                ]
            )
        )
        st.markdown(
            "\n".join(
                [
                    "**Thermal Coupling:** The junction temperature rises above the case",
                    "temperature due to power dissipation. This is governed by:",
                    "`T_junction = T_case + P_diss x theta_jc`",
                    "Try increasing the load current or input voltage to see",
                    "self-heating effects!",
                ]
            )
        )
    with col2:
        st.markdown("#### Power Dissipation")
        if not has_series:
            st.info("Start the simulation to see power data")
        else:
            power_data = {
                "Time (s)": list(history.time),
                "Power (mW)": [p * 1000 for p in history.power_dissipation],
            }
            st.line_chart(power_data, x="Time (s)", y="Power (mW)", color="#2ca02c")


@st.fragment(run_every=0.1)
def simulation_display() -> None:
    """Fragment that displays and updates the live simulation state.

    Each run records user activity for the idle monitor, applies the sidebar
    settings to the instruments when no test is running, samples the
    simulation into the history, and renders the metrics, instrument status
    and charts. It returns early with a message when the server or physics
    engine is unavailable.
    """
    _update_activity()  # Track that someone is viewing the dashboard

    if "server" not in st.session_state:
        st.warning("Initializing simulation server...")
        return

    server: SimulationServer = st.session_state.server
    engine: PhysicsEngine | None = server.physics_engine
    if engine is None:
        st.error("Physics engine not available. Try refreshing the page.")
        return
    if not server.is_running:
        st.warning("Server is not running. Try refreshing the page.")
        return

    instruments: InstrumentSet = st.session_state.instruments
    history: SimulationHistory = st.session_state.history

    # Apply control settings to instruments (only when not running a test)
    if not st.session_state.test_running:
        _sim_apply_controls(instruments, engine)

    # Always update simulation data
    step_simulation()

    thermal = engine.get_thermal_state()
    electrical = engine.get_electrical_state()

    _sim_render_test_banner()
    _sim_render_current_state(engine, thermal, electrical)
    _sim_render_instrument_status(instruments, thermal, electrical)
    st.divider()
    _sim_render_history(history, thermal, electrical)
@st.fragment(run_every=0.5)
def test_progress_display() -> None:
    """Fragment that displays live test progress.

    When the background future has finished, its result is stored as
    ``test_run_id`` (or the failure is reported), the running-test state is
    cleared and a rerun is requested. Otherwise the current progress is
    rendered.
    """
    if not st.session_state.test_running:
        return

    future: Future[UUID] | None = st.session_state.test_future
    if future is not None and future.done():
        try:
            st.session_state.test_run_id = future.result()
        except Exception as e:
            # NOTE(review): a rerun follows immediately, so this message may
            # not stay visible - confirm whether it should be persisted.
            st.error(f"Test failed: {e}")
        finally:
            st.session_state.test_running = False
            st.session_state.test_future = None
            st.session_state.test_progress = None
        st.rerun()

    progress: TestProgress | None = st.session_state.test_progress
    if progress is None:
        st.info("Starting test...")
        return

    if progress.total_temps > 0:
        # st.progress only accepts fractions within [0.0, 1.0].
        pct = min(max(progress.temp_index / progress.total_temps, 0.0), 1.0)
        st.progress(pct, text=f"Step {progress.temp_index} of {progress.total_temps}")

    # Compare against None: 0.0 C is a valid temperature and must not be
    # rendered as "N/A".
    target_text = f"{progress.target_temp:.1f} C" if progress.target_temp is not None else "N/A"
    current_text = f"{progress.current_temp:.1f} C" if progress.current_temp is not None else "N/A"

    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Phase", progress.phase)
    with col2:
        st.metric("Target Temp", target_text)
    with col3:
        st.metric("Current Temp", current_text)
    with col4:
        st.metric("Elapsed", f"{progress.elapsed:.0f} s")
    st.caption(progress.message)
def test_execution_page() -> None:
    """Test execution page for running DVT tests.

    Lets the user pick and configure a test, submits it to the background
    executor, shows live progress while it runs, and renders the stored
    results of the most recent run once it has finished.
    """
    st.header("Test Execution")
    st.markdown("Run DVT characterisation tests using the virtual lab bench.")

    # Any change to the running flag below is followed by a rerun, so one
    # read is enough for the whole pass.
    busy = st.session_state.test_running

    st.subheader("Select Test")
    test_options = {"TempCo (Temperature Coefficient)": TempCoTest()}
    selected_test_name = st.selectbox(
        "Available Tests",
        options=list(test_options.keys()),
        disabled=busy,
    )
    selected_test = test_options[selected_test_name]
    st.info(f"**{selected_test.name}**: {selected_test.description}")
    is_tempco = selected_test.name == "tempco"

    st.subheader("Test Configuration")
    col1, col2 = st.columns(2)
    with col1:
        if is_tempco:
            input_voltage = st.number_input(
                "Input Voltage (V)", min_value=0.0, max_value=12.0, value=5.0, step=0.1,
                disabled=busy,
            )
            load_current = st.number_input(
                "Load Current (mA)", min_value=0.0, max_value=500.0, value=100.0, step=10.0,
                disabled=busy,
            )
            settle_time = st.number_input(
                "Settle Time (s)", min_value=0.0, max_value=60.0, value=5.0, step=1.0,
                disabled=busy,
            )
    with col2:
        if is_tempco:
            temp_min = st.number_input(
                "Min Temperature (C)", min_value=-40.0, max_value=125.0, value=-40.0, step=5.0,
                disabled=busy,
            )
            temp_max = st.number_input(
                "Max Temperature (C)", min_value=-40.0, max_value=125.0, value=85.0, step=5.0,
                disabled=busy,
            )
            temp_step = st.number_input(
                "Temperature Step (C)", min_value=1.0, max_value=50.0, value=25.0, step=5.0,
                disabled=busy,
            )

    config_valid = True
    if is_tempco:
        import numpy as np

        # Half a step is added to the stop value so temp_max itself is
        # included; tolist() yields plain Python floats rather than NumPy
        # scalars for display and for the stored configuration.
        temperatures = np.arange(temp_min, temp_max + temp_step / 2, temp_step).tolist()
        st.caption(f"Temperature points: {temperatures}")
        if not temperatures:
            # Happens when the minimum is above the maximum.
            st.warning("No temperature points: Min Temperature must not exceed Max Temperature.")
            config_valid = False
        test_config = {
            "temperatures": temperatures,
            "input_voltage": input_voltage,
            "load_current": load_current / 1000.0,  # mA entered, value / 1000 stored
            "settle_time": settle_time,
        }
    else:
        test_config = {}

    st.subheader("Execution")
    col1, col2 = st.columns([1, 3])
    with col1:
        if st.button("Run Test", type="primary", disabled=busy or not config_valid):
            runner: TestRunner = st.session_state.test_runner
            instruments: InstrumentSet = st.session_state.instruments
            # Probe the chamber first so a dead connection is reported here
            # rather than inside the background run.
            try:
                _ = instruments.chamber.get_temperature()
            except Exception as e:
                st.error(f"Instruments not available: {e}")
                st.stop()

            progress = TestProgress()
            st.session_state.test_progress = progress
            st.session_state.test_running = True
            st.session_state.test_run_id = None
            st.session_state.test_future = _test_executor.submit(
                run_test_in_background,
                runner=runner,
                test=selected_test,
                instruments=instruments,
                config=test_config,
                description=f"Dashboard execution: {selected_test_name}",
                progress=progress,
            )
            st.rerun()
    with col2:
        if busy:
            st.warning("Test is running in background - check progress below and Lab Bench tab for live updates")
        elif st.session_state.test_run_id:
            st.success("Test completed!")
        else:
            st.caption("Click 'Run Test' to start")

    if busy:
        st.divider()
        st.subheader("Live Progress")
        test_progress_display()

    if st.session_state.test_run_id and not busy:
        st.subheader("Test Results")
        repository: SQLiteRepository = st.session_state.repository
        run_id = st.session_state.test_run_id
        run = repository.get_run(run_id)
        results = repository.get_results(run_id)
        status_label = run.status.value.upper()

        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Status", status_label)
        with col2:
            if run.completed_at and run.started_at:
                duration = (run.completed_at - run.started_at).total_seconds()
                st.metric("Duration", f"{duration:.1f} s")
            else:
                st.metric("Duration", "N/A")
        with col3:
            st.metric("Results", len(results))

        if status_label == "ERROR":
            st.error("Test encountered an error during execution.")
        elif status_label == "FAILED":
            st.warning("Test completed but one or more results failed to meet specifications.")

        if results:
            st.markdown("#### Detailed Results")
            results_data = [
                {
                    "Parameter": result.parameter,
                    "Value": f"{result.value:.6f}",
                    "Unit": result.unit or "",
                    "Lower Limit": f"{result.lower_limit:.6f}" if result.lower_limit is not None else "N/A",
                    "Upper Limit": f"{result.upper_limit:.6f}" if result.upper_limit is not None else "N/A",
                    "Status": "PASS" if result.passed else "FAIL",
                }
                for result in results
            ]
            st.table(results_data)
def _results_render_run_details(repository: SQLiteRepository, selected_run: Any) -> None:
    """Render metadata, PDF export, configuration, results and charts for one run."""
    st.divider()
    st.subheader(f"Test Run Details: {selected_run.test_name}")

    col1, col2, col3 = st.columns(3)
    with col1:
        st.markdown(f"**Run ID:** `{selected_run.id}`")
        st.markdown(f"**Test:** {selected_run.test_name}")
        st.markdown(f"**Status:** {selected_run.status.value.upper()}")
    with col2:
        st.markdown(f"**Started:** {selected_run.started_at.strftime('%Y-%m-%d %H:%M:%S')}")
        if selected_run.completed_at:
            st.markdown(f"**Completed:** {selected_run.completed_at.strftime('%Y-%m-%d %H:%M:%S')}")
        if selected_run.completed_at and selected_run.started_at:
            duration_sec = (selected_run.completed_at - selected_run.started_at).total_seconds()
            st.markdown(f"**Duration:** {duration_sec:.1f}s")
    with col3:
        st.markdown(f"**Operator:** {selected_run.operator or 'N/A'}")
        if selected_run.description:
            st.markdown(f"**Description:** {selected_run.description}")

    # PDF Report Generation
    st.markdown("### Export Report")
    col_btn, _ = st.columns([1, 3])
    with col_btn:
        generate_pdf = st.button("Generate PDF Report", type="primary")

    if generate_pdf:
        try:
            from py_dvt_ate.reporting import ReportConfig, ReportGenerator

            report_config = ReportConfig(
                company_name="py_dvt_ate",
                include_charts=True,
                chart_dpi=150,
            )
            generator = ReportGenerator(
                repository=repository,
                config=report_config,
            )
            with st.spinner("Generating PDF report..."):
                pdf_bytes = generator.generate_bytes(UUID(selected_run.id))
            st.session_state.pdf_bytes = pdf_bytes
            st.session_state.pdf_filename = f"{selected_run.test_name}_{selected_run.id[:8]}.pdf"
            # Remember which run the PDF belongs to so it is never offered
            # for download under a different run.
            st.session_state.pdf_run_id = selected_run.id
            st.success("PDF generated successfully!")
        except ImportError:
            st.error(
                "PDF generation requires additional dependencies. "
                "Install with: `pip install py_dvt_ate[reports]`"
            )
        except Exception as e:
            st.error(f"Failed to generate PDF: {e}")

    # Show download button only for a PDF generated for this run.
    pdf_is_current = st.session_state.get("pdf_run_id") == selected_run.id
    if pdf_is_current and st.session_state.get("pdf_bytes"):
        st.download_button(
            label="Download PDF",
            data=st.session_state.pdf_bytes,
            file_name=st.session_state.get("pdf_filename", "report.pdf"),
            mime="application/pdf",
        )

    if selected_run.config_json:
        import json

        with st.expander("Test Configuration"):
            st.json(json.loads(selected_run.config_json))

    run_uuid = UUID(selected_run.id)
    results = repository.get_results(run_uuid)
    if results:
        st.markdown("### Test Results")
        results_table = []
        for result in results:
            # Three states: True -> PASS, False -> FAIL, anything else -> N/A.
            pass_status = "PASS" if result.passed else "FAIL" if result.passed is False else "N/A"
            results_table.append({
                "Parameter": result.parameter,
                "Value": f"{result.value:.6f}",
                "Unit": result.unit or "",
                "Lower Limit": f"{result.lower_limit:.6f}" if result.lower_limit is not None else "N/A",
                "Upper Limit": f"{result.upper_limit:.6f}" if result.upper_limit is not None else "N/A",
                "Status": pass_status,
            })
        st.table(results_table)

    # Time-series charts are best-effort: any failure is shown as a caption.
    try:
        measurements = repository.get_measurements_dataframe(run_uuid)
        if measurements is not None and not measurements.empty:
            import altair as alt

            st.markdown("### Measurements")
            st.caption(f"Total measurements: {len(measurements)}")
            for param in measurements["parameter"].unique():
                param_data = measurements[measurements["parameter"] == param]
                if param_data.empty:
                    continue
                st.markdown(f"#### {param}")
                chart = alt.Chart(param_data).mark_line().encode(
                    x=alt.X("timestamp:Q", title="Time (s)"),
                    y=alt.Y("value:Q", title=f"Value ({param_data.iloc[0]['unit']})"),
                    tooltip=["timestamp", "value", "unit"],
                ).properties(height=300)
                st.altair_chart(chart, width="stretch")
    except Exception as e:
        st.caption(f"No time-series measurement data available ({e})")


def results_viewer_page() -> None:
    """Results viewer page for browsing historical test results.

    Shows summary counts, lets the user filter runs by test name and status,
    lists the matching runs in a table with a selection checkbox, and renders
    the details of the first selected run.
    """
    st.header("Results Viewer")
    st.markdown("Browse and analyze historical test results.")

    repository: SQLiteRepository = st.session_state.repository
    all_runs = repository.get_all_runs()
    if not all_runs:
        st.info("No test results available. Run a test in the Test Execution tab to generate results.")
        return

    st.subheader("Summary")
    status_counts: dict[str, int] = {}
    for run in all_runs:
        status_counts[run.status.value] = status_counts.get(run.status.value, 0) + 1

    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Total Runs", len(all_runs))
    with col2:
        st.metric("Passed", status_counts.get("passed", 0))
    with col3:
        st.metric("Failed", status_counts.get("failed", 0))
    with col4:
        st.metric("Errors", status_counts.get("error", 0))
    st.divider()

    st.subheader("Filter Results")
    col1, col2 = st.columns(2)
    with col1:
        test_names = list({run.test_name for run in all_runs})
        selected_tests = st.multiselect("Test Name", options=["All"] + test_names, default=["All"])
    with col2:
        statuses = list({run.status.value for run in all_runs})
        selected_statuses = st.multiselect("Status", options=["All"] + statuses, default=["All"])

    filtered_runs = all_runs
    if "All" not in selected_tests:
        filtered_runs = [r for r in filtered_runs if r.test_name in selected_tests]
    if "All" not in selected_statuses:
        filtered_runs = [r for r in filtered_runs if r.status.value in selected_statuses]
    st.markdown(f"**Showing {len(filtered_runs)} of {len(all_runs)} runs**")

    st.subheader("Test Runs")
    if not filtered_runs:
        st.info("No test runs match the selected filters.")
        return

    status_labels = {"passed": "PASS", "failed": "FAIL", "error": "ERR"}
    table_data = []
    for run in filtered_runs:
        duration = "N/A"
        if run.completed_at and run.started_at:
            duration = f"{(run.completed_at - run.started_at).total_seconds():.1f}s"
        table_data.append({
            "Select": False,
            "ID": run.id[:8],
            "Test": run.test_name,
            "Status": status_labels.get(run.status.value, "?"),
            "Started": run.started_at.strftime("%Y-%m-%d %H:%M:%S"),
            "Duration": duration,
            "Operator": run.operator or "N/A",
        })

    import pandas as pd

    df = pd.DataFrame(table_data)
    edited_df = st.data_editor(
        df,
        width="stretch",
        hide_index=True,
        column_config={
            "Select": st.column_config.CheckboxColumn("Select", help="Select a test run to view details", default=False)
        },
        disabled=["ID", "Test", "Status", "Started", "Duration", "Operator"],
    )

    selected_rows = edited_df[edited_df["Select"]]
    if selected_rows.empty:
        return

    # Rows were built from filtered_runs in order, so the row index identifies
    # the run exactly; the 8-character ID shown in the table could be
    # ambiguous. Only the first ticked row is shown.
    row_position = int(selected_rows.index[0])
    if 0 <= row_position < len(filtered_runs):
        _results_render_run_details(repository, filtered_runs[row_position])
def main() -> None:
    """Build the dashboard page: header, session setup, sidebar and tabs."""
    st.set_page_config(
        page_title="py-dvt-ate Virtual Lab Bench",
        page_icon="🔬",
        layout="wide",
    )
    st.title("py-dvt-ate Virtual Lab Bench")

    intro = (
        "\n"
        "Interactive demonstration of the Hardware Abstraction Layer (HAL)\n"
        "controlling a simulated lab bench with thermal chamber, power supply,\n"
        "and multimeter, showing coupled thermal-electrical behaviour of an\n"
        "LDO voltage regulator.\n"
    )
    st.markdown(intro)

    # Session state must exist before any widget or page reads it.
    init_session_state()
    display_controls()

    pages = (
        ("Lab Bench", simulation_display),
        ("Test Execution", test_execution_page),
        ("Results Viewer", results_viewer_page),
    )
    tabs = st.tabs([label for label, _ in pages])
    for tab, (_, render_page) in zip(tabs, pages):
        with tab:
            render_page()


if __name__ == "__main__":
    main()