Implement TCP transport

This commit is contained in:
2025-12-02 20:58:40 +00:00
parent 0722ce92cd
commit 034d80388f
2 changed files with 193 additions and 1 deletions

View File

@@ -8,5 +8,6 @@ Provides connection abstractions for different backends:
from py_dvt_ate.instruments.transport.base import Transport
from py_dvt_ate.instruments.transport.server import InstrumentServer, SCPIDevice
from py_dvt_ate.instruments.transport.tcp import TCPTransport
__all__ = ["Transport", "InstrumentServer", "SCPIDevice"]
__all__ = ["Transport", "TCPTransport", "InstrumentServer", "SCPIDevice"]

View File

@@ -0,0 +1,191 @@
"""TCP socket transport for instrument communication."""
import socket
from typing import Any
class TCPTransport:
"""TCP socket transport implementation.
Implements the Transport protocol for communicating with SCPI
instruments over TCP/IP using newline-terminated messages.
Attributes:
host: Hostname or IP address of the instrument.
port: TCP port number.
timeout: Default socket timeout in seconds.
"""
def __init__(
self,
host: str,
port: int,
timeout: float = 5.0,
encoding: str = "utf-8",
) -> None:
"""Initialise TCP transport.
Args:
host: Hostname or IP address.
port: TCP port number.
timeout: Default socket timeout in seconds.
encoding: Text encoding for commands and responses.
"""
self._host = host
self._port = port
self._timeout = timeout
self._encoding = encoding
self._socket: socket.socket | None = None
@property
def host(self) -> str:
"""Get the host address."""
return self._host
@property
def port(self) -> int:
"""Get the port number."""
return self._port
@property
def is_connected(self) -> bool:
"""Check if connection is active.
Returns:
True if connected, False otherwise.
"""
return self._socket is not None
def connect(self) -> None:
"""Establish connection to instrument.
Raises:
ConnectionError: If connection fails or already connected.
"""
if self.is_connected:
raise ConnectionError("Already connected")
try:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(self._timeout)
self._socket.connect((self._host, self._port))
except (socket.error, OSError) as err:
self._socket = None
raise ConnectionError(
f"Failed to connect to {self._host}:{self._port}: {err}"
) from err
def disconnect(self) -> None:
"""Close connection to instrument.
Safe to call multiple times (idempotent).
"""
if self._socket is not None:
try:
self._socket.close()
except OSError:
pass # Ignore errors during close
finally:
self._socket = None
def write(self, command: str) -> None:
"""Send command to instrument.
Commands are sent with newline terminator appended.
Args:
command: SCPI command string to send (without terminator).
Raises:
ConnectionError: If not connected.
IOError: If write fails.
"""
if not self.is_connected or self._socket is None:
raise ConnectionError("Not connected")
try:
message = f"{command}\n".encode(self._encoding)
self._socket.sendall(message)
except (socket.error, OSError) as err:
raise OSError(f"Write failed: {err}") from err
def read(self, timeout: float | None = None) -> str:
"""Read response from instrument.
Reads until newline terminator is received.
Args:
timeout: Read timeout in seconds. None uses default.
Returns:
Response string from instrument (without terminator).
Raises:
ConnectionError: If not connected.
TimeoutError: If read times out.
IOError: If read fails.
"""
if not self.is_connected or self._socket is None:
raise ConnectionError("Not connected")
# Set timeout if specified
old_timeout = self._socket.gettimeout()
if timeout is not None:
self._socket.settimeout(timeout)
try:
# Read line by line (newline-terminated protocol)
response_bytes = b""
while True:
chunk = self._socket.recv(1)
if not chunk:
raise ConnectionError("Connection closed by remote host")
response_bytes += chunk
if chunk == b"\n":
break
# Decode and strip whitespace
return response_bytes.decode(self._encoding).strip()
except socket.timeout as err:
raise TimeoutError("Read timeout") from err
except (socket.error, OSError, UnicodeDecodeError) as err:
raise OSError(f"Read failed: {err}") from err
finally:
# Restore original timeout
if timeout is not None:
self._socket.settimeout(old_timeout)
def query(self, command: str, timeout: float | None = None) -> str:
"""Send command and read response.
Convenience method combining write() and read().
Args:
command: SCPI command string to send.
timeout: Read timeout in seconds. None uses default.
Returns:
Response string from instrument.
Raises:
ConnectionError: If not connected.
TimeoutError: If read times out.
IOError: If communication fails.
"""
self.write(command)
return self.read(timeout)
def __enter__(self) -> "TCPTransport":
"""Context manager entry."""
self.connect()
return self
def __exit__(self, *args: Any) -> None:
"""Context manager exit."""
self.disconnect()
def __repr__(self) -> str:
"""String representation."""
status = "connected" if self.is_connected else "disconnected"
return f"TCPTransport({self._host}:{self._port}, {status})"