Add driver base class

This commit is contained in:
2025-06-30 15:04:21 +00:00
parent 1f00210b63
commit 8fe97047d1
2 changed files with 201 additions and 0 deletions

View File

@@ -0,0 +1,197 @@
"""Base class for SCPI instrument drivers.
This module provides the foundation for implementing client-side instrument
drivers that communicate via SCPI commands over a transport layer.
"""
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from py_dvt_ate.instruments.transport.base import Transport
class BaseDriver:
"""Base class for SCPI instrument drivers.
Provides common functionality for communicating with instruments via
SCPI commands. Subclasses implement instrument-specific command methods.
All drivers depend on a Transport instance for low-level communication.
Attributes:
transport: The transport layer for communication.
"""
def __init__(self, transport: "Transport") -> None:
"""Initialise the driver with a transport layer.
Args:
transport: Transport instance for communication (TCP, VISA, etc.).
"""
self.transport = transport
def connect(self) -> None:
"""Establish connection to the instrument.
Raises:
ConnectionError: If connection fails.
"""
self.transport.connect()
def disconnect(self) -> None:
"""Close connection to the instrument.
Safe to call multiple times (idempotent).
"""
self.transport.disconnect()
@property
def is_connected(self) -> bool:
"""Check if connection is active.
Returns:
True if connected, False otherwise.
"""
return self.transport.is_connected
def write(self, command: str) -> None:
"""Send a SCPI command to the instrument.
Args:
command: SCPI command string (without terminator).
Raises:
ConnectionError: If not connected.
IOError: If write fails.
"""
self.transport.write(command)
def query(self, command: str, timeout: float | None = None) -> str:
"""Send a SCPI query and read the response.
Args:
command: SCPI query string (without terminator).
timeout: Read timeout in seconds. None uses default.
Returns:
Response string from instrument.
Raises:
ConnectionError: If not connected.
TimeoutError: If read times out.
IOError: If communication fails.
"""
return self.transport.query(command, timeout)
def query_float(self, command: str, timeout: float | None = None) -> float:
"""Send a SCPI query and parse response as float.
Args:
command: SCPI query string.
timeout: Read timeout in seconds.
Returns:
Parsed floating-point value.
Raises:
ConnectionError: If not connected.
TimeoutError: If read times out.
IOError: If communication fails.
ValueError: If response cannot be parsed as float.
"""
response = self.query(command, timeout)
try:
return float(response.strip())
except ValueError as err:
raise ValueError(f"Cannot parse '{response}' as float") from err
def query_int(self, command: str, timeout: float | None = None) -> int:
"""Send a SCPI query and parse response as integer.
Args:
command: SCPI query string.
timeout: Read timeout in seconds.
Returns:
Parsed integer value.
Raises:
ConnectionError: If not connected.
TimeoutError: If read times out.
IOError: If communication fails.
ValueError: If response cannot be parsed as integer.
"""
response = self.query(command, timeout)
try:
return int(response.strip())
except ValueError as err:
raise ValueError(f"Cannot parse '{response}' as int") from err
def query_bool(self, command: str, timeout: float | None = None) -> bool:
"""Send a SCPI query and parse response as boolean.
Interprets "1", "ON", "TRUE" as True; "0", "OFF", "FALSE" as False.
Args:
command: SCPI query string.
timeout: Read timeout in seconds.
Returns:
Parsed boolean value.
Raises:
ConnectionError: If not connected.
TimeoutError: If read times out.
IOError: If communication fails.
ValueError: If response cannot be parsed as boolean.
"""
response = self.query(command, timeout).strip().upper()
if response in ("1", "ON", "TRUE"):
return True
if response in ("0", "OFF", "FALSE"):
return False
raise ValueError(f"Cannot parse '{response}' as bool")
def identify(self) -> str:
"""Query instrument identification (*IDN?).
Returns:
Identification string in format:
"Manufacturer,Model,SerialNumber,FirmwareVersion"
Raises:
ConnectionError: If not connected.
IOError: If communication fails.
"""
return self.query("*IDN?")
def reset(self) -> None:
"""Reset instrument to default state (*RST).
Raises:
ConnectionError: If not connected.
IOError: If communication fails.
"""
self.write("*RST")
def clear_status(self) -> None:
"""Clear instrument status (*CLS).
Raises:
ConnectionError: If not connected.
IOError: If communication fails.
"""
self.write("*CLS")
def operation_complete(self) -> bool:
"""Query operation complete status (*OPC?).
Returns:
True if operation complete.
Raises:
ConnectionError: If not connected.
IOError: If communication fails.
"""
response = self.query("*OPC?")
return response.strip() == "1"