"""Async TCP server for exposing instruments over network. This module provides the InstrumentServer class that hosts SCPI instruments over TCP, allowing client applications to communicate using standard SCPI commands over a network connection. This is a general-purpose server that works with any object implementing the SCPIDevice protocol (having a process(command) -> str method). """ from __future__ import annotations import asyncio import logging from functools import partial from typing import Protocol, runtime_checkable __all__ = ["InstrumentServer", "SCPIDevice"] logger = logging.getLogger(__name__) @runtime_checkable class SCPIDevice(Protocol): """Protocol for SCPI-compatible devices. Any object with a process method matching this signature can be served by InstrumentServer. """ def process(self, command: str) -> str: """Process a SCPI command and return the response. Args: command: SCPI command string to process. Returns: Response string (may be empty for commands with no response). """ ... class InstrumentServer: """Async TCP server hosting SCPI instruments. Each instrument is assigned a port. Clients connect via TCP and send SCPI commands as newline-terminated strings. Responses are also newline-terminated. This server can host any device implementing the SCPIDevice protocol, including both virtual instruments (simulators) and adapters for real hardware. Attributes: host: Host address to bind to. """ def __init__(self, host: str = "127.0.0.1") -> None: """Initialise the instrument server. Args: host: Host address to bind to. Defaults to localhost. """ self._host = host self._instruments: dict[int, SCPIDevice] = {} self._servers: list[asyncio.Server] = [] self._running = False @property def host(self) -> str: """Get the host address.""" return self._host @property def is_running(self) -> bool: """Check if server is currently running.""" return self._running def register_instrument(self, port: int, instrument: SCPIDevice) -> None: """Register an instrument to be served on a specific port. Args: port: TCP port number to serve the instrument on. instrument: SCPI device to serve (any object with process method). Raises: ValueError: If port is already registered. RuntimeError: If server is already running. """ if self._running: raise RuntimeError("Cannot register instruments while server is running") if port in self._instruments: raise ValueError(f"Port {port} is already registered") self._instruments[port] = instrument logger.info( "Registered %s on port %d", instrument.__class__.__name__, port, ) def get_instrument(self, port: int) -> SCPIDevice | None: """Get the instrument registered on a port. Args: port: Port number to look up. Returns: Registered instrument, or None if port not registered. """ return self._instruments.get(port) @property def registered_ports(self) -> list[int]: """Get list of registered port numbers.""" return list(self._instruments.keys()) async def start(self) -> None: """Start the server and begin listening on all registered ports. Creates a TCP server for each registered instrument port. Raises: RuntimeError: If server is already running or no instruments registered. """ if self._running: raise RuntimeError("Server is already running") if not self._instruments: raise RuntimeError("No instruments registered") self._running = True for port, instrument in self._instruments.items(): handler = partial(self._handle_client, instrument=instrument, port=port) server = await asyncio.start_server( handler, self._host, port, ) self._servers.append(server) logger.info( "Started server for %s on %s:%d", instrument.__class__.__name__, self._host, port, ) async def stop(self) -> None: """Stop the server and close all connections.""" if not self._running: return for server in self._servers: server.close() await server.wait_closed() self._servers.clear() self._running = False logger.info("Server stopped") async def serve_forever(self) -> None: """Start the server and run until cancelled. This is a convenience method that starts the server and blocks until the server is stopped or cancelled. """ await self.start() try: # Keep running until cancelled await asyncio.gather( *[server.serve_forever() for server in self._servers] ) finally: await self.stop() async def _handle_client( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, instrument: SCPIDevice, port: int, ) -> None: """Handle a client connection. Reads SCPI commands (newline-terminated), processes them through the instrument, and sends back responses (newline-terminated). Args: reader: Stream reader for incoming data. writer: Stream writer for outgoing data. instrument: The instrument to process commands. port: Port number for logging. """ addr = writer.get_extra_info("peername") logger.info("Client connected to port %d from %s", port, addr) try: while True: # Read until newline (SCPI line terminator) data = await reader.readline() if not data: # Client disconnected break # Decode and strip whitespace command = data.decode("utf-8").strip() if not command: continue logger.debug("Port %d received: %s", port, command) # Process command through instrument response = instrument.process(command) # Send response with newline terminator if response: writer.write(f"{response}\n".encode()) await writer.drain() logger.debug("Port %d sent: %s", port, response) except asyncio.CancelledError: logger.debug("Client handler cancelled for port %d", port) except ConnectionResetError: logger.debug("Client connection reset on port %d", port) except Exception as e: logger.error("Error handling client on port %d: %s", port, e) finally: writer.close() try: await writer.wait_closed() except Exception: pass logger.info("Client disconnected from port %d", port)