From d64d8bd4fb4b25f0c7004bfe6dd5e81b7b437898 Mon Sep 17 00:00:00 2001 From: Kai Chappell Date: Mon, 30 Jun 2025 15:04:21 +0000 Subject: [PATCH] Add driver base class --- .../instruments/drivers/__init__.py | 4 + src/py_dvt_ate/instruments/drivers/base.py | 197 ++++++++++++++++++ 2 files changed, 201 insertions(+) create mode 100644 src/py_dvt_ate/instruments/drivers/base.py diff --git a/src/py_dvt_ate/instruments/drivers/__init__.py b/src/py_dvt_ate/instruments/drivers/__init__.py index 444758d..d0a5f3b 100644 --- a/src/py_dvt_ate/instruments/drivers/__init__.py +++ b/src/py_dvt_ate/instruments/drivers/__init__.py @@ -3,3 +3,7 @@ Each driver translates high-level operations into SCPI commands and handles responses from instruments. """ + +from py_dvt_ate.instruments.drivers.base import BaseDriver + +__all__ = ["BaseDriver"] diff --git a/src/py_dvt_ate/instruments/drivers/base.py b/src/py_dvt_ate/instruments/drivers/base.py new file mode 100644 index 0000000..19803e4 --- /dev/null +++ b/src/py_dvt_ate/instruments/drivers/base.py @@ -0,0 +1,197 @@ +"""Base class for SCPI instrument drivers. + +This module provides the foundation for implementing client-side instrument +drivers that communicate via SCPI commands over a transport layer. +""" + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from py_dvt_ate.instruments.transport.base import Transport + + +class BaseDriver: + """Base class for SCPI instrument drivers. + + Provides common functionality for communicating with instruments via + SCPI commands. Subclasses implement instrument-specific command methods. + + All drivers depend on a Transport instance for low-level communication. + + Attributes: + transport: The transport layer for communication. + """ + + def __init__(self, transport: "Transport") -> None: + """Initialise the driver with a transport layer. + + Args: + transport: Transport instance for communication (TCP, VISA, etc.). + """ + self.transport = transport + + def connect(self) -> None: + """Establish connection to the instrument. + + Raises: + ConnectionError: If connection fails. + """ + self.transport.connect() + + def disconnect(self) -> None: + """Close connection to the instrument. + + Safe to call multiple times (idempotent). + """ + self.transport.disconnect() + + @property + def is_connected(self) -> bool: + """Check if connection is active. + + Returns: + True if connected, False otherwise. + """ + return self.transport.is_connected + + def write(self, command: str) -> None: + """Send a SCPI command to the instrument. + + Args: + command: SCPI command string (without terminator). + + Raises: + ConnectionError: If not connected. + IOError: If write fails. + """ + self.transport.write(command) + + def query(self, command: str, timeout: float | None = None) -> str: + """Send a SCPI query and read the response. + + Args: + command: SCPI query string (without terminator). + timeout: Read timeout in seconds. None uses default. + + Returns: + Response string from instrument. + + Raises: + ConnectionError: If not connected. + TimeoutError: If read times out. + IOError: If communication fails. + """ + return self.transport.query(command, timeout) + + def query_float(self, command: str, timeout: float | None = None) -> float: + """Send a SCPI query and parse response as float. + + Args: + command: SCPI query string. + timeout: Read timeout in seconds. + + Returns: + Parsed floating-point value. + + Raises: + ConnectionError: If not connected. + TimeoutError: If read times out. + IOError: If communication fails. + ValueError: If response cannot be parsed as float. + """ + response = self.query(command, timeout) + try: + return float(response.strip()) + except ValueError as err: + raise ValueError(f"Cannot parse '{response}' as float") from err + + def query_int(self, command: str, timeout: float | None = None) -> int: + """Send a SCPI query and parse response as integer. + + Args: + command: SCPI query string. + timeout: Read timeout in seconds. + + Returns: + Parsed integer value. + + Raises: + ConnectionError: If not connected. + TimeoutError: If read times out. + IOError: If communication fails. + ValueError: If response cannot be parsed as integer. + """ + response = self.query(command, timeout) + try: + return int(response.strip()) + except ValueError as err: + raise ValueError(f"Cannot parse '{response}' as int") from err + + def query_bool(self, command: str, timeout: float | None = None) -> bool: + """Send a SCPI query and parse response as boolean. + + Interprets "1", "ON", "TRUE" as True; "0", "OFF", "FALSE" as False. + + Args: + command: SCPI query string. + timeout: Read timeout in seconds. + + Returns: + Parsed boolean value. + + Raises: + ConnectionError: If not connected. + TimeoutError: If read times out. + IOError: If communication fails. + ValueError: If response cannot be parsed as boolean. + """ + response = self.query(command, timeout).strip().upper() + if response in ("1", "ON", "TRUE"): + return True + if response in ("0", "OFF", "FALSE"): + return False + raise ValueError(f"Cannot parse '{response}' as bool") + + def identify(self) -> str: + """Query instrument identification (*IDN?). + + Returns: + Identification string in format: + "Manufacturer,Model,SerialNumber,FirmwareVersion" + + Raises: + ConnectionError: If not connected. + IOError: If communication fails. + """ + return self.query("*IDN?") + + def reset(self) -> None: + """Reset instrument to default state (*RST). + + Raises: + ConnectionError: If not connected. + IOError: If communication fails. + """ + self.write("*RST") + + def clear_status(self) -> None: + """Clear instrument status (*CLS). + + Raises: + ConnectionError: If not connected. + IOError: If communication fails. + """ + self.write("*CLS") + + def operation_complete(self) -> bool: + """Query operation complete status (*OPC?). + + Returns: + True if operation complete. + + Raises: + ConnectionError: If not connected. + IOError: If communication fails. + """ + response = self.query("*OPC?") + return response.strip() == "1"