"""Async TCP server for exposing virtual instruments over network. This module provides the InstrumentServer class that hosts virtual SCPI instruments over TCP, allowing client applications to communicate using standard SCPI commands over a network connection. """ from __future__ import annotations import asyncio import logging from typing import TYPE_CHECKING if TYPE_CHECKING: from py_dvt_ate.simulation.virtual.base import BaseInstrument # Re-export for type checking - actual import happens at runtime via registration __all__ = ["InstrumentServer"] logger = logging.getLogger(__name__) class InstrumentServer: """Async TCP server hosting virtual SCPI instruments. Each instrument is assigned a port. Clients connect via TCP and send SCPI commands as newline-terminated strings. Responses are also newline-terminated. Attributes: host: Host address to bind to. """ def __init__(self, host: str = "127.0.0.1") -> None: """Initialise the instrument server. Args: host: Host address to bind to. Defaults to localhost. """ self._host = host self._instruments: dict[int, BaseInstrument] = {} self._servers: list[asyncio.Server] = [] self._running = False @property def host(self) -> str: """Get the host address.""" return self._host @property def is_running(self) -> bool: """Check if server is currently running.""" return self._running def register_instrument(self, port: int, instrument: BaseInstrument) -> None: """Register an instrument to be served on a specific port. Args: port: TCP port number to serve the instrument on. instrument: Virtual instrument to serve. Raises: ValueError: If port is already registered. RuntimeError: If server is already running. """ if self._running: raise RuntimeError("Cannot register instruments while server is running") if port in self._instruments: raise ValueError(f"Port {port} is already registered") self._instruments[port] = instrument logger.info( "Registered %s on port %d", instrument.__class__.__name__, port, ) def get_instrument(self, port: int) -> BaseInstrument | None: """Get the instrument registered on a port. Args: port: Port number to look up. Returns: Registered instrument, or None if port not registered. """ return self._instruments.get(port) @property def registered_ports(self) -> list[int]: """Get list of registered port numbers.""" return list(self._instruments.keys()) async def start(self) -> None: """Start the server and begin listening on all registered ports. Creates a TCP server for each registered instrument port. Raises: RuntimeError: If server is already running or no instruments registered. """ if self._running: raise RuntimeError("Server is already running") if not self._instruments: raise RuntimeError("No instruments registered") self._running = True for port, instrument in self._instruments.items(): server = await asyncio.start_server( lambda r, w, inst=instrument, p=port: self._handle_client(r, w, inst, p), self._host, port, ) self._servers.append(server) logger.info( "Started server for %s on %s:%d", instrument.__class__.__name__, self._host, port, ) async def stop(self) -> None: """Stop the server and close all connections.""" if not self._running: return for server in self._servers: server.close() await server.wait_closed() self._servers.clear() self._running = False logger.info("Server stopped") async def serve_forever(self) -> None: """Start the server and run until cancelled. This is a convenience method that starts the server and blocks until the server is stopped or cancelled. """ await self.start() try: # Keep running until cancelled await asyncio.gather( *[server.serve_forever() for server in self._servers] ) finally: await self.stop() async def _handle_client( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, instrument: BaseInstrument, port: int, ) -> None: """Handle a client connection. Reads SCPI commands (newline-terminated), processes them through the instrument, and sends back responses (newline-terminated). Args: reader: Stream reader for incoming data. writer: Stream writer for outgoing data. instrument: The instrument to process commands. port: Port number for logging. """ addr = writer.get_extra_info("peername") logger.info("Client connected to port %d from %s", port, addr) try: while True: # Read until newline (SCPI line terminator) data = await reader.readline() if not data: # Client disconnected break # Decode and strip whitespace command = data.decode("utf-8").strip() if not command: continue logger.debug("Port %d received: %s", port, command) # Process command through instrument response = instrument.process(command) # Send response with newline terminator if response: writer.write(f"{response}\n".encode("utf-8")) await writer.drain() logger.debug("Port %d sent: %s", port, response) except asyncio.CancelledError: logger.debug("Client handler cancelled for port %d", port) except ConnectionResetError: logger.debug("Client connection reset on port %d", port) except Exception as e: logger.error("Error handling client on port %d: %s", port, e) finally: writer.close() try: await writer.wait_closed() except Exception: pass logger.info("Client disconnected from port %d", port)