Move InstrumentServer to instruments/transport
InstrumentServer is a general-purpose SCPI-over-TCP server that can host any device implementing the SCPIDevice protocol (process method). Moving it from simulation/ to instruments/transport/ reflects this: - simulation package now depends on instruments package - InstrumentServer can host both virtual and real instrument adapters - Added SCPIDevice Protocol for type-safe device registration
This commit is contained in:
235
src/py_dvt_ate/instruments/transport/server.py
Normal file
235
src/py_dvt_ate/instruments/transport/server.py
Normal file
@@ -0,0 +1,235 @@
|
||||
"""Async TCP server for exposing instruments over network.
|
||||
|
||||
This module provides the InstrumentServer class that hosts SCPI
|
||||
instruments over TCP, allowing client applications to communicate using
|
||||
standard SCPI commands over a network connection.
|
||||
|
||||
This is a general-purpose server that works with any object implementing
|
||||
the SCPIDevice protocol (having a process(command) -> str method).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Protocol, runtime_checkable
|
||||
|
||||
__all__ = ["InstrumentServer", "SCPIDevice"]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class SCPIDevice(Protocol):
|
||||
"""Protocol for SCPI-compatible devices.
|
||||
|
||||
Any object with a process method matching this signature can be
|
||||
served by InstrumentServer.
|
||||
"""
|
||||
|
||||
def process(self, command: str) -> str:
|
||||
"""Process a SCPI command and return the response.
|
||||
|
||||
Args:
|
||||
command: SCPI command string to process.
|
||||
|
||||
Returns:
|
||||
Response string (may be empty for commands with no response).
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
class InstrumentServer:
|
||||
"""Async TCP server hosting SCPI instruments.
|
||||
|
||||
Each instrument is assigned a port. Clients connect via TCP and send
|
||||
SCPI commands as newline-terminated strings. Responses are also
|
||||
newline-terminated.
|
||||
|
||||
This server can host any device implementing the SCPIDevice protocol,
|
||||
including both virtual instruments (simulators) and adapters for
|
||||
real hardware.
|
||||
|
||||
Attributes:
|
||||
host: Host address to bind to.
|
||||
"""
|
||||
|
||||
def __init__(self, host: str = "127.0.0.1") -> None:
|
||||
"""Initialise the instrument server.
|
||||
|
||||
Args:
|
||||
host: Host address to bind to. Defaults to localhost.
|
||||
"""
|
||||
self._host = host
|
||||
self._instruments: dict[int, SCPIDevice] = {}
|
||||
self._servers: list[asyncio.Server] = []
|
||||
self._running = False
|
||||
|
||||
@property
|
||||
def host(self) -> str:
|
||||
"""Get the host address."""
|
||||
return self._host
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
"""Check if server is currently running."""
|
||||
return self._running
|
||||
|
||||
def register_instrument(self, port: int, instrument: SCPIDevice) -> None:
|
||||
"""Register an instrument to be served on a specific port.
|
||||
|
||||
Args:
|
||||
port: TCP port number to serve the instrument on.
|
||||
instrument: SCPI device to serve (any object with process method).
|
||||
|
||||
Raises:
|
||||
ValueError: If port is already registered.
|
||||
RuntimeError: If server is already running.
|
||||
"""
|
||||
if self._running:
|
||||
raise RuntimeError("Cannot register instruments while server is running")
|
||||
|
||||
if port in self._instruments:
|
||||
raise ValueError(f"Port {port} is already registered")
|
||||
|
||||
self._instruments[port] = instrument
|
||||
logger.info(
|
||||
"Registered %s on port %d",
|
||||
instrument.__class__.__name__,
|
||||
port,
|
||||
)
|
||||
|
||||
def get_instrument(self, port: int) -> SCPIDevice | None:
|
||||
"""Get the instrument registered on a port.
|
||||
|
||||
Args:
|
||||
port: Port number to look up.
|
||||
|
||||
Returns:
|
||||
Registered instrument, or None if port not registered.
|
||||
"""
|
||||
return self._instruments.get(port)
|
||||
|
||||
@property
|
||||
def registered_ports(self) -> list[int]:
|
||||
"""Get list of registered port numbers."""
|
||||
return list(self._instruments.keys())
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the server and begin listening on all registered ports.
|
||||
|
||||
Creates a TCP server for each registered instrument port.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If server is already running or no instruments registered.
|
||||
"""
|
||||
if self._running:
|
||||
raise RuntimeError("Server is already running")
|
||||
|
||||
if not self._instruments:
|
||||
raise RuntimeError("No instruments registered")
|
||||
|
||||
self._running = True
|
||||
|
||||
for port, instrument in self._instruments.items():
|
||||
server = await asyncio.start_server(
|
||||
lambda r, w, inst=instrument, p=port: self._handle_client(r, w, inst, p),
|
||||
self._host,
|
||||
port,
|
||||
)
|
||||
self._servers.append(server)
|
||||
logger.info(
|
||||
"Started server for %s on %s:%d",
|
||||
instrument.__class__.__name__,
|
||||
self._host,
|
||||
port,
|
||||
)
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop the server and close all connections."""
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
for server in self._servers:
|
||||
server.close()
|
||||
await server.wait_closed()
|
||||
|
||||
self._servers.clear()
|
||||
self._running = False
|
||||
logger.info("Server stopped")
|
||||
|
||||
async def serve_forever(self) -> None:
|
||||
"""Start the server and run until cancelled.
|
||||
|
||||
This is a convenience method that starts the server and blocks
|
||||
until the server is stopped or cancelled.
|
||||
"""
|
||||
await self.start()
|
||||
try:
|
||||
# Keep running until cancelled
|
||||
await asyncio.gather(
|
||||
*[server.serve_forever() for server in self._servers]
|
||||
)
|
||||
finally:
|
||||
await self.stop()
|
||||
|
||||
async def _handle_client(
|
||||
self,
|
||||
reader: asyncio.StreamReader,
|
||||
writer: asyncio.StreamWriter,
|
||||
instrument: SCPIDevice,
|
||||
port: int,
|
||||
) -> None:
|
||||
"""Handle a client connection.
|
||||
|
||||
Reads SCPI commands (newline-terminated), processes them through
|
||||
the instrument, and sends back responses (newline-terminated).
|
||||
|
||||
Args:
|
||||
reader: Stream reader for incoming data.
|
||||
writer: Stream writer for outgoing data.
|
||||
instrument: The instrument to process commands.
|
||||
port: Port number for logging.
|
||||
"""
|
||||
addr = writer.get_extra_info("peername")
|
||||
logger.info("Client connected to port %d from %s", port, addr)
|
||||
|
||||
try:
|
||||
while True:
|
||||
# Read until newline (SCPI line terminator)
|
||||
data = await reader.readline()
|
||||
|
||||
if not data:
|
||||
# Client disconnected
|
||||
break
|
||||
|
||||
# Decode and strip whitespace
|
||||
command = data.decode("utf-8").strip()
|
||||
|
||||
if not command:
|
||||
continue
|
||||
|
||||
logger.debug("Port %d received: %s", port, command)
|
||||
|
||||
# Process command through instrument
|
||||
response = instrument.process(command)
|
||||
|
||||
# Send response with newline terminator
|
||||
if response:
|
||||
writer.write(f"{response}\n".encode("utf-8"))
|
||||
await writer.drain()
|
||||
logger.debug("Port %d sent: %s", port, response)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Client handler cancelled for port %d", port)
|
||||
except ConnectionResetError:
|
||||
logger.debug("Client connection reset on port %d", port)
|
||||
except Exception as e:
|
||||
logger.error("Error handling client on port %d: %s", port, e)
|
||||
finally:
|
||||
writer.close()
|
||||
try:
|
||||
await writer.wait_closed()
|
||||
except Exception:
|
||||
pass
|
||||
logger.info("Client disconnected from port %d", port)
|
||||
Reference in New Issue
Block a user