Update dashboard to use HAL

This commit is contained in:
2025-12-03 16:05:53 +00:00
parent 15f3baaafe
commit 01868361a5

View File

@@ -1,16 +1,22 @@
"""Streamlit dashboard application for physics simulation visualisation. """Streamlit dashboard application for physics simulation visualisation.
This module provides an interactive dashboard for visualising the physics This module provides an interactive dashboard for visualising the physics
engine directly, demonstrating thermal-electrical coupling in real-time. simulation through the Hardware Abstraction Layer (HAL), demonstrating
thermal-electrical coupling in real-time using instrument interfaces.
""" """
import asyncio
import atexit
import threading
import time import time
from collections import deque from collections import deque
from dataclasses import dataclass, field from dataclasses import dataclass, field
import streamlit as st import streamlit as st
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory, InstrumentSet
from py_dvt_ate.simulation.physics.engine import PhysicsEngine from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
# History buffer size for charts # History buffer size for charts
HISTORY_SIZE = 500 HISTORY_SIZE = 500
@@ -36,10 +42,69 @@ class SimulationHistory:
) )
def start_embedded_server() -> tuple[SimulationServer, threading.Thread]:
"""Start an embedded simulation server in a background thread.
Returns:
Tuple of (server instance, thread running the server).
"""
server = SimulationServer(
ServerConfig(
host="127.0.0.1",
chamber_port=5001,
psu_port=5002,
dmm_port=5003,
physics_rate_hz=100.0,
)
)
def run_server() -> None:
"""Run the async server in a new event loop."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(server.start())
# Keep the event loop running
loop.run_forever()
except Exception as e:
st.error(f"Server error: {e}")
finally:
loop.run_until_complete(server.stop())
loop.close()
thread = threading.Thread(target=run_server, daemon=True)
thread.start()
# Wait a moment for server to start
time.sleep(0.5)
return server, thread
def init_session_state() -> None: def init_session_state() -> None:
"""Initialise Streamlit session state.""" """Initialise Streamlit session state."""
if "engine" not in st.session_state: if "server" not in st.session_state:
st.session_state.engine = PhysicsEngine(update_rate_hz=100.0) st.session_state.server, st.session_state.server_thread = start_embedded_server()
# Register cleanup
def cleanup() -> None:
if hasattr(st.session_state, "server") and st.session_state.server is not None:
loop = asyncio.new_event_loop()
loop.run_until_complete(st.session_state.server.stop())
loop.close()
atexit.register(cleanup)
if "instruments" not in st.session_state:
# Create instruments via HAL using factory
config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=5001,
psu_port=5002,
dmm_port=5003,
)
st.session_state.instruments = InstrumentFactory.create(config)
if "history" not in st.session_state: if "history" not in st.session_state:
st.session_state.history = SimulationHistory() st.session_state.history = SimulationHistory()
if "running" not in st.session_state: if "running" not in st.session_state:
@@ -52,44 +117,68 @@ def init_session_state() -> None:
def step_simulation() -> None: def step_simulation() -> None:
"""Advance the simulation based on elapsed real time and multiplier.""" """Advance the simulation based on elapsed real time and multiplier."""
engine: PhysicsEngine = st.session_state.engine server: SimulationServer = st.session_state.server
instruments: InstrumentSet = st.session_state.instruments
history: SimulationHistory = st.session_state.history history: SimulationHistory = st.session_state.history
multiplier: float = st.session_state.get("time_multiplier", 10)
# Calculate how much simulation time to advance # The physics engine runs in the background server thread automatically.
current_time = time.time() # We just need to read current measurements via HAL and update history.
elapsed_real = current_time - st.session_state.last_update
st.session_state.last_update = current_time
# Simulation time to advance (capped to prevent huge jumps) # Get physics engine for visualization (dashboard-specific access)
sim_time_to_advance = min(elapsed_real * multiplier, 2.0) engine: PhysicsEngine | None = server.physics_engine
if engine is None:
return
# Calculate number of steps needed # Update timestamp
steps = int(sim_time_to_advance / engine.dt) st.session_state.last_update = time.time()
steps = max(1, min(steps, 1000)) # Clamp between 1 and 1000 steps
for _ in range(steps): # Read measurements via HAL
engine.step() try:
chamber_temp = instruments.chamber.get_temperature()
# For DMM, we need to measure the DUT output voltage
output_voltage = instruments.dmm.measure_dc_voltage()
# Record current state in history # Get physics state for detailed visualization
thermal = engine.get_thermal_state() # (In production, you'd only have what instruments can measure)
electrical = engine.get_electrical_state() thermal = engine.get_thermal_state()
electrical = engine.get_electrical_state()
history.time.append(thermal.timestamp) history.time.append(thermal.timestamp)
history.chamber_temp.append(thermal.chamber_temperature) history.chamber_temp.append(chamber_temp)
history.case_temp.append(thermal.case_temperature) history.case_temp.append(thermal.case_temperature)
history.junction_temp.append(thermal.junction_temperature) history.junction_temp.append(thermal.junction_temperature)
history.output_voltage.append(electrical.output_voltage) history.output_voltage.append(output_voltage)
history.power_dissipation.append(electrical.power_dissipation) history.power_dissipation.append(electrical.power_dissipation)
except OSError:
# Ignore communication errors during updates
pass
def sync_engine_from_session_state() -> None: def sync_instruments_from_session_state() -> None:
"""Sync engine parameters from session state (called by fragment).""" """Sync instrument settings from session state via HAL (called by fragment)."""
engine: PhysicsEngine = st.session_state.engine instruments: InstrumentSet = st.session_state.instruments
engine.set_chamber_setpoint(st.session_state.get("temp_setpoint", 25.0))
engine.set_input_voltage(st.session_state.get("input_voltage", 5.0)) try:
engine.set_output_enabled(st.session_state.get("output_enabled", False)) # Control thermal chamber via HAL
engine.set_load_current(st.session_state.get("load_current", 100.0) / 1000.0) instruments.chamber.set_temperature(st.session_state.get("temp_setpoint", 25.0))
# Control power supply via HAL
input_voltage = st.session_state.get("input_voltage", 5.0)
output_enabled = st.session_state.get("output_enabled", False)
instruments.psu.set_voltage(1, input_voltage)
instruments.psu.enable_output(1, output_enabled)
# Note: Load current would typically be controlled by an electronic load
# instrument. For simulation, we set it directly on the physics engine.
# In production, you'd have a separate IElectronicLoad interface.
server: SimulationServer = st.session_state.server
engine: PhysicsEngine | None = server.physics_engine
if engine is not None:
load_current_a = st.session_state.get("load_current", 100.0) / 1000.0
engine.set_load_current(load_current_a)
except OSError:
# Ignore communication errors
pass
def display_controls() -> None: def display_controls() -> None:
@@ -111,7 +200,26 @@ def display_controls() -> None:
# Reset button # Reset button
if st.sidebar.button("Reset", use_container_width=True): if st.sidebar.button("Reset", use_container_width=True):
st.session_state.engine = PhysicsEngine(update_rate_hz=100.0) # Stop server and restart
server: SimulationServer = st.session_state.server
loop = asyncio.new_event_loop()
loop.run_until_complete(server.stop())
loop.close()
# Restart server
st.session_state.server, st.session_state.server_thread = start_embedded_server()
# Reconnect instruments
config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=5001,
psu_port=5002,
dmm_port=5003,
)
st.session_state.instruments = InstrumentFactory.create(config)
# Reset history
st.session_state.history = SimulationHistory() st.session_state.history = SimulationHistory()
st.session_state.running = False st.session_state.running = False
st.session_state.last_update = time.time() st.session_state.last_update = time.time()
@@ -180,17 +288,23 @@ def display_controls() -> None:
@st.fragment(run_every=0.1) @st.fragment(run_every=0.1)
def simulation_display() -> None: def simulation_display() -> None:
"""Fragment that displays and updates simulation state.""" """Fragment that displays and updates simulation state."""
engine: PhysicsEngine = st.session_state.engine server: SimulationServer = st.session_state.server
instruments: InstrumentSet = st.session_state.instruments
history: SimulationHistory = st.session_state.history history: SimulationHistory = st.session_state.history
# Sync engine parameters from UI controls # Sync instrument settings from UI controls via HAL
sync_engine_from_session_state() sync_instruments_from_session_state()
# Step simulation if running # Step simulation if running
if st.session_state.running: if st.session_state.running:
step_simulation() step_simulation()
# Get current state # Get current state from physics engine (for visualization)
engine: PhysicsEngine | None = server.physics_engine
if engine is None:
st.error("Physics engine not available")
return
thermal = engine.get_thermal_state() thermal = engine.get_thermal_state()
electrical = engine.get_electrical_state() electrical = engine.get_electrical_state()
@@ -223,6 +337,58 @@ def simulation_display() -> None:
delta=f"{status} @ {st.session_state.get('time_multiplier', 10):.0f}x", delta=f"{status} @ {st.session_state.get('time_multiplier', 10):.0f}x",
) )
# Instrument Status Panels (HAL View)
st.subheader("Instrument Status (via HAL)")
st.caption("All readings below use the Hardware Abstraction Layer interfaces")
col1, col2, col3 = st.columns(3)
with col1:
st.markdown("#### Thermal Chamber")
try:
chamber_temp = instruments.chamber.get_temperature()
chamber_setpoint = instruments.chamber.get_setpoint()
chamber_stable = instruments.chamber.is_stable()
st.markdown(f"**Temperature:** {chamber_temp:.2f} °C")
st.markdown(f"**Setpoint:** {chamber_setpoint:.2f} °C")
st.markdown(f"**Stable:** {'Yes' if chamber_stable else 'No'}")
st.markdown("**Status:** 🟢 Connected")
except OSError as e:
st.markdown("**Status:** 🔴 Disconnected")
st.caption(f"Error: {e}")
with col2:
st.markdown("#### Power Supply")
try:
psu_voltage_setpoint = instruments.psu.get_voltage(1)
psu_voltage_measured = instruments.psu.measure_voltage(1)
psu_current = instruments.psu.measure_current(1)
psu_enabled = instruments.psu.is_output_enabled(1)
st.markdown(f"**Voltage Setpoint:** {psu_voltage_setpoint:.2f} V")
st.markdown(f"**Voltage Measured:** {psu_voltage_measured:.3f} V")
st.markdown(f"**Current:** {psu_current * 1000:.1f} mA")
st.markdown(f"**Output:** {'🟢 Enabled' if psu_enabled else '🔴 Disabled'}")
except OSError as e:
st.markdown("**Status:** 🔴 Disconnected")
st.caption(f"Error: {e}")
with col3:
st.markdown("#### Multimeter")
try:
dmm_voltage = instruments.dmm.measure_dc_voltage()
st.markdown(f"**DC Voltage:** {dmm_voltage:.4f} V")
st.markdown("**Mode:** DC Voltage")
st.markdown("**Range:** Auto")
st.markdown("**Status:** 🟢 Connected")
except OSError as e:
st.markdown("**Status:** 🔴 Disconnected")
st.caption(f"Error: {e}")
st.divider()
# Temperature chart # Temperature chart
st.subheader("Temperature History") st.subheader("Temperature History")
if len(history.time) < 2: if len(history.time) < 2:
@@ -302,8 +468,10 @@ def main() -> None:
st.title("py-dvt-ate Virtual Lab Bench") st.title("py-dvt-ate Virtual Lab Bench")
st.markdown( st.markdown(
""" """
Interactive physics simulation demonstrating coupled thermal-electrical Interactive demonstration of the Hardware Abstraction Layer (HAL)
behaviour of an LDO voltage regulator. controlling a simulated lab bench with thermal chamber, power supply,
and multimeter, showing coupled thermal-electrical behaviour of an
LDO voltage regulator.
""" """
) )