From f46b443bd151a8b0a4c084eab9e72521ab6e6e80 Mon Sep 17 00:00:00 2001 From: Kai Chappell Date: Sat, 24 May 2025 13:48:10 +0000 Subject: [PATCH] Implement TCP client handling Add async client connection handling with: - Multiple concurrent connections per port - Line-based SCPI protocol (newline terminated) - start(), stop(), and serve_forever() methods - Proper connection lifecycle and error handling --- src/py_dvt_ate/simulation/tcp_server.py | 122 ++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/src/py_dvt_ate/simulation/tcp_server.py b/src/py_dvt_ate/simulation/tcp_server.py index cf6d892..5cc0045 100644 --- a/src/py_dvt_ate/simulation/tcp_server.py +++ b/src/py_dvt_ate/simulation/tcp_server.py @@ -14,6 +14,9 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from py_dvt_ate.simulation.virtual.base import BaseInstrument +# Re-export for type checking - actual import happens at runtime via registration +__all__ = ["InstrumentServer"] + logger = logging.getLogger(__name__) @@ -88,3 +91,122 @@ class InstrumentServer: def registered_ports(self) -> list[int]: """Get list of registered port numbers.""" return list(self._instruments.keys()) + + async def start(self) -> None: + """Start the server and begin listening on all registered ports. + + Creates a TCP server for each registered instrument port. + + Raises: + RuntimeError: If server is already running or no instruments registered. + """ + if self._running: + raise RuntimeError("Server is already running") + + if not self._instruments: + raise RuntimeError("No instruments registered") + + self._running = True + + for port, instrument in self._instruments.items(): + server = await asyncio.start_server( + lambda r, w, inst=instrument, p=port: self._handle_client(r, w, inst, p), + self._host, + port, + ) + self._servers.append(server) + logger.info( + "Started server for %s on %s:%d", + instrument.__class__.__name__, + self._host, + port, + ) + + async def stop(self) -> None: + """Stop the server and close all connections.""" + if not self._running: + return + + for server in self._servers: + server.close() + await server.wait_closed() + + self._servers.clear() + self._running = False + logger.info("Server stopped") + + async def serve_forever(self) -> None: + """Start the server and run until cancelled. + + This is a convenience method that starts the server and blocks + until the server is stopped or cancelled. + """ + await self.start() + try: + # Keep running until cancelled + await asyncio.gather( + *[server.serve_forever() for server in self._servers] + ) + finally: + await self.stop() + + async def _handle_client( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + instrument: BaseInstrument, + port: int, + ) -> None: + """Handle a client connection. + + Reads SCPI commands (newline-terminated), processes them through + the instrument, and sends back responses (newline-terminated). + + Args: + reader: Stream reader for incoming data. + writer: Stream writer for outgoing data. + instrument: The instrument to process commands. + port: Port number for logging. + """ + addr = writer.get_extra_info("peername") + logger.info("Client connected to port %d from %s", port, addr) + + try: + while True: + # Read until newline (SCPI line terminator) + data = await reader.readline() + + if not data: + # Client disconnected + break + + # Decode and strip whitespace + command = data.decode("utf-8").strip() + + if not command: + continue + + logger.debug("Port %d received: %s", port, command) + + # Process command through instrument + response = instrument.process(command) + + # Send response with newline terminator + if response: + writer.write(f"{response}\n".encode("utf-8")) + await writer.drain() + logger.debug("Port %d sent: %s", port, response) + + except asyncio.CancelledError: + logger.debug("Client handler cancelled for port %d", port) + except ConnectionResetError: + logger.debug("Client connection reset on port %d", port) + except Exception as e: + logger.error("Error handling client on port %d: %s", port, e) + finally: + writer.close() + try: + await writer.wait_closed() + except Exception: + pass + logger.info("Client disconnected from port %d", port)