Update dashboard to use HAL

This commit is contained in:
2025-10-14 09:39:36 +00:00
parent 42356efce2
commit d1170b7db7

View File

@@ -1,16 +1,22 @@
"""Streamlit dashboard application for physics simulation visualisation.
This module provides an interactive dashboard for visualising the physics
engine directly, demonstrating thermal-electrical coupling in real-time.
simulation through the Hardware Abstraction Layer (HAL), demonstrating
thermal-electrical coupling in real-time using instrument interfaces.
"""
import asyncio
import atexit
import threading
import time
from collections import deque
from dataclasses import dataclass, field
import streamlit as st
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory, InstrumentSet
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
# History buffer size for charts
HISTORY_SIZE = 500
@@ -36,10 +42,69 @@ class SimulationHistory:
)
def start_embedded_server() -> tuple[SimulationServer, threading.Thread]:
"""Start an embedded simulation server in a background thread.
Returns:
Tuple of (server instance, thread running the server).
"""
server = SimulationServer(
ServerConfig(
host="127.0.0.1",
chamber_port=5001,
psu_port=5002,
dmm_port=5003,
physics_rate_hz=100.0,
)
)
def run_server() -> None:
"""Run the async server in a new event loop."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(server.start())
# Keep the event loop running
loop.run_forever()
except Exception as e:
st.error(f"Server error: {e}")
finally:
loop.run_until_complete(server.stop())
loop.close()
thread = threading.Thread(target=run_server, daemon=True)
thread.start()
# Wait a moment for server to start
time.sleep(0.5)
return server, thread
def init_session_state() -> None:
"""Initialise Streamlit session state."""
if "engine" not in st.session_state:
st.session_state.engine = PhysicsEngine(update_rate_hz=100.0)
if "server" not in st.session_state:
st.session_state.server, st.session_state.server_thread = start_embedded_server()
# Register cleanup
def cleanup() -> None:
if hasattr(st.session_state, "server") and st.session_state.server is not None:
loop = asyncio.new_event_loop()
loop.run_until_complete(st.session_state.server.stop())
loop.close()
atexit.register(cleanup)
if "instruments" not in st.session_state:
# Create instruments via HAL using factory
config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=5001,
psu_port=5002,
dmm_port=5003,
)
st.session_state.instruments = InstrumentFactory.create(config)
if "history" not in st.session_state:
st.session_state.history = SimulationHistory()
if "running" not in st.session_state:
@@ -52,44 +117,68 @@ def init_session_state() -> None:
def step_simulation() -> None:
"""Advance the simulation based on elapsed real time and multiplier."""
engine: PhysicsEngine = st.session_state.engine
server: SimulationServer = st.session_state.server
instruments: InstrumentSet = st.session_state.instruments
history: SimulationHistory = st.session_state.history
multiplier: float = st.session_state.get("time_multiplier", 10)
# Calculate how much simulation time to advance
current_time = time.time()
elapsed_real = current_time - st.session_state.last_update
st.session_state.last_update = current_time
# The physics engine runs in the background server thread automatically.
# We just need to read current measurements via HAL and update history.
# Simulation time to advance (capped to prevent huge jumps)
sim_time_to_advance = min(elapsed_real * multiplier, 2.0)
# Get physics engine for visualization (dashboard-specific access)
engine: PhysicsEngine | None = server.physics_engine
if engine is None:
return
# Calculate number of steps needed
steps = int(sim_time_to_advance / engine.dt)
steps = max(1, min(steps, 1000)) # Clamp between 1 and 1000 steps
# Update timestamp
st.session_state.last_update = time.time()
for _ in range(steps):
engine.step()
# Read measurements via HAL
try:
chamber_temp = instruments.chamber.get_temperature()
# For DMM, we need to measure the DUT output voltage
output_voltage = instruments.dmm.measure_dc_voltage()
# Record current state in history
# Get physics state for detailed visualization
# (In production, you'd only have what instruments can measure)
thermal = engine.get_thermal_state()
electrical = engine.get_electrical_state()
history.time.append(thermal.timestamp)
history.chamber_temp.append(thermal.chamber_temperature)
history.chamber_temp.append(chamber_temp)
history.case_temp.append(thermal.case_temperature)
history.junction_temp.append(thermal.junction_temperature)
history.output_voltage.append(electrical.output_voltage)
history.output_voltage.append(output_voltage)
history.power_dissipation.append(electrical.power_dissipation)
except OSError:
# Ignore communication errors during updates
pass
def sync_engine_from_session_state() -> None:
"""Sync engine parameters from session state (called by fragment)."""
engine: PhysicsEngine = st.session_state.engine
engine.set_chamber_setpoint(st.session_state.get("temp_setpoint", 25.0))
engine.set_input_voltage(st.session_state.get("input_voltage", 5.0))
engine.set_output_enabled(st.session_state.get("output_enabled", False))
engine.set_load_current(st.session_state.get("load_current", 100.0) / 1000.0)
def sync_instruments_from_session_state() -> None:
"""Sync instrument settings from session state via HAL (called by fragment)."""
instruments: InstrumentSet = st.session_state.instruments
try:
# Control thermal chamber via HAL
instruments.chamber.set_temperature(st.session_state.get("temp_setpoint", 25.0))
# Control power supply via HAL
input_voltage = st.session_state.get("input_voltage", 5.0)
output_enabled = st.session_state.get("output_enabled", False)
instruments.psu.set_voltage(1, input_voltage)
instruments.psu.enable_output(1, output_enabled)
# Note: Load current would typically be controlled by an electronic load
# instrument. For simulation, we set it directly on the physics engine.
# In production, you'd have a separate IElectronicLoad interface.
server: SimulationServer = st.session_state.server
engine: PhysicsEngine | None = server.physics_engine
if engine is not None:
load_current_a = st.session_state.get("load_current", 100.0) / 1000.0
engine.set_load_current(load_current_a)
except OSError:
# Ignore communication errors
pass
def display_controls() -> None:
@@ -111,7 +200,26 @@ def display_controls() -> None:
# Reset button
if st.sidebar.button("Reset", use_container_width=True):
st.session_state.engine = PhysicsEngine(update_rate_hz=100.0)
# Stop server and restart
server: SimulationServer = st.session_state.server
loop = asyncio.new_event_loop()
loop.run_until_complete(server.stop())
loop.close()
# Restart server
st.session_state.server, st.session_state.server_thread = start_embedded_server()
# Reconnect instruments
config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=5001,
psu_port=5002,
dmm_port=5003,
)
st.session_state.instruments = InstrumentFactory.create(config)
# Reset history
st.session_state.history = SimulationHistory()
st.session_state.running = False
st.session_state.last_update = time.time()
@@ -180,17 +288,23 @@ def display_controls() -> None:
@st.fragment(run_every=0.1)
def simulation_display() -> None:
"""Fragment that displays and updates simulation state."""
engine: PhysicsEngine = st.session_state.engine
server: SimulationServer = st.session_state.server
instruments: InstrumentSet = st.session_state.instruments
history: SimulationHistory = st.session_state.history
# Sync engine parameters from UI controls
sync_engine_from_session_state()
# Sync instrument settings from UI controls via HAL
sync_instruments_from_session_state()
# Step simulation if running
if st.session_state.running:
step_simulation()
# Get current state
# Get current state from physics engine (for visualization)
engine: PhysicsEngine | None = server.physics_engine
if engine is None:
st.error("Physics engine not available")
return
thermal = engine.get_thermal_state()
electrical = engine.get_electrical_state()
@@ -223,6 +337,58 @@ def simulation_display() -> None:
delta=f"{status} @ {st.session_state.get('time_multiplier', 10):.0f}x",
)
# Instrument Status Panels (HAL View)
st.subheader("Instrument Status (via HAL)")
st.caption("All readings below use the Hardware Abstraction Layer interfaces")
col1, col2, col3 = st.columns(3)
with col1:
st.markdown("#### Thermal Chamber")
try:
chamber_temp = instruments.chamber.get_temperature()
chamber_setpoint = instruments.chamber.get_setpoint()
chamber_stable = instruments.chamber.is_stable()
st.markdown(f"**Temperature:** {chamber_temp:.2f} °C")
st.markdown(f"**Setpoint:** {chamber_setpoint:.2f} °C")
st.markdown(f"**Stable:** {'Yes' if chamber_stable else 'No'}")
st.markdown("**Status:** 🟢 Connected")
except OSError as e:
st.markdown("**Status:** 🔴 Disconnected")
st.caption(f"Error: {e}")
with col2:
st.markdown("#### Power Supply")
try:
psu_voltage_setpoint = instruments.psu.get_voltage(1)
psu_voltage_measured = instruments.psu.measure_voltage(1)
psu_current = instruments.psu.measure_current(1)
psu_enabled = instruments.psu.is_output_enabled(1)
st.markdown(f"**Voltage Setpoint:** {psu_voltage_setpoint:.2f} V")
st.markdown(f"**Voltage Measured:** {psu_voltage_measured:.3f} V")
st.markdown(f"**Current:** {psu_current * 1000:.1f} mA")
st.markdown(f"**Output:** {'🟢 Enabled' if psu_enabled else '🔴 Disabled'}")
except OSError as e:
st.markdown("**Status:** 🔴 Disconnected")
st.caption(f"Error: {e}")
with col3:
st.markdown("#### Multimeter")
try:
dmm_voltage = instruments.dmm.measure_dc_voltage()
st.markdown(f"**DC Voltage:** {dmm_voltage:.4f} V")
st.markdown("**Mode:** DC Voltage")
st.markdown("**Range:** Auto")
st.markdown("**Status:** 🟢 Connected")
except OSError as e:
st.markdown("**Status:** 🔴 Disconnected")
st.caption(f"Error: {e}")
st.divider()
# Temperature chart
st.subheader("Temperature History")
if len(history.time) < 2:
@@ -302,8 +468,10 @@ def main() -> None:
st.title("py-dvt-ate Virtual Lab Bench")
st.markdown(
"""
Interactive physics simulation demonstrating coupled thermal-electrical
behaviour of an LDO voltage regulator.
Interactive demonstration of the Hardware Abstraction Layer (HAL)
controlling a simulated lab bench with thermal chamber, power supply,
and multimeter, showing coupled thermal-electrical behaviour of an
LDO voltage regulator.
"""
)