From c0155baa4fde916d7977b53db99e06213a8d9dd7 Mon Sep 17 00:00:00 2001 From: Kai Chappell Date: Mon, 28 Apr 2025 19:24:24 +0000 Subject: [PATCH] Add base instrument class Provides SCPI command parsing and dispatch mechanism for virtual instruments. Includes IEEE 488.2 common commands (*IDN?, *RST, *CLS, *OPC) and abstract methods for instrument-specific implementations. --- src/py_dvt_ate/simulation/virtual/base.py | 156 ++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 src/py_dvt_ate/simulation/virtual/base.py diff --git a/src/py_dvt_ate/simulation/virtual/base.py b/src/py_dvt_ate/simulation/virtual/base.py new file mode 100644 index 0000000..c3e968f --- /dev/null +++ b/src/py_dvt_ate/simulation/virtual/base.py @@ -0,0 +1,156 @@ +"""Base class for virtual instrument simulators. + +This module provides the foundation for implementing SCPI-based virtual +instruments that can be exposed over TCP for hardware abstraction testing. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Callable +from typing import TYPE_CHECKING + +from py_dvt_ate.instruments.scpi import SCPICommand, SCPIParser + +if TYPE_CHECKING: + from py_dvt_ate.simulation.physics.engine import PhysicsEngine + + +# Type alias for command handlers +CommandHandler = Callable[[SCPICommand], str] + + +class BaseInstrument(ABC): + """Abstract base class for virtual SCPI instruments. + + Provides common functionality for SCPI command parsing and dispatch. + Subclasses should register command handlers using the register_command + method or by overriding _setup_commands. + + Attributes: + manufacturer: Instrument manufacturer name for *IDN? response. + model: Instrument model name for *IDN? response. + serial_number: Instrument serial number for *IDN? response. + firmware_version: Firmware version for *IDN? response. + """ + + manufacturer: str = "PyDVTATE" + model: str = "Virtual Instrument" + serial_number: str = "SIM001" + firmware_version: str = "1.0.0" + + def __init__(self, physics_engine: PhysicsEngine | None = None) -> None: + """Initialise the base instrument. + + Args: + physics_engine: Reference to physics engine for simulation state. + May be None for standalone operation. + """ + self._physics_engine = physics_engine + self._parser = SCPIParser() + self._handlers: dict[str, CommandHandler] = {} + self._setup_common_commands() + self._setup_commands() + + def _setup_common_commands(self) -> None: + """Register IEEE 488.2 common commands.""" + self.register_command("*IDN", self._handle_idn) + self.register_command("*RST", self._handle_rst) + self.register_command("*CLS", self._handle_cls) + self.register_command("*OPC", self._handle_opc) + + @abstractmethod + def _setup_commands(self) -> None: + """Register instrument-specific command handlers. + + Subclasses must implement this method to register their + SCPI command handlers using register_command(). + """ + + def register_command(self, keyword: str, handler: CommandHandler) -> None: + """Register a handler for a SCPI command keyword. + + Args: + keyword: The command keyword (e.g., "TEMP:SETPOINT"). + For commands that support both set and query forms, + register the base keyword without '?'. + handler: Callable that takes SCPICommand and returns response string. + """ + self._handlers[keyword.upper()] = handler + + def process(self, command_string: str) -> str: + """Process a SCPI command string and return the response. + + Args: + command_string: Raw SCPI command string to process. + + Returns: + Response string. Empty string for commands with no response. + Error string starting with "ERROR:" for invalid commands. + """ + command = self._parser.parse(command_string) + + if not command.header: + return "" + + # Look up handler by keyword (without '?' suffix) + keyword = command.keyword.upper() + handler = self._handlers.get(keyword) + + if handler is None: + return f"ERROR: Unknown command '{keyword}'" + + try: + return handler(command) + except ValueError as e: + return f"ERROR: {e}" + except Exception as e: + return f"ERROR: Internal error - {e}" + + def _handle_idn(self, command: SCPICommand) -> str: + """Handle *IDN? identification query. + + Returns: + Comma-separated identification string. + """ + if not command.is_query: + return "ERROR: *IDN is query only" + return f"{self.manufacturer},{self.model},{self.serial_number},{self.firmware_version}" + + def _handle_rst(self, command: SCPICommand) -> str: + """Handle *RST reset command. + + Returns: + Empty string (no response for reset). + """ + if command.is_query: + return "ERROR: *RST is command only" + self.reset() + return "" + + def _handle_cls(self, command: SCPICommand) -> str: + """Handle *CLS clear status command. + + Returns: + Empty string (no response for clear). + """ + if command.is_query: + return "ERROR: *CLS is command only" + return "" + + def _handle_opc(self, command: SCPICommand) -> str: + """Handle *OPC operation complete command/query. + + Returns: + "1" for query, empty string for command. + """ + if command.is_query: + return "1" + return "" + + @abstractmethod + def reset(self) -> None: + """Reset instrument to default state. + + Subclasses must implement this to define reset behaviour. + """