Add base instrument class

Provides SCPI command parsing and dispatch mechanism for virtual
instruments. Includes IEEE 488.2 common commands (*IDN?, *RST, *CLS,
*OPC) and abstract methods for instrument-specific implementations.
This commit is contained in:
2025-04-28 19:24:24 +00:00
parent 5d15dfd7d0
commit c0155baa4f

View File

@@ -0,0 +1,156 @@
"""Base class for virtual instrument simulators.
This module provides the foundation for implementing SCPI-based virtual
instruments that can be exposed over TCP for hardware abstraction testing.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import TYPE_CHECKING
from py_dvt_ate.instruments.scpi import SCPICommand, SCPIParser
if TYPE_CHECKING:
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
# Type alias for command handlers
CommandHandler = Callable[[SCPICommand], str]
class BaseInstrument(ABC):
"""Abstract base class for virtual SCPI instruments.
Provides common functionality for SCPI command parsing and dispatch.
Subclasses should register command handlers using the register_command
method or by overriding _setup_commands.
Attributes:
manufacturer: Instrument manufacturer name for *IDN? response.
model: Instrument model name for *IDN? response.
serial_number: Instrument serial number for *IDN? response.
firmware_version: Firmware version for *IDN? response.
"""
manufacturer: str = "PyDVTATE"
model: str = "Virtual Instrument"
serial_number: str = "SIM001"
firmware_version: str = "1.0.0"
def __init__(self, physics_engine: PhysicsEngine | None = None) -> None:
"""Initialise the base instrument.
Args:
physics_engine: Reference to physics engine for simulation state.
May be None for standalone operation.
"""
self._physics_engine = physics_engine
self._parser = SCPIParser()
self._handlers: dict[str, CommandHandler] = {}
self._setup_common_commands()
self._setup_commands()
def _setup_common_commands(self) -> None:
"""Register IEEE 488.2 common commands."""
self.register_command("*IDN", self._handle_idn)
self.register_command("*RST", self._handle_rst)
self.register_command("*CLS", self._handle_cls)
self.register_command("*OPC", self._handle_opc)
@abstractmethod
def _setup_commands(self) -> None:
"""Register instrument-specific command handlers.
Subclasses must implement this method to register their
SCPI command handlers using register_command().
"""
def register_command(self, keyword: str, handler: CommandHandler) -> None:
"""Register a handler for a SCPI command keyword.
Args:
keyword: The command keyword (e.g., "TEMP:SETPOINT").
For commands that support both set and query forms,
register the base keyword without '?'.
handler: Callable that takes SCPICommand and returns response string.
"""
self._handlers[keyword.upper()] = handler
def process(self, command_string: str) -> str:
"""Process a SCPI command string and return the response.
Args:
command_string: Raw SCPI command string to process.
Returns:
Response string. Empty string for commands with no response.
Error string starting with "ERROR:" for invalid commands.
"""
command = self._parser.parse(command_string)
if not command.header:
return ""
# Look up handler by keyword (without '?' suffix)
keyword = command.keyword.upper()
handler = self._handlers.get(keyword)
if handler is None:
return f"ERROR: Unknown command '{keyword}'"
try:
return handler(command)
except ValueError as e:
return f"ERROR: {e}"
except Exception as e:
return f"ERROR: Internal error - {e}"
def _handle_idn(self, command: SCPICommand) -> str:
"""Handle *IDN? identification query.
Returns:
Comma-separated identification string.
"""
if not command.is_query:
return "ERROR: *IDN is query only"
return f"{self.manufacturer},{self.model},{self.serial_number},{self.firmware_version}"
def _handle_rst(self, command: SCPICommand) -> str:
"""Handle *RST reset command.
Returns:
Empty string (no response for reset).
"""
if command.is_query:
return "ERROR: *RST is command only"
self.reset()
return ""
def _handle_cls(self, command: SCPICommand) -> str:
"""Handle *CLS clear status command.
Returns:
Empty string (no response for clear).
"""
if command.is_query:
return "ERROR: *CLS is command only"
return ""
def _handle_opc(self, command: SCPICommand) -> str:
"""Handle *OPC operation complete command/query.
Returns:
"1" for query, empty string for command.
"""
if command.is_query:
return "1"
return ""
@abstractmethod
def reset(self) -> None:
"""Reset instrument to default state.
Subclasses must implement this to define reset behaviour.
"""