Files
py-dvt-ate/src/py_dvt_ate/instruments/transport/server.py
Kai Chappell e0a9976ca7
Some checks failed
CI / Lint (push) Successful in 4s
CI / Type Check (push) Successful in 19s
CI / Test (push) Failing after 20s
CI / Release (push) Has been skipped
Fix TCP server response handling and add pandas-stubs
- Always send a response (even empty) to prevent client timeouts
- Add pandas-stubs to dev dependencies for mypy type checking
- Server now sends newline-terminated response for all commands

This fixes the mypy CI failure. Integration test failures still need
investigation - likely due to event loop blocking when mixing sync/async.
2025-12-03 01:05:24 +00:00

241 lines
7.4 KiB
Python

"""Async TCP server for exposing instruments over network.
This module provides the InstrumentServer class that hosts SCPI
instruments over TCP, allowing client applications to communicate using
standard SCPI commands over a network connection.
This is a general-purpose server that works with any object implementing
the SCPIDevice protocol (having a process(command) -> str method).
"""
from __future__ import annotations
import asyncio
import logging
from functools import partial
from typing import Protocol, runtime_checkable
# Public API of this module: the names exported by a star-import.
__all__ = ["InstrumentServer", "SCPIDevice"]
# Module-level logger, named after this module so handlers and levels can be
# configured per module by the application.
logger = logging.getLogger(__name__)
@runtime_checkable
class SCPIDevice(Protocol):
    """Structural interface for anything that can answer SCPI commands.

    An object satisfies this protocol simply by exposing a ``process``
    method with the signature below; no inheritance is required. Such an
    object can be handed to InstrumentServer to be served.
    """

    def process(self, command: str) -> str:
        """Handle one SCPI command and produce its reply.

        Args:
            command: The SCPI command string to handle.

        Returns:
            The reply string; empty when the command yields no response.
        """
        ...
class InstrumentServer:
    """Async TCP server hosting SCPI instruments.

    Each instrument is assigned a port. Clients connect via TCP and send
    SCPI commands as newline-terminated strings. Responses are also
    newline-terminated.

    This server can host any device implementing the SCPIDevice protocol,
    including both virtual instruments (simulators) and adapters for
    real hardware.

    Attributes:
        host: Host address to bind to.
    """

    def __init__(self, host: str = "127.0.0.1") -> None:
        """Initialise the instrument server.

        Args:
            host: Host address to bind to. Defaults to localhost.
        """
        self._host = host
        self._instruments: dict[int, SCPIDevice] = {}
        self._servers: list[asyncio.Server] = []
        # Writers of currently connected clients, tracked so that stop()
        # can actually close live connections.
        self._clients: set[asyncio.StreamWriter] = set()
        # Signalled by stop(). Created in start() so that it is always
        # created while the serving event loop is running.
        self._stopped: asyncio.Event | None = None
        self._running = False

    @property
    def host(self) -> str:
        """Get the host address."""
        return self._host

    @property
    def is_running(self) -> bool:
        """Check if server is currently running."""
        return self._running

    def register_instrument(self, port: int, instrument: SCPIDevice) -> None:
        """Register an instrument to be served on a specific port.

        Args:
            port: TCP port number to serve the instrument on.
            instrument: SCPI device to serve (any object with process method).

        Raises:
            ValueError: If port is already registered.
            RuntimeError: If server is already running.
        """
        if self._running:
            raise RuntimeError("Cannot register instruments while server is running")
        if port in self._instruments:
            raise ValueError(f"Port {port} is already registered")
        self._instruments[port] = instrument
        logger.info(
            "Registered %s on port %d",
            instrument.__class__.__name__,
            port,
        )

    def get_instrument(self, port: int) -> SCPIDevice | None:
        """Get the instrument registered on a port.

        Args:
            port: Port number to look up.

        Returns:
            Registered instrument, or None if port not registered.
        """
        return self._instruments.get(port)

    @property
    def registered_ports(self) -> list[int]:
        """Get list of registered port numbers."""
        return list(self._instruments)

    async def start(self) -> None:
        """Start the server and begin listening on all registered ports.

        Creates a TCP server for each registered instrument port. If any
        port fails to bind, every listener opened so far is closed again
        and the server is left in the stopped state before the error is
        re-raised.

        Raises:
            RuntimeError: If server is already running or no instruments
                registered.
            OSError: If a listening socket cannot be created (for example
                because the port is already in use).
        """
        if self._running:
            raise RuntimeError("Server is already running")
        if not self._instruments:
            raise RuntimeError("No instruments registered")

        # Mark as running before the first await so that a concurrent
        # start() or register_instrument() call is rejected.
        self._running = True
        self._stopped = asyncio.Event()
        try:
            for port, instrument in self._instruments.items():
                handler = partial(
                    self._handle_client, instrument=instrument, port=port
                )
                server = await asyncio.start_server(
                    handler,
                    self._host,
                    port,
                )
                self._servers.append(server)
                logger.info(
                    "Started server for %s on %s:%d",
                    instrument.__class__.__name__,
                    self._host,
                    port,
                )
        except BaseException:
            # Roll back a partial start so no listener is leaked and
            # is_running does not report a half-started server.
            await self.stop()
            raise

    async def stop(self) -> None:
        """Stop the server and close all connections.

        Stops accepting new connections, closes every client connection
        that is still open, and then waits for the listeners to finish
        shutting down. Does nothing if the server is not running.
        """
        if not self._running:
            return

        servers, self._servers = self._servers, []
        # Stop accepting new connections first.
        for server in servers:
            server.close()
        # Server.close() leaves established connections open, and on
        # Python 3.12+ wait_closed() blocks until they are gone, so the
        # clients must be closed explicitly or stop() would hang.
        for writer in list(self._clients):
            writer.close()
        try:
            for server in servers:
                await server.wait_closed()
        finally:
            self._running = False
            if self._stopped is not None:
                # Wake up serve_forever(), if it is waiting.
                self._stopped.set()
        logger.info("Server stopped")

    async def serve_forever(self) -> None:
        """Start the server and run until cancelled.

        This is a convenience method that starts the server and blocks
        until the server is stopped or cancelled. The server is always
        stopped before this method returns or propagates cancellation.
        """
        await self.start()
        stopped = self._stopped
        try:
            # start() already accepts connections; just wait for stop().
            if stopped is not None:
                await stopped.wait()
        finally:
            await self.stop()

    async def _handle_client(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        instrument: SCPIDevice,
        port: int,
    ) -> None:
        """Handle a client connection.

        Reads SCPI commands (newline-terminated), processes them through
        the instrument, and sends back responses (newline-terminated).
        Blank lines are ignored. The connection is closed when the client
        disconnects, when an error occurs, or when the server is stopped.

        Args:
            reader: Stream reader for incoming data.
            writer: Stream writer for outgoing data.
            instrument: The instrument to process commands.
            port: Port number for logging.
        """
        addr = writer.get_extra_info("peername")
        logger.info("Client connected to port %d from %s", port, addr)
        self._clients.add(writer)
        try:
            while True:
                # Read until newline (SCPI line terminator)
                data = await reader.readline()
                if not data:
                    # Client disconnected
                    break

                command = data.decode("utf-8").strip()
                if not command:
                    continue
                logger.debug("Port %d received: %s", port, command)

                # NOTE: process() is a plain synchronous call, so it runs
                # on the event loop and delays all other clients until it
                # returns.
                response = instrument.process(command)

                # Always send a response, even if empty (for acknowledgment)
                writer.write(f"{response}\n".encode())
                await writer.drain()
                if response:
                    logger.debug("Port %d sent: %s", port, response)
                else:
                    logger.debug("Port %d sent: <empty response>", port)
        except asyncio.CancelledError:
            logger.debug("Client handler cancelled for port %d", port)
            # Cancellation must propagate so the task ends as cancelled.
            raise
        except ConnectionResetError:
            logger.debug("Client connection reset on port %d", port)
        except Exception as e:
            # Keep the traceback; the failure may come from the instrument.
            logger.error(
                "Error handling client on port %d: %s", port, e, exc_info=True
            )
        finally:
            self._clients.discard(writer)
            writer.close()
            try:
                await writer.wait_closed()
            except Exception as close_error:
                # Best effort: the connection is going away regardless.
                logger.debug(
                    "Ignoring error while closing client on port %d: %s",
                    port,
                    close_error,
                )
            logger.info("Client disconnected from port %d", port)