Add base instrument class
Provides SCPI command parsing and dispatch mechanism for virtual instruments. Includes IEEE 488.2 common commands (*IDN?, *RST, *CLS, *OPC) and abstract methods for instrument-specific implementations.
This commit is contained in:
156
src/py_dvt_ate/simulation/virtual/base.py
Normal file
156
src/py_dvt_ate/simulation/virtual/base.py
Normal file
@@ -0,0 +1,156 @@
|
|||||||
|
"""Base class for virtual instrument simulators.
|
||||||
|
|
||||||
|
This module provides the foundation for implementing SCPI-based virtual
|
||||||
|
instruments that can be exposed over TCP for hardware abstraction testing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from collections.abc import Callable
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.scpi import SCPICommand, SCPIParser
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||||
|
|
||||||
|
|
||||||
|
# Type alias for command handlers
|
||||||
|
CommandHandler = Callable[[SCPICommand], str]
|
||||||
|
|
||||||
|
|
||||||
|
class BaseInstrument(ABC):
|
||||||
|
"""Abstract base class for virtual SCPI instruments.
|
||||||
|
|
||||||
|
Provides common functionality for SCPI command parsing and dispatch.
|
||||||
|
Subclasses should register command handlers using the register_command
|
||||||
|
method or by overriding _setup_commands.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
manufacturer: Instrument manufacturer name for *IDN? response.
|
||||||
|
model: Instrument model name for *IDN? response.
|
||||||
|
serial_number: Instrument serial number for *IDN? response.
|
||||||
|
firmware_version: Firmware version for *IDN? response.
|
||||||
|
"""
|
||||||
|
|
||||||
|
manufacturer: str = "PyDVTATE"
|
||||||
|
model: str = "Virtual Instrument"
|
||||||
|
serial_number: str = "SIM001"
|
||||||
|
firmware_version: str = "1.0.0"
|
||||||
|
|
||||||
|
def __init__(self, physics_engine: PhysicsEngine | None = None) -> None:
|
||||||
|
"""Initialise the base instrument.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
physics_engine: Reference to physics engine for simulation state.
|
||||||
|
May be None for standalone operation.
|
||||||
|
"""
|
||||||
|
self._physics_engine = physics_engine
|
||||||
|
self._parser = SCPIParser()
|
||||||
|
self._handlers: dict[str, CommandHandler] = {}
|
||||||
|
self._setup_common_commands()
|
||||||
|
self._setup_commands()
|
||||||
|
|
||||||
|
def _setup_common_commands(self) -> None:
|
||||||
|
"""Register IEEE 488.2 common commands."""
|
||||||
|
self.register_command("*IDN", self._handle_idn)
|
||||||
|
self.register_command("*RST", self._handle_rst)
|
||||||
|
self.register_command("*CLS", self._handle_cls)
|
||||||
|
self.register_command("*OPC", self._handle_opc)
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _setup_commands(self) -> None:
|
||||||
|
"""Register instrument-specific command handlers.
|
||||||
|
|
||||||
|
Subclasses must implement this method to register their
|
||||||
|
SCPI command handlers using register_command().
|
||||||
|
"""
|
||||||
|
|
||||||
|
def register_command(self, keyword: str, handler: CommandHandler) -> None:
|
||||||
|
"""Register a handler for a SCPI command keyword.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
keyword: The command keyword (e.g., "TEMP:SETPOINT").
|
||||||
|
For commands that support both set and query forms,
|
||||||
|
register the base keyword without '?'.
|
||||||
|
handler: Callable that takes SCPICommand and returns response string.
|
||||||
|
"""
|
||||||
|
self._handlers[keyword.upper()] = handler
|
||||||
|
|
||||||
|
def process(self, command_string: str) -> str:
|
||||||
|
"""Process a SCPI command string and return the response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command_string: Raw SCPI command string to process.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Response string. Empty string for commands with no response.
|
||||||
|
Error string starting with "ERROR:" for invalid commands.
|
||||||
|
"""
|
||||||
|
command = self._parser.parse(command_string)
|
||||||
|
|
||||||
|
if not command.header:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
# Look up handler by keyword (without '?' suffix)
|
||||||
|
keyword = command.keyword.upper()
|
||||||
|
handler = self._handlers.get(keyword)
|
||||||
|
|
||||||
|
if handler is None:
|
||||||
|
return f"ERROR: Unknown command '{keyword}'"
|
||||||
|
|
||||||
|
try:
|
||||||
|
return handler(command)
|
||||||
|
except ValueError as e:
|
||||||
|
return f"ERROR: {e}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"ERROR: Internal error - {e}"
|
||||||
|
|
||||||
|
def _handle_idn(self, command: SCPICommand) -> str:
|
||||||
|
"""Handle *IDN? identification query.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Comma-separated identification string.
|
||||||
|
"""
|
||||||
|
if not command.is_query:
|
||||||
|
return "ERROR: *IDN is query only"
|
||||||
|
return f"{self.manufacturer},{self.model},{self.serial_number},{self.firmware_version}"
|
||||||
|
|
||||||
|
def _handle_rst(self, command: SCPICommand) -> str:
|
||||||
|
"""Handle *RST reset command.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Empty string (no response for reset).
|
||||||
|
"""
|
||||||
|
if command.is_query:
|
||||||
|
return "ERROR: *RST is command only"
|
||||||
|
self.reset()
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def _handle_cls(self, command: SCPICommand) -> str:
|
||||||
|
"""Handle *CLS clear status command.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Empty string (no response for clear).
|
||||||
|
"""
|
||||||
|
if command.is_query:
|
||||||
|
return "ERROR: *CLS is command only"
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def _handle_opc(self, command: SCPICommand) -> str:
|
||||||
|
"""Handle *OPC operation complete command/query.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
"1" for query, empty string for command.
|
||||||
|
"""
|
||||||
|
if command.is_query:
|
||||||
|
return "1"
|
||||||
|
return ""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def reset(self) -> None:
|
||||||
|
"""Reset instrument to default state.
|
||||||
|
|
||||||
|
Subclasses must implement this to define reset behaviour.
|
||||||
|
"""
|
||||||
Reference in New Issue
Block a user