Add driver base class
This commit is contained in:
@@ -3,3 +3,7 @@
|
|||||||
Each driver translates high-level operations into SCPI commands
|
Each driver translates high-level operations into SCPI commands
|
||||||
and handles responses from instruments.
|
and handles responses from instruments.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.drivers.base import BaseDriver
|
||||||
|
|
||||||
|
__all__ = ["BaseDriver"]
|
||||||
|
|||||||
197
src/py_dvt_ate/instruments/drivers/base.py
Normal file
197
src/py_dvt_ate/instruments/drivers/base.py
Normal file
@@ -0,0 +1,197 @@
|
|||||||
|
"""Base class for SCPI instrument drivers.
|
||||||
|
|
||||||
|
This module provides the foundation for implementing client-side instrument
|
||||||
|
drivers that communicate via SCPI commands over a transport layer.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from py_dvt_ate.instruments.transport.base import Transport
|
||||||
|
|
||||||
|
|
||||||
|
class BaseDriver:
|
||||||
|
"""Base class for SCPI instrument drivers.
|
||||||
|
|
||||||
|
Provides common functionality for communicating with instruments via
|
||||||
|
SCPI commands. Subclasses implement instrument-specific command methods.
|
||||||
|
|
||||||
|
All drivers depend on a Transport instance for low-level communication.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
transport: The transport layer for communication.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, transport: "Transport") -> None:
|
||||||
|
"""Initialise the driver with a transport layer.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
transport: Transport instance for communication (TCP, VISA, etc.).
|
||||||
|
"""
|
||||||
|
self.transport = transport
|
||||||
|
|
||||||
|
def connect(self) -> None:
|
||||||
|
"""Establish connection to the instrument.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If connection fails.
|
||||||
|
"""
|
||||||
|
self.transport.connect()
|
||||||
|
|
||||||
|
def disconnect(self) -> None:
|
||||||
|
"""Close connection to the instrument.
|
||||||
|
|
||||||
|
Safe to call multiple times (idempotent).
|
||||||
|
"""
|
||||||
|
self.transport.disconnect()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_connected(self) -> bool:
|
||||||
|
"""Check if connection is active.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if connected, False otherwise.
|
||||||
|
"""
|
||||||
|
return self.transport.is_connected
|
||||||
|
|
||||||
|
def write(self, command: str) -> None:
|
||||||
|
"""Send a SCPI command to the instrument.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI command string (without terminator).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If write fails.
|
||||||
|
"""
|
||||||
|
self.transport.write(command)
|
||||||
|
|
||||||
|
def query(self, command: str, timeout: float | None = None) -> str:
|
||||||
|
"""Send a SCPI query and read the response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI query string (without terminator).
|
||||||
|
timeout: Read timeout in seconds. None uses default.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Response string from instrument.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
return self.transport.query(command, timeout)
|
||||||
|
|
||||||
|
def query_float(self, command: str, timeout: float | None = None) -> float:
|
||||||
|
"""Send a SCPI query and parse response as float.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI query string.
|
||||||
|
timeout: Read timeout in seconds.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Parsed floating-point value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If communication fails.
|
||||||
|
ValueError: If response cannot be parsed as float.
|
||||||
|
"""
|
||||||
|
response = self.query(command, timeout)
|
||||||
|
try:
|
||||||
|
return float(response.strip())
|
||||||
|
except ValueError as err:
|
||||||
|
raise ValueError(f"Cannot parse '{response}' as float") from err
|
||||||
|
|
||||||
|
def query_int(self, command: str, timeout: float | None = None) -> int:
|
||||||
|
"""Send a SCPI query and parse response as integer.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI query string.
|
||||||
|
timeout: Read timeout in seconds.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Parsed integer value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If communication fails.
|
||||||
|
ValueError: If response cannot be parsed as integer.
|
||||||
|
"""
|
||||||
|
response = self.query(command, timeout)
|
||||||
|
try:
|
||||||
|
return int(response.strip())
|
||||||
|
except ValueError as err:
|
||||||
|
raise ValueError(f"Cannot parse '{response}' as int") from err
|
||||||
|
|
||||||
|
def query_bool(self, command: str, timeout: float | None = None) -> bool:
|
||||||
|
"""Send a SCPI query and parse response as boolean.
|
||||||
|
|
||||||
|
Interprets "1", "ON", "TRUE" as True; "0", "OFF", "FALSE" as False.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI query string.
|
||||||
|
timeout: Read timeout in seconds.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Parsed boolean value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If communication fails.
|
||||||
|
ValueError: If response cannot be parsed as boolean.
|
||||||
|
"""
|
||||||
|
response = self.query(command, timeout).strip().upper()
|
||||||
|
if response in ("1", "ON", "TRUE"):
|
||||||
|
return True
|
||||||
|
if response in ("0", "OFF", "FALSE"):
|
||||||
|
return False
|
||||||
|
raise ValueError(f"Cannot parse '{response}' as bool")
|
||||||
|
|
||||||
|
def identify(self) -> str:
|
||||||
|
"""Query instrument identification (*IDN?).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Identification string in format:
|
||||||
|
"Manufacturer,Model,SerialNumber,FirmwareVersion"
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
return self.query("*IDN?")
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
"""Reset instrument to default state (*RST).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
self.write("*RST")
|
||||||
|
|
||||||
|
def clear_status(self) -> None:
|
||||||
|
"""Clear instrument status (*CLS).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
self.write("*CLS")
|
||||||
|
|
||||||
|
def operation_complete(self) -> bool:
|
||||||
|
"""Query operation complete status (*OPC?).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if operation complete.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
response = self.query("*OPC?")
|
||||||
|
return response.strip() == "1"
|
||||||
Reference in New Issue
Block a user