Add simulation server entry point

Create SimulationServer that wires physics engine to all virtual
instruments and exposes them over TCP. Add 'serve' CLI command to
start the server with configurable ports and physics rate.
This commit is contained in:
2025-05-30 19:31:01 +00:00
parent eb2df949cf
commit fe09ff46bf
3 changed files with 289 additions and 1 deletions

View File

@@ -36,5 +36,52 @@ def main(
"""py-dvt-ate: Coupled Physics DVT Simulation Platform."""
@app.command()
def serve(
host: Annotated[
str,
typer.Option("--host", "-h", help="Host address to bind to."),
] = "127.0.0.1",
chamber_port: Annotated[
int,
typer.Option("--chamber-port", help="Port for thermal chamber instrument."),
] = 5000,
psu_port: Annotated[
int,
typer.Option("--psu-port", help="Port for power supply instrument."),
] = 5001,
dmm_port: Annotated[
int,
typer.Option("--dmm-port", help="Port for multimeter instrument."),
] = 5002,
physics_rate: Annotated[
float,
typer.Option("--physics-rate", help="Physics engine update rate in Hz."),
] = 100.0,
) -> None:
"""Start the simulation server with virtual instruments.
Runs a TCP server hosting virtual SCPI instruments connected to a
shared physics engine. Each instrument listens on its own port.
"""
from py_dvt_ate.simulation.server import main as run_server
typer.echo(f"Starting simulation server on {host}...")
typer.echo(f" Thermal chamber: port {chamber_port}")
typer.echo(f" Power supply: port {psu_port}")
typer.echo(f" Multimeter: port {dmm_port}")
typer.echo(f" Physics rate: {physics_rate} Hz")
typer.echo("")
typer.echo("Press Ctrl+C to stop.")
run_server(
host=host,
chamber_port=chamber_port,
psu_port=psu_port,
dmm_port=dmm_port,
physics_rate=physics_rate,
)
if __name__ == "__main__":
app()

View File

@@ -4,6 +4,7 @@ Provides virtual instruments backed by a coupled thermal-electrical
physics engine. Used for development and testing without real hardware.
"""
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
from py_dvt_ate.simulation.tcp_server import InstrumentServer
__all__ = ["InstrumentServer"]
__all__ = ["InstrumentServer", "ServerConfig", "SimulationServer"]

View File

@@ -0,0 +1,240 @@
"""Simulation server entry point.
This module provides the main entry point for running the simulation server
with all virtual instruments wired to a shared physics engine.
"""
from __future__ import annotations
import asyncio
import logging
import signal
from dataclasses import dataclass
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.tcp_server import InstrumentServer
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
logger = logging.getLogger(__name__)
@dataclass
class ServerConfig:
"""Configuration for the simulation server.
Attributes:
host: Host address to bind to.
chamber_port: Port for thermal chamber instrument.
psu_port: Port for power supply instrument.
dmm_port: Port for multimeter instrument.
physics_rate_hz: Physics engine update rate in Hz.
"""
host: str = "127.0.0.1"
chamber_port: int = 5000
psu_port: int = 5001
dmm_port: int = 5002
physics_rate_hz: float = 100.0
class SimulationServer:
"""Complete simulation server with physics engine and instruments.
Creates a physics engine and wires it to all virtual instruments,
then exposes them over TCP for client access.
"""
def __init__(self, config: ServerConfig | None = None) -> None:
"""Initialise the simulation server.
Args:
config: Server configuration. Uses defaults if not provided.
"""
self._config = config or ServerConfig()
self._physics_engine: PhysicsEngine | None = None
self._instrument_server: InstrumentServer | None = None
self._physics_task: asyncio.Task[None] | None = None
self._running = False
@property
def is_running(self) -> bool:
"""Check if server is currently running."""
return self._running
@property
def physics_engine(self) -> PhysicsEngine | None:
"""Get the physics engine instance."""
return self._physics_engine
def _setup(self) -> None:
"""Create and wire up all components."""
# Create physics engine
self._physics_engine = PhysicsEngine(
update_rate_hz=self._config.physics_rate_hz
)
# Create instruments connected to physics engine
chamber = ThermalChamberSim(self._physics_engine)
psu = PowerSupplySim(self._physics_engine)
dmm = MultimeterSim(self._physics_engine)
# Create TCP server and register instruments
self._instrument_server = InstrumentServer(host=self._config.host)
self._instrument_server.register_instrument(self._config.chamber_port, chamber)
self._instrument_server.register_instrument(self._config.psu_port, psu)
self._instrument_server.register_instrument(self._config.dmm_port, dmm)
logger.info(
"Simulation server configured: chamber=%d, psu=%d, dmm=%d",
self._config.chamber_port,
self._config.psu_port,
self._config.dmm_port,
)
async def _run_physics(self) -> None:
"""Run the physics engine simulation loop."""
if self._physics_engine is None:
return
dt = self._physics_engine.dt
while self._running:
self._physics_engine.step()
# Sleep for the physics timestep
await asyncio.sleep(dt)
async def start(self) -> None:
"""Start the simulation server.
Sets up all components and starts the TCP server and physics engine.
Raises:
RuntimeError: If server is already running.
"""
if self._running:
raise RuntimeError("Server is already running")
self._setup()
self._running = True
# Start TCP server
if self._instrument_server is not None:
await self._instrument_server.start()
# Start physics engine loop
self._physics_task = asyncio.create_task(self._run_physics())
logger.info("Simulation server started")
async def stop(self) -> None:
"""Stop the simulation server."""
if not self._running:
return
self._running = False
# Cancel physics loop
if self._physics_task is not None:
self._physics_task.cancel()
try:
await self._physics_task
except asyncio.CancelledError:
pass
self._physics_task = None
# Stop TCP server
if self._instrument_server is not None:
await self._instrument_server.stop()
self._instrument_server = None
self._physics_engine = None
logger.info("Simulation server stopped")
async def serve_forever(self) -> None:
"""Start the server and run until cancelled."""
await self.start()
try:
# Wait for the physics task (which runs until cancelled)
if self._physics_task is not None:
await self._physics_task
except asyncio.CancelledError:
pass
finally:
await self.stop()
async def run_server(config: ServerConfig | None = None) -> None:
"""Run the simulation server with signal handling.
This is the main entry point for running the server. It sets up
signal handlers for graceful shutdown.
Args:
config: Server configuration. Uses defaults if not provided.
"""
server = SimulationServer(config)
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
stop_event = asyncio.Event()
def signal_handler() -> None:
logger.info("Shutdown signal received")
stop_event.set()
# Register signal handlers (Unix-style, may not work on all Windows)
try:
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, signal_handler)
except NotImplementedError:
# Windows doesn't support add_signal_handler
pass
try:
await server.start()
logger.info("Simulation server running. Press Ctrl+C to stop.")
# Wait for stop signal
await stop_event.wait()
except KeyboardInterrupt:
logger.info("Keyboard interrupt received")
finally:
await server.stop()
def main(
host: str = "127.0.0.1",
chamber_port: int = 5000,
psu_port: int = 5001,
dmm_port: int = 5002,
physics_rate: float = 100.0,
) -> None:
"""Run the simulation server from command line.
Args:
host: Host address to bind to.
chamber_port: Port for thermal chamber.
psu_port: Port for power supply.
dmm_port: Port for multimeter.
physics_rate: Physics engine update rate in Hz.
"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
config = ServerConfig(
host=host,
chamber_port=chamber_port,
psu_port=psu_port,
dmm_port=dmm_port,
physics_rate_hz=physics_rate,
)
asyncio.run(run_server(config))
if __name__ == "__main__":
main()