From d1170b7db72f165fe466deccc650512fa1d4cfe8 Mon Sep 17 00:00:00 2001 From: Kai Chappell Date: Tue, 14 Oct 2025 09:39:36 +0000 Subject: [PATCH] Update dashboard to use HAL --- src/py_dvt_ate/app/dashboard/app.py | 246 +++++++++++++++++++++++----- 1 file changed, 207 insertions(+), 39 deletions(-) diff --git a/src/py_dvt_ate/app/dashboard/app.py b/src/py_dvt_ate/app/dashboard/app.py index 49fb5a1..3b36e1c 100644 --- a/src/py_dvt_ate/app/dashboard/app.py +++ b/src/py_dvt_ate/app/dashboard/app.py @@ -1,16 +1,22 @@ """Streamlit dashboard application for physics simulation visualisation. This module provides an interactive dashboard for visualising the physics -engine directly, demonstrating thermal-electrical coupling in real-time. +simulation through the Hardware Abstraction Layer (HAL), demonstrating +thermal-electrical coupling in real-time using instrument interfaces. """ +import asyncio +import atexit +import threading import time from collections import deque from dataclasses import dataclass, field import streamlit as st +from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory, InstrumentSet from py_dvt_ate.simulation.physics.engine import PhysicsEngine +from py_dvt_ate.simulation.server import ServerConfig, SimulationServer # History buffer size for charts HISTORY_SIZE = 500 @@ -36,10 +42,69 @@ class SimulationHistory: ) +def start_embedded_server() -> tuple[SimulationServer, threading.Thread]: + """Start an embedded simulation server in a background thread. + + Returns: + Tuple of (server instance, thread running the server). + """ + server = SimulationServer( + ServerConfig( + host="127.0.0.1", + chamber_port=5001, + psu_port=5002, + dmm_port=5003, + physics_rate_hz=100.0, + ) + ) + + def run_server() -> None: + """Run the async server in a new event loop.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(server.start()) + # Keep the event loop running + loop.run_forever() + except Exception as e: + st.error(f"Server error: {e}") + finally: + loop.run_until_complete(server.stop()) + loop.close() + + thread = threading.Thread(target=run_server, daemon=True) + thread.start() + + # Wait a moment for server to start + time.sleep(0.5) + + return server, thread + + def init_session_state() -> None: """Initialise Streamlit session state.""" - if "engine" not in st.session_state: - st.session_state.engine = PhysicsEngine(update_rate_hz=100.0) + if "server" not in st.session_state: + st.session_state.server, st.session_state.server_thread = start_embedded_server() + + # Register cleanup + def cleanup() -> None: + if hasattr(st.session_state, "server") and st.session_state.server is not None: + loop = asyncio.new_event_loop() + loop.run_until_complete(st.session_state.server.stop()) + loop.close() + atexit.register(cleanup) + + if "instruments" not in st.session_state: + # Create instruments via HAL using factory + config = InstrumentConfig( + backend="simulator", + simulator_host="127.0.0.1", + chamber_port=5001, + psu_port=5002, + dmm_port=5003, + ) + st.session_state.instruments = InstrumentFactory.create(config) + if "history" not in st.session_state: st.session_state.history = SimulationHistory() if "running" not in st.session_state: @@ -52,44 +117,68 @@ def init_session_state() -> None: def step_simulation() -> None: """Advance the simulation based on elapsed real time and multiplier.""" - engine: PhysicsEngine = st.session_state.engine + server: SimulationServer = st.session_state.server + instruments: InstrumentSet = st.session_state.instruments history: SimulationHistory = st.session_state.history - multiplier: float = st.session_state.get("time_multiplier", 10) - # Calculate how much simulation time to advance - current_time = time.time() - elapsed_real = current_time - st.session_state.last_update - st.session_state.last_update = current_time + # The physics engine runs in the background server thread automatically. + # We just need to read current measurements via HAL and update history. - # Simulation time to advance (capped to prevent huge jumps) - sim_time_to_advance = min(elapsed_real * multiplier, 2.0) + # Get physics engine for visualization (dashboard-specific access) + engine: PhysicsEngine | None = server.physics_engine + if engine is None: + return - # Calculate number of steps needed - steps = int(sim_time_to_advance / engine.dt) - steps = max(1, min(steps, 1000)) # Clamp between 1 and 1000 steps + # Update timestamp + st.session_state.last_update = time.time() - for _ in range(steps): - engine.step() + # Read measurements via HAL + try: + chamber_temp = instruments.chamber.get_temperature() + # For DMM, we need to measure the DUT output voltage + output_voltage = instruments.dmm.measure_dc_voltage() - # Record current state in history - thermal = engine.get_thermal_state() - electrical = engine.get_electrical_state() + # Get physics state for detailed visualization + # (In production, you'd only have what instruments can measure) + thermal = engine.get_thermal_state() + electrical = engine.get_electrical_state() - history.time.append(thermal.timestamp) - history.chamber_temp.append(thermal.chamber_temperature) - history.case_temp.append(thermal.case_temperature) - history.junction_temp.append(thermal.junction_temperature) - history.output_voltage.append(electrical.output_voltage) - history.power_dissipation.append(electrical.power_dissipation) + history.time.append(thermal.timestamp) + history.chamber_temp.append(chamber_temp) + history.case_temp.append(thermal.case_temperature) + history.junction_temp.append(thermal.junction_temperature) + history.output_voltage.append(output_voltage) + history.power_dissipation.append(electrical.power_dissipation) + except OSError: + # Ignore communication errors during updates + pass -def sync_engine_from_session_state() -> None: - """Sync engine parameters from session state (called by fragment).""" - engine: PhysicsEngine = st.session_state.engine - engine.set_chamber_setpoint(st.session_state.get("temp_setpoint", 25.0)) - engine.set_input_voltage(st.session_state.get("input_voltage", 5.0)) - engine.set_output_enabled(st.session_state.get("output_enabled", False)) - engine.set_load_current(st.session_state.get("load_current", 100.0) / 1000.0) +def sync_instruments_from_session_state() -> None: + """Sync instrument settings from session state via HAL (called by fragment).""" + instruments: InstrumentSet = st.session_state.instruments + + try: + # Control thermal chamber via HAL + instruments.chamber.set_temperature(st.session_state.get("temp_setpoint", 25.0)) + + # Control power supply via HAL + input_voltage = st.session_state.get("input_voltage", 5.0) + output_enabled = st.session_state.get("output_enabled", False) + instruments.psu.set_voltage(1, input_voltage) + instruments.psu.enable_output(1, output_enabled) + + # Note: Load current would typically be controlled by an electronic load + # instrument. For simulation, we set it directly on the physics engine. + # In production, you'd have a separate IElectronicLoad interface. + server: SimulationServer = st.session_state.server + engine: PhysicsEngine | None = server.physics_engine + if engine is not None: + load_current_a = st.session_state.get("load_current", 100.0) / 1000.0 + engine.set_load_current(load_current_a) + except OSError: + # Ignore communication errors + pass def display_controls() -> None: @@ -111,7 +200,26 @@ def display_controls() -> None: # Reset button if st.sidebar.button("Reset", use_container_width=True): - st.session_state.engine = PhysicsEngine(update_rate_hz=100.0) + # Stop server and restart + server: SimulationServer = st.session_state.server + loop = asyncio.new_event_loop() + loop.run_until_complete(server.stop()) + loop.close() + + # Restart server + st.session_state.server, st.session_state.server_thread = start_embedded_server() + + # Reconnect instruments + config = InstrumentConfig( + backend="simulator", + simulator_host="127.0.0.1", + chamber_port=5001, + psu_port=5002, + dmm_port=5003, + ) + st.session_state.instruments = InstrumentFactory.create(config) + + # Reset history st.session_state.history = SimulationHistory() st.session_state.running = False st.session_state.last_update = time.time() @@ -180,17 +288,23 @@ def display_controls() -> None: @st.fragment(run_every=0.1) def simulation_display() -> None: """Fragment that displays and updates simulation state.""" - engine: PhysicsEngine = st.session_state.engine + server: SimulationServer = st.session_state.server + instruments: InstrumentSet = st.session_state.instruments history: SimulationHistory = st.session_state.history - # Sync engine parameters from UI controls - sync_engine_from_session_state() + # Sync instrument settings from UI controls via HAL + sync_instruments_from_session_state() # Step simulation if running if st.session_state.running: step_simulation() - # Get current state + # Get current state from physics engine (for visualization) + engine: PhysicsEngine | None = server.physics_engine + if engine is None: + st.error("Physics engine not available") + return + thermal = engine.get_thermal_state() electrical = engine.get_electrical_state() @@ -223,6 +337,58 @@ def simulation_display() -> None: delta=f"{status} @ {st.session_state.get('time_multiplier', 10):.0f}x", ) + # Instrument Status Panels (HAL View) + st.subheader("Instrument Status (via HAL)") + st.caption("All readings below use the Hardware Abstraction Layer interfaces") + + col1, col2, col3 = st.columns(3) + + with col1: + st.markdown("#### Thermal Chamber") + try: + chamber_temp = instruments.chamber.get_temperature() + chamber_setpoint = instruments.chamber.get_setpoint() + chamber_stable = instruments.chamber.is_stable() + + st.markdown(f"**Temperature:** {chamber_temp:.2f} °C") + st.markdown(f"**Setpoint:** {chamber_setpoint:.2f} °C") + st.markdown(f"**Stable:** {'Yes' if chamber_stable else 'No'}") + st.markdown("**Status:** 🟢 Connected") + except OSError as e: + st.markdown("**Status:** 🔴 Disconnected") + st.caption(f"Error: {e}") + + with col2: + st.markdown("#### Power Supply") + try: + psu_voltage_setpoint = instruments.psu.get_voltage(1) + psu_voltage_measured = instruments.psu.measure_voltage(1) + psu_current = instruments.psu.measure_current(1) + psu_enabled = instruments.psu.is_output_enabled(1) + + st.markdown(f"**Voltage Setpoint:** {psu_voltage_setpoint:.2f} V") + st.markdown(f"**Voltage Measured:** {psu_voltage_measured:.3f} V") + st.markdown(f"**Current:** {psu_current * 1000:.1f} mA") + st.markdown(f"**Output:** {'🟢 Enabled' if psu_enabled else '🔴 Disabled'}") + except OSError as e: + st.markdown("**Status:** 🔴 Disconnected") + st.caption(f"Error: {e}") + + with col3: + st.markdown("#### Multimeter") + try: + dmm_voltage = instruments.dmm.measure_dc_voltage() + + st.markdown(f"**DC Voltage:** {dmm_voltage:.4f} V") + st.markdown("**Mode:** DC Voltage") + st.markdown("**Range:** Auto") + st.markdown("**Status:** 🟢 Connected") + except OSError as e: + st.markdown("**Status:** 🔴 Disconnected") + st.caption(f"Error: {e}") + + st.divider() + # Temperature chart st.subheader("Temperature History") if len(history.time) < 2: @@ -302,8 +468,10 @@ def main() -> None: st.title("py-dvt-ate Virtual Lab Bench") st.markdown( """ - Interactive physics simulation demonstrating coupled thermal-electrical - behaviour of an LDO voltage regulator. + Interactive demonstration of the Hardware Abstraction Layer (HAL) + controlling a simulated lab bench with thermal chamber, power supply, + and multimeter, showing coupled thermal-electrical behaviour of an + LDO voltage regulator. """ )