From 986f368f7590a87f1cbe7f4110b698e475d537dc Mon Sep 17 00:00:00 2001 From: Kai Chappell Date: Fri, 21 Nov 2025 12:27:53 +0000 Subject: [PATCH] Remove confusing pause/clear chart buttons from dashboard The physics engine runs continuously, so pausing charts was misleading - users might think the simulation stopped when it didn't. Charts now always update automatically. Also fix Streamlit deprecation warnings by replacing use_container_width with width parameter (will be removed after 2025-12-31). --- src/py_dvt_ate/app/dashboard/app.py | 656 ++++++++++++++-------------- src/py_dvt_ate/simulation/server.py | 30 +- 2 files changed, 368 insertions(+), 318 deletions(-) diff --git a/src/py_dvt_ate/app/dashboard/app.py b/src/py_dvt_ate/app/dashboard/app.py index 1ee7c3b..c95a3da 100644 --- a/src/py_dvt_ate/app/dashboard/app.py +++ b/src/py_dvt_ate/app/dashboard/app.py @@ -10,7 +10,10 @@ import atexit import threading import time from collections import deque +from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import dataclass, field +from typing import Any +from uuid import UUID import streamlit as st @@ -21,6 +24,9 @@ from py_dvt_ate.simulation.physics.engine import PhysicsEngine from py_dvt_ate.simulation.server import ServerConfig, SimulationServer from py_dvt_ate.tests.thermal.tempco import TempCoTest +# Thread pool for background test execution +_test_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="test_runner") + # History buffer size for charts HISTORY_SIZE = 500 @@ -45,6 +51,24 @@ class SimulationHistory: ) +@dataclass +class TestProgress: + """Tracks live progress of a running test.""" + + phase: str = "Initialising" + current_temp: float | None = None + target_temp: float | None = None + temp_index: int = 0 + total_temps: int = 0 + message: str = "" + started_at: float = field(default_factory=time.time) + + @property + def elapsed(self) -> float: + """Return elapsed time in seconds.""" + return time.time() - self.started_at + + def start_embedded_server() -> tuple[SimulationServer, threading.Thread]: """Start an embedded simulation server in a background thread. @@ -153,16 +177,16 @@ def init_session_state() -> None: if "history" not in st.session_state: st.session_state.history = SimulationHistory() - if "running" not in st.session_state: - st.session_state.running = True # Start charts by default if "last_update" not in st.session_state: st.session_state.last_update = time.time() if "test_running" not in st.session_state: st.session_state.test_running = False if "test_run_id" not in st.session_state: st.session_state.test_run_id = None - # Note: time_multiplier, temp_setpoint, input_voltage, output_enabled, - # load_current are managed by their respective widgets via keys + if "test_future" not in st.session_state: + st.session_state.test_future = None + if "test_progress" not in st.session_state: + st.session_state.test_progress = None def step_simulation() -> None: @@ -171,9 +195,6 @@ def step_simulation() -> None: instruments: InstrumentSet = st.session_state.instruments history: SimulationHistory = st.session_state.history - # The physics engine runs in the background server thread automatically. - # We just need to read current measurements via HAL and update history. - # Get physics engine for visualization (dashboard-specific access) engine: PhysicsEngine | None = server.physics_engine if engine is None: @@ -182,118 +203,179 @@ def step_simulation() -> None: # Update timestamp st.session_state.last_update = time.time() - # Read measurements via HAL + # Read measurements - use physics engine directly when test is running + # to avoid race conditions with instrument TCP connections try: - chamber_temp = instruments.chamber.get_temperature() - # For DMM, we need to measure the DUT output voltage - output_voltage = instruments.dmm.measure_dc_voltage() - - # Get physics state for detailed visualization - # (In production, you'd only have what instruments can measure) thermal = engine.get_thermal_state() electrical = engine.get_electrical_state() + if st.session_state.test_running: + # During test, use physics engine values directly (no HAL access) + chamber_temp = thermal.chamber_temperature + output_voltage = electrical.output_voltage + else: + # When idle, read via HAL to demonstrate instrument communication + chamber_temp = instruments.chamber.get_temperature() + output_voltage = instruments.dmm.measure_dc_voltage() + history.time.append(thermal.timestamp) history.chamber_temp.append(chamber_temp) history.case_temp.append(thermal.case_temperature) history.junction_temp.append(thermal.junction_temperature) history.output_voltage.append(output_voltage) history.power_dissipation.append(electrical.power_dissipation) - except OSError: + except (OSError, ValueError): # Ignore communication errors during updates pass +def run_test_in_background( + runner: TestRunner, + test: Any, + instruments: InstrumentSet, + config: dict[str, Any], + description: str, + progress: TestProgress, +) -> UUID: + """Run a test in a background thread, updating progress as it runs.""" + temperatures = config.get("temperatures", []) + progress.total_temps = len(temperatures) + progress.phase = "Starting" + progress.message = "Configuring instruments..." + + class ProgressTrackingChamber: + """Wrapper that tracks chamber setpoint changes for progress.""" + + def __init__(self, chamber: Any) -> None: + # Use object.__setattr__ to avoid triggering __getattr__ + object.__setattr__(self, "_chamber", chamber) + object.__setattr__(self, "_progress", progress) + object.__setattr__(self, "_temps", temperatures) + + def set_temperature(self, temp: float) -> None: + self._progress.target_temp = temp + self._progress.phase = "Ramping" + if temp in self._temps: + self._progress.temp_index = self._temps.index(temp) + 1 + self._progress.message = f"Ramping to {temp:.1f}C" + self._chamber.set_temperature(temp) + + def get_temperature(self) -> float: + temp: float = self._chamber.get_temperature() + self._progress.current_temp = temp + return temp + + def is_stable(self) -> bool: + stable: bool = self._chamber.is_stable() + if stable: + self._progress.phase = "Measuring" + if self._progress.current_temp is not None: + self._progress.message = f"Stable at {self._progress.current_temp:.1f}C, measuring..." + else: + self._progress.phase = "Stabilising" + if self._progress.target_temp is not None: + self._progress.message = f"Waiting for stability at {self._progress.target_temp:.1f}C" + return stable + + def __getattr__(self, name: str) -> Any: + # Delegate all other attributes to the wrapped chamber + return getattr(self._chamber, name) + + class WrappedInstruments: + def __init__(self, orig: InstrumentSet) -> None: + self.chamber = ProgressTrackingChamber(orig.chamber) + self.psu = orig.psu + self.dmm = orig.dmm + + wrapped = WrappedInstruments(instruments) + + run_id = runner.run_test( + test=test, + instruments=wrapped, # type: ignore + config=config, + operator="dashboard_user", + description=description, + ) + + progress.phase = "Complete" + progress.message = "Test finished" + + return run_id def display_controls() -> None: """Display simulation control panel in sidebar.""" st.sidebar.header("Simulation Controls") - st.sidebar.info("πŸ”§ Physics engine and charts are running automatically. Use the button below to pause/resume chart updates if needed.") + test_running = st.session_state.get("test_running", False) - # Pause/Resume button - if st.session_state.running: - if st.sidebar.button( - "⏸️ Pause Charts", type="primary", width="stretch" - ): - st.session_state.running = False + # Show test status warning + if test_running: + st.sidebar.warning("Test in progress - instrument controls disabled") else: - if st.sidebar.button( - "▢️ Start Charts", type="primary", width="stretch" - ): - st.session_state.running = True - st.session_state.last_update = time.time() + st.sidebar.info("Physics engine runs automatically. Use controls to adjust.") - # Clear Charts button (just clears history, doesn't restart server) - if st.sidebar.button("Clear Charts", width="stretch"): - st.session_state.history = SimulationHistory() - st.session_state.last_update = time.time() - st.rerun() + # Time multiplier (always available - affects simulation speed) + st.sidebar.subheader("Simulation Speed") + time_multiplier = st.sidebar.select_slider( + "Time Multiplier", + options=[1, 2, 5, 10, 20, 50, 100], + value=st.session_state.get("time_multiplier", 10), + format_func=lambda x: f"{x}x", + key="time_multiplier_slider", + ) + st.sidebar.caption(f"1 real second = {time_multiplier} simulation seconds") + + # Apply time multiplier to server immediately + if "server" in st.session_state: + st.session_state.server.time_scale = time_multiplier + st.session_state.time_multiplier = time_multiplier st.sidebar.divider() - # Parameter Controls - wrapped in form to prevent reruns on every change + # Instrument controls (disabled during test) + st.sidebar.subheader("Instrument Controls") + if test_running: + st.sidebar.caption("Controls disabled while test is running") + with st.sidebar.form("parameter_controls"): - st.subheader("Simulation Parameters") - st.caption("πŸ’‘ Change parameters below, then click Apply to update the simulation") - - # Time multiplier - time_multiplier = st.select_slider( - "Time Multiplier", - options=[1, 2, 5, 10, 20, 50, 100], - value=st.session_state.get("time_multiplier", 10), - format_func=lambda x: f"{x}x", - ) - st.caption(f"1 real second = {time_multiplier} simulation seconds") - - st.divider() - - # Temperature setpoint st.markdown("**Thermal Chamber**") temp_setpoint = st.slider( "Temperature Setpoint (C)", - min_value=-40.0, - max_value=125.0, - value=st.session_state.get("temp_setpoint", 25.0), - step=5.0, + min_value=-40.0, max_value=125.0, + value=st.session_state.get("temp_setpoint", 25.0), step=5.0, + disabled=test_running, ) st.divider() - - # Power supply controls st.markdown("**Power Supply**") input_voltage = st.slider( "Input Voltage (V)", - min_value=0.0, - max_value=12.0, - value=st.session_state.get("input_voltage", 5.0), - step=0.1, + min_value=0.0, max_value=12.0, + value=st.session_state.get("input_voltage", 5.0), step=0.1, + disabled=test_running, ) - output_enabled = st.toggle( "Output Enabled", value=st.session_state.get("output_enabled", False), + disabled=test_running, ) st.divider() - - # Load controls st.markdown("**Electronic Load**") load_current = st.slider( "Load Current (mA)", - min_value=0.0, - max_value=500.0, - value=st.session_state.get("load_current", 100.0), - step=10.0, + min_value=0.0, max_value=500.0, + value=st.session_state.get("load_current", 100.0), step=10.0, + disabled=test_running, ) - # Apply button - submitted = st.form_submit_button("βœ… Apply Changes", type="primary", use_container_width=True) - - if submitted: - # Update session state with new values - st.session_state.time_multiplier = time_multiplier + submitted = st.form_submit_button( + "Apply Changes", + type="primary", + disabled=test_running, + ) + if submitted and not test_running: st.session_state.temp_setpoint = temp_setpoint st.session_state.input_voltage = input_voltage st.session_state.output_enabled = output_enabled @@ -304,52 +386,60 @@ def display_controls() -> None: @st.fragment(run_every=0.1) def simulation_display() -> None: """Fragment that displays and updates simulation state.""" - # Check if server is initialized if "server" not in st.session_state: - st.warning("⏳ Initializing simulation server...") + st.warning("Initializing simulation server...") return server: SimulationServer = st.session_state.server - - # Get current state from physics engine (for visualization) engine: PhysicsEngine | None = server.physics_engine if engine is None: - st.error("❌ Physics engine not available. The server may not have started correctly. Try refreshing the page.") + st.error("Physics engine not available. Try refreshing the page.") return - # Check if server is running if not server.is_running: - st.warning("⚠️ Server is not running. Try refreshing the page.") + st.warning("Server is not running. Try refreshing the page.") return instruments: InstrumentSet = st.session_state.instruments history: SimulationHistory = st.session_state.history - # Apply control settings to instruments - try: - temp_setpoint = st.session_state.get("temp_setpoint", 25.0) - input_voltage = st.session_state.get("input_voltage", 5.0) - output_enabled = st.session_state.get("output_enabled", False) - load_current_ma = st.session_state.get("load_current", 100.0) + # Apply control settings to instruments (only when not running a test) + if not st.session_state.test_running: + try: + temp_setpoint = st.session_state.get("temp_setpoint", 25.0) + input_voltage = st.session_state.get("input_voltage", 5.0) + output_enabled = st.session_state.get("output_enabled", False) + load_current_ma = st.session_state.get("load_current", 100.0) - instruments.chamber.set_temperature(temp_setpoint) - instruments.psu.set_voltage(1, input_voltage) - instruments.psu.enable_output(1, output_enabled) - engine.set_load_current(load_current_ma / 1000.0) - except Exception: - pass # Ignore errors + instruments.chamber.set_temperature(temp_setpoint) + instruments.psu.set_voltage(1, input_voltage) + instruments.psu.enable_output(1, output_enabled) + engine.set_load_current(load_current_ma / 1000.0) + except Exception: + pass - # Step simulation if running - if st.session_state.running: - step_simulation() + # Always update simulation data + step_simulation() thermal = engine.get_thermal_state() electrical = engine.get_electrical_state() - # Current state metrics + # Show test progress banner if test is running + if st.session_state.test_running and st.session_state.test_progress: + progress: TestProgress = st.session_state.test_progress + if progress.target_temp is not None and progress.current_temp is not None: + st.info( + f"**Test Running:** {progress.phase} | " + f"Step {progress.temp_index}/{progress.total_temps} | " + f"Target: {progress.target_temp:.1f}C | " + f"Current: {progress.current_temp:.1f}C | " + f"Elapsed: {progress.elapsed:.0f}s" + ) + else: + st.info(f"**Test Running:** {progress.phase} | {progress.message}") + st.subheader("Current State") col1, col2, col3, col4 = st.columns(4) - with col1: st.metric("Chamber Temp", f"{thermal.chamber_temperature:.2f} C") with col2: @@ -360,7 +450,6 @@ def simulation_display() -> None: st.metric("Output Voltage", f"{electrical.output_voltage:.4f} V") col5, col6, col7, col8 = st.columns(4) - with col5: st.metric("Input Voltage", f"{electrical.input_voltage:.2f} V") with col6: @@ -368,66 +457,77 @@ def simulation_display() -> None: with col7: st.metric("Power Diss.", f"{electrical.power_dissipation * 1000:.2f} mW") with col8: - status = "Running" if st.session_state.running else "Stopped" - st.metric( - "Sim Time", - f"{engine.simulation_time:.1f} s", - delta=f"{status} @ {st.session_state.get('time_multiplier', 10):.0f}x", - ) + status = "Test Running" if st.session_state.test_running else "Running" + st.metric("Sim Time", f"{engine.simulation_time:.1f} s", delta=status) - # Instrument Status Panels (HAL View) - st.subheader("Instrument Status (via HAL)") - st.caption("All readings below use the Hardware Abstraction Layer interfaces") + st.subheader("Instrument Status") + if st.session_state.test_running: + st.caption("Reading from physics engine (test in progress)") + else: + st.caption("All readings below use the Hardware Abstraction Layer interfaces") col1, col2, col3 = st.columns(3) - with col1: st.markdown("#### Thermal Chamber") - try: - chamber_temp = instruments.chamber.get_temperature() - chamber_setpoint = instruments.chamber.get_setpoint() - chamber_stable = instruments.chamber.is_stable() - - st.markdown(f"**Temperature:** {chamber_temp:.2f} Β°C") - st.markdown(f"**Setpoint:** {chamber_setpoint:.2f} Β°C") - st.markdown(f"**Stable:** {'Yes' if chamber_stable else 'No'}") - st.markdown("**Status:** 🟒 Connected") - except OSError as e: - st.markdown("**Status:** πŸ”΄ Disconnected") - st.caption(f"Error: {e}") + if st.session_state.test_running: + # Use physics engine during test to avoid race conditions + st.markdown(f"**Temperature:** {thermal.chamber_temperature:.2f} C") + st.markdown(f"**Setpoint:** {thermal.chamber_temperature:.2f} C") + st.markdown("**Stable:** (test running)") + st.markdown("**Status:** Test in progress") + else: + try: + chamber_temp = instruments.chamber.get_temperature() + chamber_setpoint = instruments.chamber.get_setpoint() + chamber_stable = instruments.chamber.is_stable() + st.markdown(f"**Temperature:** {chamber_temp:.2f} C") + st.markdown(f"**Setpoint:** {chamber_setpoint:.2f} C") + st.markdown(f"**Stable:** {'Yes' if chamber_stable else 'No'}") + st.markdown("**Status:** Connected") + except (OSError, ValueError) as e: + st.markdown("**Status:** Disconnected") + st.caption(f"Error: {e}") with col2: st.markdown("#### Power Supply") - try: - psu_voltage_setpoint = instruments.psu.get_voltage(1) - psu_voltage_measured = instruments.psu.measure_voltage(1) - psu_current = instruments.psu.measure_current(1) - psu_enabled = instruments.psu.is_output_enabled(1) - - st.markdown(f"**Voltage Setpoint:** {psu_voltage_setpoint:.2f} V") - st.markdown(f"**Voltage Measured:** {psu_voltage_measured:.3f} V") - st.markdown(f"**Current:** {psu_current * 1000:.1f} mA") - st.markdown(f"**Output:** {'🟒 Enabled' if psu_enabled else 'πŸ”΄ Disabled'}") - except OSError as e: - st.markdown("**Status:** πŸ”΄ Disconnected") - st.caption(f"Error: {e}") + if st.session_state.test_running: + st.markdown(f"**Voltage:** {electrical.input_voltage:.2f} V") + st.markdown(f"**Current:** {electrical.load_current * 1000:.1f} mA") + st.markdown("**Output:** (test running)") + st.markdown("**Status:** Test in progress") + else: + try: + psu_voltage_setpoint = instruments.psu.get_voltage(1) + psu_voltage_measured = instruments.psu.measure_voltage(1) + psu_current = instruments.psu.measure_current(1) + psu_enabled = instruments.psu.is_output_enabled(1) + st.markdown(f"**Voltage Setpoint:** {psu_voltage_setpoint:.2f} V") + st.markdown(f"**Voltage Measured:** {psu_voltage_measured:.3f} V") + st.markdown(f"**Current:** {psu_current * 1000:.1f} mA") + st.markdown(f"**Output:** {'Enabled' if psu_enabled else 'Disabled'}") + except (OSError, ValueError) as e: + st.markdown("**Status:** Disconnected") + st.caption(f"Error: {e}") with col3: st.markdown("#### Multimeter") - try: - dmm_voltage = instruments.dmm.measure_dc_voltage() - - st.markdown(f"**DC Voltage:** {dmm_voltage:.4f} V") + if st.session_state.test_running: + st.markdown(f"**DC Voltage:** {electrical.output_voltage:.4f} V") st.markdown("**Mode:** DC Voltage") - st.markdown("**Range:** Auto") - st.markdown("**Status:** 🟒 Connected") - except OSError as e: - st.markdown("**Status:** πŸ”΄ Disconnected") - st.caption(f"Error: {e}") + st.markdown("**Status:** Test in progress") + else: + try: + dmm_voltage = instruments.dmm.measure_dc_voltage() + st.markdown(f"**DC Voltage:** {dmm_voltage:.4f} V") + st.markdown("**Mode:** DC Voltage") + st.markdown("**Range:** Auto") + st.markdown("**Status:** Connected") + except (OSError, ValueError) as e: + st.markdown("**Status:** Disconnected") + st.caption(f"Error: {e}") st.divider() - # Temperature chart st.subheader("Temperature History") if len(history.time) < 2: st.info("Start the simulation to see temperature data") @@ -438,25 +538,17 @@ def simulation_display() -> None: "Case": list(history.case_temp), "Junction": list(history.junction_temp), } - st.line_chart( - chart_data, - x="Time (s)", - y=["Chamber", "Case", "Junction"], - color=["#1f77b4", "#ff7f0e", "#d62728"], - ) + st.line_chart(chart_data, x="Time (s)", y=["Chamber", "Case", "Junction"], + color=["#1f77b4", "#ff7f0e", "#d62728"]) - # Self-heating demonstration st.subheader("Self-Heating Demonstration") - delta_t_jc = thermal.junction_temperature - thermal.case_temperature delta_t_ca = thermal.case_temperature - thermal.chamber_temperature col1, col2 = st.columns(2) - with col1: st.markdown("#### Self-Heating Analysis") - st.markdown( - f""" + st.markdown(f""" | Parameter | Value | |-----------|-------| | Junction-Case Rise (dT_jc) | **{delta_t_jc:.2f} C** | @@ -464,10 +556,8 @@ def simulation_display() -> None: | Power Dissipation | {electrical.power_dissipation * 1000:.1f} mW | | theta_jc (junction-case) | 15 C/W | | theta_ca (case-ambient) | 5 C/W | -""" - ) - st.markdown( - """ +""") + st.markdown(""" **Thermal Coupling:** The junction temperature rises above the case temperature due to power dissipation. This is governed by: @@ -475,8 +565,7 @@ temperature due to power dissipation. This is governed by: Try increasing the load current or input voltage to see self-heating effects! -""" - ) +""") with col2: st.markdown("#### Power Dissipation") @@ -487,12 +576,48 @@ self-heating effects! "Time (s)": list(history.time), "Power (mW)": [p * 1000 for p in history.power_dissipation], } - st.line_chart( - power_data, - x="Time (s)", - y="Power (mW)", - color="#2ca02c", - ) + st.line_chart(power_data, x="Time (s)", y="Power (mW)", color="#2ca02c") + + +@st.fragment(run_every=0.5) +def test_progress_display() -> None: + """Fragment that displays live test progress.""" + if not st.session_state.test_running: + return + + future: Future[UUID] | None = st.session_state.test_future + if future is not None and future.done(): + try: + run_id = future.result() + st.session_state.test_run_id = run_id + except Exception as e: + st.error(f"Test failed: {e}") + finally: + st.session_state.test_running = False + st.session_state.test_future = None + st.session_state.test_progress = None + st.rerun() + + progress: TestProgress | None = st.session_state.test_progress + if progress is None: + st.info("Starting test...") + return + + if progress.total_temps > 0: + pct = progress.temp_index / progress.total_temps + st.progress(pct, text=f"Step {progress.temp_index} of {progress.total_temps}") + + col1, col2, col3, col4 = st.columns(4) + with col1: + st.metric("Phase", progress.phase) + with col2: + st.metric("Target Temp", f"{progress.target_temp:.1f} C" if progress.target_temp else "N/A") + with col3: + st.metric("Current Temp", f"{progress.current_temp:.1f} C" if progress.current_temp else "N/A") + with col4: + st.metric("Elapsed", f"{progress.elapsed:.0f} s") + + st.caption(progress.message) def test_execution_page() -> None: @@ -500,83 +625,50 @@ def test_execution_page() -> None: st.header("Test Execution") st.markdown("Run DVT characterisation tests using the virtual lab bench.") - # Test selection st.subheader("Select Test") - test_options = { - "TempCo (Temperature Coefficient)": TempCoTest(), - } + test_options = {"TempCo (Temperature Coefficient)": TempCoTest()} selected_test_name = st.selectbox( "Available Tests", options=list(test_options.keys()), disabled=st.session_state.test_running, ) - selected_test = test_options[selected_test_name] - - # Display test description st.info(f"**{selected_test.name}**: {selected_test.description}") - # Test configuration st.subheader("Test Configuration") - col1, col2 = st.columns(2) with col1: if selected_test.name == "tempco": input_voltage = st.number_input( - "Input Voltage (V)", - min_value=0.0, - max_value=12.0, - value=5.0, - step=0.1, + "Input Voltage (V)", min_value=0.0, max_value=12.0, value=5.0, step=0.1, disabled=st.session_state.test_running, ) load_current = st.number_input( - "Load Current (mA)", - min_value=0.0, - max_value=500.0, - value=100.0, - step=10.0, + "Load Current (mA)", min_value=0.0, max_value=500.0, value=100.0, step=10.0, disabled=st.session_state.test_running, ) settle_time = st.number_input( - "Settle Time (s)", - min_value=0.0, - max_value=60.0, - value=5.0, - step=1.0, + "Settle Time (s)", min_value=0.0, max_value=60.0, value=5.0, step=1.0, disabled=st.session_state.test_running, ) with col2: if selected_test.name == "tempco": temp_min = st.number_input( - "Min Temperature (C)", - min_value=-40.0, - max_value=125.0, - value=-40.0, - step=5.0, + "Min Temperature (C)", min_value=-40.0, max_value=125.0, value=-40.0, step=5.0, disabled=st.session_state.test_running, ) temp_max = st.number_input( - "Max Temperature (C)", - min_value=-40.0, - max_value=125.0, - value=85.0, - step=5.0, + "Max Temperature (C)", min_value=-40.0, max_value=125.0, value=85.0, step=5.0, disabled=st.session_state.test_running, ) temp_step = st.number_input( - "Temperature Step (C)", - min_value=1.0, - max_value=50.0, - value=25.0, - step=5.0, + "Temperature Step (C)", min_value=1.0, max_value=50.0, value=25.0, step=5.0, disabled=st.session_state.test_running, ) - # Generate temperature points if selected_test.name == "tempco": import numpy as np temperatures = list(np.arange(temp_min, temp_max + temp_step / 2, temp_step)) @@ -585,72 +677,57 @@ def test_execution_page() -> None: test_config = { "temperatures": temperatures, "input_voltage": input_voltage, - "load_current": load_current / 1000.0, # Convert mA to A + "load_current": load_current / 1000.0, "settle_time": settle_time, } else: test_config = {} - # Run test button st.subheader("Execution") - col1, col2 = st.columns([1, 3]) with col1: - if st.button( - "Run Test", - type="primary", - width="stretch", - disabled=st.session_state.test_running, - ): + if st.button("Run Test", type="primary", disabled=st.session_state.test_running): + runner: TestRunner = st.session_state.test_runner + instruments: InstrumentSet = st.session_state.instruments + + try: + _ = instruments.chamber.get_temperature() + except Exception as e: + st.error(f"Instruments not available: {e}") + st.stop() + + progress = TestProgress() + st.session_state.test_progress = progress st.session_state.test_running = True st.session_state.test_run_id = None + + future = _test_executor.submit( + run_test_in_background, + runner=runner, + test=selected_test, + instruments=instruments, + config=test_config, + description=f"Dashboard execution: {selected_test_name}", + progress=progress, + ) + st.session_state.test_future = future st.rerun() with col2: if st.session_state.test_running: - st.info("πŸ”„ Test is running...") + st.warning("Test is running in background - check progress below and Lab Bench tab for live updates") elif st.session_state.test_run_id: - st.success("βœ… Test completed!") + st.success("Test completed!") else: st.caption("Click 'Run Test' to start") - # Execute test if running if st.session_state.test_running: - with st.spinner("Running test..."): - try: - runner: TestRunner = st.session_state.test_runner - instruments: InstrumentSet = st.session_state.instruments + st.divider() + st.subheader("Live Progress") + test_progress_display() - # Instruments should already be connected from dashboard startup - # Just verify they're connected - try: - # Quick connectivity check - _ = instruments.chamber.get_temperature() - except Exception as e: - st.error(f"Instruments not available: {e}") - st.session_state.test_running = False - st.stop() - - # Run the test - run_id = runner.run_test( - test=selected_test, - instruments=instruments, - config=test_config, - operator="dashboard_user", - description=f"Dashboard execution: {selected_test_name}", - ) - - st.session_state.test_run_id = run_id - st.session_state.test_running = False - st.rerun() - - except Exception as e: - st.error(f"Test execution failed: {e}") - st.session_state.test_running = False - - # Display results if available - if st.session_state.test_run_id: + if st.session_state.test_run_id and not st.session_state.test_running: st.subheader("Test Results") repository: SQLiteRepository = st.session_state.repository @@ -658,10 +735,7 @@ def test_execution_page() -> None: col1, col2, col3 = st.columns(3) with col1: - status_color = {"PASSED": "🟒", "FAILED": "πŸ”΄", "ERROR": "🟑"}.get( - run.status.value.upper(), "βšͺ" - ) - st.metric("Status", f"{status_color} {run.status.value.upper()}") + st.metric("Status", f"{run.status.value.upper()}") with col2: if run.completed_at and run.started_at: duration = (run.completed_at - run.started_at).total_seconds() @@ -672,13 +746,11 @@ def test_execution_page() -> None: results = repository.get_results(st.session_state.test_run_id) st.metric("Results", len(results)) - # Show error/warning message for non-PASSED tests if run.status.value.upper() == "ERROR": - st.error("⚠️ Test encountered an error during execution. Check the detailed results below or the terminal output for error messages.") + st.error("Test encountered an error during execution.") elif run.status.value.upper() == "FAILED": - st.warning("⚠️ Test completed but one or more results failed to meet specifications.") + st.warning("Test completed but one or more results failed to meet specifications.") - # Show detailed results if results: st.markdown("#### Detailed Results") results_data = [] @@ -689,7 +761,7 @@ def test_execution_page() -> None: "Unit": result.unit or "", "Lower Limit": f"{result.lower_limit:.6f}" if result.lower_limit is not None else "N/A", "Upper Limit": f"{result.upper_limit:.6f}" if result.upper_limit is not None else "N/A", - "Status": "βœ… PASS" if result.passed else "❌ FAIL", + "Status": "PASS" if result.passed else "FAIL", }) st.table(results_data) @@ -700,15 +772,12 @@ def results_viewer_page() -> None: st.markdown("Browse and analyze historical test results.") repository: SQLiteRepository = st.session_state.repository - - # Get all test runs all_runs = repository.get_all_runs() if not all_runs: st.info("No test results available. Run a test in the Test Execution tab to generate results.") return - # Summary statistics st.subheader("Summary") col1, col2, col3, col4 = st.columns(4) @@ -727,29 +796,17 @@ def results_viewer_page() -> None: st.divider() - # Filter controls st.subheader("Filter Results") col1, col2 = st.columns(2) with col1: - # Filter by test name test_names = list({run.test_name for run in all_runs}) - selected_tests = st.multiselect( - "Test Name", - options=["All"] + test_names, - default=["All"], - ) + selected_tests = st.multiselect("Test Name", options=["All"] + test_names, default=["All"]) with col2: - # Filter by status statuses = list({run.status.value for run in all_runs}) - selected_statuses = st.multiselect( - "Status", - options=["All"] + statuses, - default=["All"], - ) + selected_statuses = st.multiselect("Status", options=["All"] + statuses, default=["All"]) - # Apply filters filtered_runs = all_runs if "All" not in selected_tests: filtered_runs = [r for r in filtered_runs if r.test_name in selected_tests] @@ -758,65 +815,50 @@ def results_viewer_page() -> None: st.markdown(f"**Showing {len(filtered_runs)} of {len(all_runs)} runs**") - # Test runs table st.subheader("Test Runs") if not filtered_runs: st.info("No test runs match the selected filters.") return - # Create table data table_data = [] for run in filtered_runs: - status_icon = {"passed": "🟒", "failed": "πŸ”΄", "error": "🟑", "running": "πŸ”΅", "pending": "βšͺ", "skipped": "⚫"}.get( - run.status.value, "βšͺ" - ) - + status_icon = {"passed": "PASS", "failed": "FAIL", "error": "ERR"}.get(run.status.value, "?") duration = "N/A" if run.completed_at and run.started_at: duration = f"{(run.completed_at - run.started_at).total_seconds():.1f}s" - table_data.append({ "Select": False, "ID": run.id[:8], "Test": run.test_name, - "Status": f"{status_icon} {run.status.value.upper()}", + "Status": f"{status_icon}", "Started": run.started_at.strftime("%Y-%m-%d %H:%M:%S"), "Duration": duration, "Operator": run.operator or "N/A", }) - # Display table with selection import pandas as pd df = pd.DataFrame(table_data) - # Use data editor for row selection edited_df = st.data_editor( df, width="stretch", hide_index=True, column_config={ - "Select": st.column_config.CheckboxColumn( - "Select", - help="Select a test run to view details", - default=False, - ) + "Select": st.column_config.CheckboxColumn("Select", help="Select a test run to view details", default=False) }, disabled=["ID", "Test", "Status", "Started", "Duration", "Operator"], ) - # Get selected run selected_rows = edited_df[edited_df["Select"]] if not selected_rows.empty: selected_id_short = selected_rows.iloc[0]["ID"] - # Find full UUID selected_run = next((r for r in filtered_runs if r.id.startswith(selected_id_short)), None) if selected_run: st.divider() st.subheader(f"Test Run Details: {selected_run.test_name}") - # Run metadata col1, col2, col3 = st.columns(3) with col1: st.markdown(f"**Run ID:** `{selected_run.id}`") @@ -834,22 +876,19 @@ def results_viewer_page() -> None: if selected_run.description: st.markdown(f"**Description:** {selected_run.description}") - # Configuration if selected_run.config_json: import json with st.expander("Test Configuration"): config = json.loads(selected_run.config_json) st.json(config) - # Results - from uuid import UUID run_uuid = UUID(selected_run.id) results = repository.get_results(run_uuid) if results: st.markdown("### Test Results") results_table = [] for result in results: - pass_status = "βœ… PASS" if result.passed else "❌ FAIL" if result.passed is False else "N/A" + pass_status = "PASS" if result.passed else "FAIL" if result.passed is False else "N/A" results_table.append({ "Parameter": result.parameter, "Value": f"{result.value:.6f}", @@ -860,32 +899,23 @@ def results_viewer_page() -> None: }) st.table(results_table) - # Measurements try: measurements = repository.get_measurements_dataframe(run_uuid) if measurements is not None and not measurements.empty: st.markdown("### Measurements") - - # Show measurement count st.caption(f"Total measurements: {len(measurements)}") - # Plot measurements by parameter parameters = measurements["parameter"].unique() - for param in parameters: param_data = measurements[measurements["parameter"] == param] if not param_data.empty: st.markdown(f"#### {param}") - - # Create chart import altair as alt chart = alt.Chart(param_data).mark_line().encode( x=alt.X("timestamp:Q", title="Time (s)"), y=alt.Y("value:Q", title=f"Value ({param_data.iloc[0]['unit']})"), tooltip=["timestamp", "value", "unit"] - ).properties( - height=300 - ) + ).properties(height=300) st.altair_chart(chart, width="stretch") except Exception as e: st.caption(f"No time-series measurement data available ({e})") @@ -900,25 +930,19 @@ def main() -> None: ) st.title("py-dvt-ate Virtual Lab Bench") - st.markdown( - """ + st.markdown(""" Interactive demonstration of the Hardware Abstraction Layer (HAL) controlling a simulated lab bench with thermal chamber, power supply, and multimeter, showing coupled thermal-electrical behaviour of an LDO voltage regulator. - """ - ) + """) init_session_state() - - # Sidebar controls (static - doesn't need fragment) display_controls() - # Create tabs for different views - tab1, tab2, tab3 = st.tabs(["πŸ”¬ Lab Bench", "πŸ§ͺ Test Execution", "πŸ“Š Results Viewer"]) + tab1, tab2, tab3 = st.tabs(["Lab Bench", "Test Execution", "Results Viewer"]) with tab1: - # Dynamic simulation display (uses fragment for smooth updates) simulation_display() with tab2: diff --git a/src/py_dvt_ate/simulation/server.py b/src/py_dvt_ate/simulation/server.py index f560ce1..02080c7 100644 --- a/src/py_dvt_ate/simulation/server.py +++ b/src/py_dvt_ate/simulation/server.py @@ -57,17 +57,39 @@ class SimulationServer: self._instrument_server: InstrumentServer | None = None self._physics_task: asyncio.Task[None] | None = None self._running = False + self._paused = False # Pause physics simulation + self._time_scale = 1.0 # Simulation time multiplier @property def is_running(self) -> bool: """Check if server is currently running.""" return self._running + @property + def paused(self) -> bool: + """Check if physics simulation is paused.""" + return self._paused + + @paused.setter + def paused(self, value: bool) -> None: + """Pause or resume the physics simulation.""" + self._paused = value + @property def physics_engine(self) -> PhysicsEngine | None: """Get the physics engine instance.""" return self._physics_engine + @property + def time_scale(self) -> float: + """Get the current time scale multiplier.""" + return self._time_scale + + @time_scale.setter + def time_scale(self, value: float) -> None: + """Set the time scale multiplier (e.g., 10.0 = 10x faster).""" + self._time_scale = max(0.1, min(value, 1000.0)) + def _setup(self) -> None: """Create and wire up all components.""" # Create physics engine @@ -101,8 +123,12 @@ class SimulationServer: dt = self._physics_engine.dt while self._running: - self._physics_engine.step() - # Sleep for the physics timestep + if not self._paused: + # Step physics multiple times based on time scale + steps_per_tick = max(1, int(self._time_scale)) + for _ in range(steps_per_tick): + self._physics_engine.step() + # Sleep for the physics timestep (wall clock time) await asyncio.sleep(dt) async def start(self) -> None: