Files
py-dvt-ate/src/py_dvt_ate/simulation/server.py
Kai Chappell e896cbab64
All checks were successful
CI / Lint (push) Successful in 4s
CI / Type Check (push) Successful in 18s
CI / Test (push) Successful in 15m49s
CI / Release (push) Has been skipped
Remove confusing pause/clear chart buttons from dashboard
The physics engine runs continuously, so pausing charts was misleading -
users might think the simulation stopped when it didn't. Charts now
always update automatically.

Also fix Streamlit deprecation warnings by replacing use_container_width
with width parameter (will be removed after 2025-12-31).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 21:21:20 +00:00

267 lines
8.1 KiB
Python

"""Simulation server entry point.
This module provides the main entry point for running the simulation server
with all virtual instruments wired to a shared physics engine.
"""
from __future__ import annotations
import asyncio
import logging
import signal
from dataclasses import dataclass
from py_dvt_ate.instruments.transport import InstrumentServer
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
@dataclass
class ServerConfig:
    """Configuration for the simulation server.

    Attributes:
        host: Host address to bind to.
        chamber_port: Port for thermal chamber instrument.
        psu_port: Port for power supply instrument.
        dmm_port: Port for multimeter instrument.
        physics_rate_hz: Physics engine update rate in Hz.

    Raises:
        ValueError: If a port lies outside 0-65535, or if the physics rate
            is not a positive, finite number.
    """

    host: str = "127.0.0.1"
    chamber_port: int = 5000
    psu_port: int = 5001
    dmm_port: int = 5002
    physics_rate_hz: float = 100.0

    def __post_init__(self) -> None:
        """Reject values that cannot describe a usable server."""
        for label in ("chamber_port", "psu_port", "dmm_port"):
            port = getattr(self, label)
            if not 0 <= port <= 65535:
                raise ValueError(f"{label} must be in 0-65535, got {port!r}")
        # The chained comparison is False for NaN, so NaN is rejected along
        # with zero, negative and infinite rates.
        if not 0 < self.physics_rate_hz < float("inf"):
            raise ValueError(
                "physics_rate_hz must be a positive, finite number, "
                f"got {self.physics_rate_hz!r}"
            )
class SimulationServer:
    """Complete simulation server with physics engine and instruments.

    Creates a physics engine and wires it to all virtual instruments,
    then exposes them over TCP for client access.

    The physics loop ticks once per engine timestep of wall-clock time.
    ``time_scale`` decides how many engine steps each tick performs and
    ``paused`` suspends stepping without stopping the server.
    """

    def __init__(self, config: ServerConfig | None = None) -> None:
        """Initialise the simulation server.

        Args:
            config: Server configuration. Uses defaults if not provided.
        """
        self._config = config or ServerConfig()
        self._physics_engine: PhysicsEngine | None = None
        self._instrument_server: InstrumentServer | None = None
        self._physics_task: asyncio.Task[None] | None = None
        self._running = False
        self._paused = False  # Pause physics simulation
        self._time_scale = 1.0  # Simulation time multiplier
        # Fractional steps carried from one tick to the next so that
        # non-integer time scales (including values below 1) are honoured.
        self._step_backlog = 0.0

    @property
    def is_running(self) -> bool:
        """Check if server is currently running."""
        return self._running

    @property
    def paused(self) -> bool:
        """Check if physics simulation is paused."""
        return self._paused

    @paused.setter
    def paused(self, value: bool) -> None:
        """Pause or resume the physics simulation."""
        self._paused = value

    @property
    def physics_engine(self) -> PhysicsEngine | None:
        """Get the physics engine instance (``None`` while not set up)."""
        return self._physics_engine

    @property
    def time_scale(self) -> float:
        """Get the current time scale multiplier."""
        return self._time_scale

    @time_scale.setter
    def time_scale(self, value: float) -> None:
        """Set the time scale multiplier (e.g., 10.0 = 10x faster).

        The value is clamped to the range 0.1 to 1000.0.
        """
        self._time_scale = max(0.1, min(value, 1000.0))

    def _setup(self) -> None:
        """Create and wire up all components."""
        # Create physics engine
        self._physics_engine = PhysicsEngine(
            update_rate_hz=self._config.physics_rate_hz
        )

        # Create instruments connected to physics engine
        chamber = ThermalChamberSim(self._physics_engine)
        psu = PowerSupplySim(self._physics_engine)
        dmm = MultimeterSim(self._physics_engine)

        # Create TCP server and register instruments
        self._instrument_server = InstrumentServer(host=self._config.host)
        self._instrument_server.register_instrument(self._config.chamber_port, chamber)
        self._instrument_server.register_instrument(self._config.psu_port, psu)
        self._instrument_server.register_instrument(self._config.dmm_port, dmm)

        logger.info(
            "Simulation server configured: chamber=%d, psu=%d, dmm=%d",
            self._config.chamber_port,
            self._config.psu_port,
            self._config.dmm_port,
        )

    def _steps_due(self) -> int:
        """Return how many physics steps the current tick should execute.

        The time scale is added to a running backlog and only the whole part
        is consumed, so a scale of 0.5 steps on every other tick and 2.5
        alternates between two and three steps. Integer scales yield exactly
        that many steps on every tick.
        """
        self._step_backlog += self._time_scale
        whole_steps = int(self._step_backlog)
        self._step_backlog -= whole_steps
        return whole_steps

    async def _run_physics(self) -> None:
        """Run the physics engine simulation loop.

        Returns immediately if no physics engine has been created. Otherwise
        loops while the server is marked running, stepping the engine unless
        paused and then sleeping for one engine timestep.
        """
        engine = self._physics_engine
        if engine is None:
            return

        dt = engine.dt
        try:
            while self._running:
                if not self._paused:
                    # Step physics as many times as the time scale demands
                    for _ in range(self._steps_due()):
                        engine.step()
                # Sleep for the physics timestep (wall clock time)
                await asyncio.sleep(dt)
        except Exception:
            # Cancellation is a BaseException and passes straight through;
            # anything caught here is a real failure that would otherwise go
            # unnoticed until somebody awaits the task.
            logger.exception("Physics loop terminated unexpectedly")
            raise

    async def start(self) -> None:
        """Start the simulation server.

        Sets up all components and starts the TCP server and physics engine.
        If the TCP server fails to start, the partially built state is torn
        down again so that ``start()`` can be retried.

        Raises:
            RuntimeError: If server is already running.
        """
        if self._running:
            raise RuntimeError("Server is already running")

        self._setup()
        self._running = True

        # Start TCP server
        try:
            if self._instrument_server is not None:
                await self._instrument_server.start()
        except BaseException:
            # Roll back so the instance is not stuck in a "running" state
            # with no physics task behind it.
            self._running = False
            failed_server = self._instrument_server
            self._instrument_server = None
            self._physics_engine = None
            if failed_server is not None:
                try:
                    await failed_server.stop()
                except Exception:
                    logger.exception(
                        "Error while cleaning up after failed server start"
                    )
            raise

        # Start physics engine loop
        self._physics_task = asyncio.create_task(self._run_physics())
        logger.info("Simulation server started")

    async def stop(self) -> None:
        """Stop the simulation server.

        Does nothing if the server is not running. Otherwise cancels the
        physics loop, stops the TCP server and drops the component
        references.
        """
        if not self._running:
            return

        self._running = False

        # Cancel physics loop
        task, self._physics_task = self._physics_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception:
                # The loop had already died with its own error; that must
                # not prevent the TCP server from being shut down below.
                logger.warning(
                    "Physics loop had failed before shutdown", exc_info=True
                )

        # Stop TCP server
        try:
            if self._instrument_server is not None:
                await self._instrument_server.stop()
        finally:
            self._instrument_server = None
            self._physics_engine = None

        logger.info("Simulation server stopped")

    async def serve_forever(self) -> None:
        """Start the server and run until cancelled."""
        await self.start()
        try:
            # Wait for the physics task (which runs until cancelled)
            if self._physics_task is not None:
                await self._physics_task
        except asyncio.CancelledError:
            pass
        finally:
            await self.stop()
async def run_server(config: ServerConfig | None = None) -> None:
    """Run the simulation server with signal handling.

    This is the main entry point for running the server. It sets up
    signal handlers for graceful shutdown, waits until one of them fires,
    and always stops the server and removes the handlers on the way out.

    Args:
        config: Server configuration. Uses defaults if not provided.
    """
    server = SimulationServer(config)

    # Set up signal handlers for graceful shutdown
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()

    def signal_handler() -> None:
        logger.info("Shutdown signal received")
        stop_event.set()

    # Register signal handlers (Unix-style, may not work on all Windows).
    # Track what was actually installed so it can be removed afterwards.
    installed: list[signal.Signals] = []
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, signal_handler)
        except (NotImplementedError, RuntimeError):
            # NotImplementedError: the loop has no signal support (Windows).
            # RuntimeError: asyncio could not set the handler up, which
            # presumably includes running outside the main thread.
            break
        installed.append(sig)

    try:
        await server.start()
        logger.info("Simulation server running. Press Ctrl+C to stop.")

        # Wait for stop signal
        await stop_event.wait()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    finally:
        try:
            await server.stop()
        finally:
            # Do not leave handlers behind on a loop that outlives this call
            for sig in installed:
                loop.remove_signal_handler(sig)
def main(
    host: str = "127.0.0.1",
    chamber_port: int = 5000,
    psu_port: int = 5001,
    dmm_port: int = 5002,
    physics_rate: float = 100.0,
) -> None:
    """Run the simulation server from command line.

    Configures logging at INFO level, builds the server configuration from
    the arguments and runs the server until it is shut down.

    Args:
        host: Host address to bind to.
        chamber_port: Port for thermal chamber.
        psu_port: Port for power supply.
        dmm_port: Port for multimeter.
        physics_rate: Physics engine update rate in Hz.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    config = ServerConfig(
        host=host,
        chamber_port=chamber_port,
        psu_port=psu_port,
        dmm_port=dmm_port,
        physics_rate_hz=physics_rate,
    )

    try:
        asyncio.run(run_server(config))
    except KeyboardInterrupt:
        # When no loop-level signal handler could be installed, Ctrl+C
        # surfaces here; exit quietly rather than with a traceback.
        logger.info("Interrupted by user")
# Run the server with default settings when executed as a script.
if __name__ == "__main__":
    main()