Implement TCP client handling
Add async client connection handling with: - Multiple concurrent connections per port - Line-based SCPI protocol (newline terminated) - start(), stop(), and serve_forever() methods - Proper connection lifecycle and error handling
This commit is contained in:
@@ -14,6 +14,9 @@ from typing import TYPE_CHECKING
|
|||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from py_dvt_ate.simulation.virtual.base import BaseInstrument
|
from py_dvt_ate.simulation.virtual.base import BaseInstrument
|
||||||
|
|
||||||
|
# Re-export for type checking - actual import happens at runtime via registration
|
||||||
|
__all__ = ["InstrumentServer"]
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@@ -88,3 +91,122 @@ class InstrumentServer:
|
|||||||
def registered_ports(self) -> list[int]:
|
def registered_ports(self) -> list[int]:
|
||||||
"""Get list of registered port numbers."""
|
"""Get list of registered port numbers."""
|
||||||
return list(self._instruments.keys())
|
return list(self._instruments.keys())
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""Start the server and begin listening on all registered ports.
|
||||||
|
|
||||||
|
Creates a TCP server for each registered instrument port.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
RuntimeError: If server is already running or no instruments registered.
|
||||||
|
"""
|
||||||
|
if self._running:
|
||||||
|
raise RuntimeError("Server is already running")
|
||||||
|
|
||||||
|
if not self._instruments:
|
||||||
|
raise RuntimeError("No instruments registered")
|
||||||
|
|
||||||
|
self._running = True
|
||||||
|
|
||||||
|
for port, instrument in self._instruments.items():
|
||||||
|
server = await asyncio.start_server(
|
||||||
|
lambda r, w, inst=instrument, p=port: self._handle_client(r, w, inst, p),
|
||||||
|
self._host,
|
||||||
|
port,
|
||||||
|
)
|
||||||
|
self._servers.append(server)
|
||||||
|
logger.info(
|
||||||
|
"Started server for %s on %s:%d",
|
||||||
|
instrument.__class__.__name__,
|
||||||
|
self._host,
|
||||||
|
port,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Stop the server and close all connections."""
|
||||||
|
if not self._running:
|
||||||
|
return
|
||||||
|
|
||||||
|
for server in self._servers:
|
||||||
|
server.close()
|
||||||
|
await server.wait_closed()
|
||||||
|
|
||||||
|
self._servers.clear()
|
||||||
|
self._running = False
|
||||||
|
logger.info("Server stopped")
|
||||||
|
|
||||||
|
async def serve_forever(self) -> None:
|
||||||
|
"""Start the server and run until cancelled.
|
||||||
|
|
||||||
|
This is a convenience method that starts the server and blocks
|
||||||
|
until the server is stopped or cancelled.
|
||||||
|
"""
|
||||||
|
await self.start()
|
||||||
|
try:
|
||||||
|
# Keep running until cancelled
|
||||||
|
await asyncio.gather(
|
||||||
|
*[server.serve_forever() for server in self._servers]
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
await self.stop()
|
||||||
|
|
||||||
|
async def _handle_client(
|
||||||
|
self,
|
||||||
|
reader: asyncio.StreamReader,
|
||||||
|
writer: asyncio.StreamWriter,
|
||||||
|
instrument: BaseInstrument,
|
||||||
|
port: int,
|
||||||
|
) -> None:
|
||||||
|
"""Handle a client connection.
|
||||||
|
|
||||||
|
Reads SCPI commands (newline-terminated), processes them through
|
||||||
|
the instrument, and sends back responses (newline-terminated).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
reader: Stream reader for incoming data.
|
||||||
|
writer: Stream writer for outgoing data.
|
||||||
|
instrument: The instrument to process commands.
|
||||||
|
port: Port number for logging.
|
||||||
|
"""
|
||||||
|
addr = writer.get_extra_info("peername")
|
||||||
|
logger.info("Client connected to port %d from %s", port, addr)
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
# Read until newline (SCPI line terminator)
|
||||||
|
data = await reader.readline()
|
||||||
|
|
||||||
|
if not data:
|
||||||
|
# Client disconnected
|
||||||
|
break
|
||||||
|
|
||||||
|
# Decode and strip whitespace
|
||||||
|
command = data.decode("utf-8").strip()
|
||||||
|
|
||||||
|
if not command:
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.debug("Port %d received: %s", port, command)
|
||||||
|
|
||||||
|
# Process command through instrument
|
||||||
|
response = instrument.process(command)
|
||||||
|
|
||||||
|
# Send response with newline terminator
|
||||||
|
if response:
|
||||||
|
writer.write(f"{response}\n".encode("utf-8"))
|
||||||
|
await writer.drain()
|
||||||
|
logger.debug("Port %d sent: %s", port, response)
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.debug("Client handler cancelled for port %d", port)
|
||||||
|
except ConnectionResetError:
|
||||||
|
logger.debug("Client connection reset on port %d", port)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error handling client on port %d: %s", port, e)
|
||||||
|
finally:
|
||||||
|
writer.close()
|
||||||
|
try:
|
||||||
|
await writer.wait_closed()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
logger.info("Client disconnected from port %d", port)
|
||||||
|
|||||||
Reference in New Issue
Block a user