"""TCP socket transport for instrument communication.""" import socket from typing import Any from py_dvt_ate.instruments.transport.base import Transport class TCPTransport(Transport): """TCP socket transport implementation. Implements the Transport interface for communicating with SCPI instruments over TCP/IP using newline-terminated messages. Attributes: host: Hostname or IP address of the instrument. port: TCP port number. timeout: Default socket timeout in seconds. """ def __init__( self, host: str, port: int, timeout: float = 5.0, encoding: str = "utf-8", ) -> None: """Initialise TCP transport. Args: host: Hostname or IP address. port: TCP port number. timeout: Default socket timeout in seconds. encoding: Text encoding for commands and responses. """ self._host = host self._port = port self._timeout = timeout self._encoding = encoding self._socket: socket.socket | None = None @property def host(self) -> str: """Get the host address.""" return self._host @property def port(self) -> int: """Get the port number.""" return self._port @property def is_connected(self) -> bool: """Check if connection is active. Returns: True if connected, False otherwise. """ return self._socket is not None def connect(self) -> None: """Establish connection to instrument. Raises: ConnectionError: If connection fails or already connected. """ if self.is_connected: raise ConnectionError("Already connected") try: self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.settimeout(self._timeout) self._socket.connect((self._host, self._port)) except (socket.error, OSError) as err: self._socket = None raise ConnectionError( f"Failed to connect to {self._host}:{self._port}: {err}" ) from err def disconnect(self) -> None: """Close connection to instrument. Safe to call multiple times (idempotent). """ if self._socket is not None: try: self._socket.close() except OSError: pass # Ignore errors during close finally: self._socket = None def write(self, command: str) -> None: """Send command to instrument. Commands are sent with newline terminator appended. Args: command: SCPI command string to send (without terminator). Raises: ConnectionError: If not connected. IOError: If write fails. """ if not self.is_connected or self._socket is None: raise ConnectionError("Not connected") try: message = f"{command}\n".encode(self._encoding) self._socket.sendall(message) except (socket.error, OSError) as err: raise OSError(f"Write failed: {err}") from err def read(self, timeout: float | None = None) -> str: """Read response from instrument. Reads until newline terminator is received. Args: timeout: Read timeout in seconds. None uses default. Returns: Response string from instrument (without terminator). Raises: ConnectionError: If not connected. TimeoutError: If read times out. IOError: If read fails. """ if not self.is_connected or self._socket is None: raise ConnectionError("Not connected") # Set timeout if specified old_timeout = self._socket.gettimeout() if timeout is not None: self._socket.settimeout(timeout) try: # Read line by line (newline-terminated protocol) response_bytes = b"" while True: chunk = self._socket.recv(1) if not chunk: raise ConnectionError("Connection closed by remote host") response_bytes += chunk if chunk == b"\n": break # Decode and strip whitespace return response_bytes.decode(self._encoding).strip() except ConnectionError: raise # Re-raise ConnectionError as-is except socket.timeout as err: raise TimeoutError("Read timeout") from err except (socket.error, OSError, UnicodeDecodeError) as err: raise OSError(f"Read failed: {err}") from err finally: # Restore original timeout if timeout is not None: self._socket.settimeout(old_timeout) def query(self, command: str, timeout: float | None = None) -> str: """Send command and read response. Convenience method combining write() and read(). Args: command: SCPI command string to send. timeout: Read timeout in seconds. None uses default. Returns: Response string from instrument. Raises: ConnectionError: If not connected. TimeoutError: If read times out. IOError: If communication fails. """ self.write(command) return self.read(timeout) def __enter__(self) -> "TCPTransport": """Context manager entry.""" self.connect() return self def __exit__(self, *args: Any) -> None: """Context manager exit.""" self.disconnect() def __repr__(self) -> str: """String representation.""" status = "connected" if self.is_connected else "disconnected" return f"TCPTransport({self._host}:{self._port}, {status})"