Compare commits
18 Commits
v0.1.0-alp
...
v0.1.0-bet
| Author | SHA1 | Date | |
|---|---|---|---|
| afa52e7ee2 | |||
| a951413a62 | |||
| 0b58f7e863 | |||
| a8bd132269 | |||
| 0a8d7e5c69 | |||
| ece1803c10 | |||
| 76d81b21e6 | |||
| 4db50421b3 | |||
| 10e1da198e | |||
| 8fe97047d1 | |||
| 1f00210b63 | |||
| 95961cd26f | |||
| fe208b0c04 | |||
| d38c40d52d | |||
| 936ed5a279 | |||
| 284793df69 | |||
| e38f514153 | |||
| cfe8dab7a8 |
34
CHANGELOG.md
34
CHANGELOG.md
@@ -7,6 +7,38 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.1.0-beta.1] - 2025-12-02
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- Hardware Abstraction Layer (HAL) with instrument interface protocols
|
||||||
|
- IThermalChamber protocol with temperature control methods
|
||||||
|
- IPowerSupply protocol with voltage/current control and measurement
|
||||||
|
- IMultimeter protocol with DC voltage, current, and resistance measurement
|
||||||
|
- Instrument drivers implementing HAL interfaces
|
||||||
|
- ThermalChamberDriver implements IThermalChamber
|
||||||
|
- PowerSupplyDriver implements IPowerSupply
|
||||||
|
- MultimeterDriver implements IMultimeter
|
||||||
|
- Instrument factory pattern for backend abstraction
|
||||||
|
- InstrumentSet dataclass containing chamber, PSU, and DMM
|
||||||
|
- InstrumentConfig for specifying backend (simulator/pyvisa) and connection details
|
||||||
|
- InstrumentFactory.create() for creating instrument sets from configuration
|
||||||
|
- Transport layer abstraction
|
||||||
|
- Transport ABC defining connect/disconnect/read/write/query interface
|
||||||
|
- TCPTransport implementation for TCP/IP connections
|
||||||
|
- Comprehensive test suite for HAL (16 tests)
|
||||||
|
- Interface implementation verification
|
||||||
|
- Factory pattern testing with mocked backends
|
||||||
|
- Configuration validation
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
- Drivers now explicitly inherit from interface ABCs for maximum type safety
|
||||||
|
- Moved InstrumentServer to instruments/transport for better architecture
|
||||||
|
|
||||||
|
### Technical
|
||||||
|
- ABC-based interfaces ensure compile-time interface compliance
|
||||||
|
- Factory pattern enables seamless switching between simulated and real hardware
|
||||||
|
- All HAL components fully type-checked with mypy strict mode
|
||||||
|
|
||||||
## [0.1.0-alpha.3] - 2025-12-02
|
## [0.1.0-alpha.3] - 2025-12-02
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
@@ -75,7 +107,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|---------|------|-----------|
|
|---------|------|-----------|
|
||||||
| 0.1.0 | TBD | MVP Complete |
|
| 0.1.0 | TBD | MVP Complete |
|
||||||
| 0.1.0-beta.2 | TBD | First DVT test runs |
|
| 0.1.0-beta.2 | TBD | First DVT test runs |
|
||||||
| 0.1.0-beta.1 | TBD | HAL complete |
|
| 0.1.0-beta.1 | 2025-12-02 | HAL complete |
|
||||||
| 0.1.0-alpha.3 | 2025-12-02 | Network ready |
|
| 0.1.0-alpha.3 | 2025-12-02 | Network ready |
|
||||||
| 0.1.0-alpha.2 | 2025-12-02 | Visual demo |
|
| 0.1.0-alpha.2 | 2025-12-02 | Visual demo |
|
||||||
| 0.1.0-alpha.1 | 2025-12-02 | Physics engine |
|
| 0.1.0-alpha.1 | 2025-12-02 | Physics engine |
|
||||||
|
|||||||
@@ -450,109 +450,123 @@ High-level call → Driver → SCPI command → Transport → Instrument
|
|||||||
```python
|
```python
|
||||||
# py_dvt_ate/instruments/interfaces.py
|
# py_dvt_ate/instruments/interfaces.py
|
||||||
|
|
||||||
from typing import Protocol, runtime_checkable
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class IThermalChamber(Protocol):
|
class IThermalChamber(ABC):
|
||||||
"""Hardware abstraction for thermal chambers."""
|
"""Hardware abstraction for thermal chambers."""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def set_temperature(self, setpoint: float) -> None:
|
def set_temperature(self, setpoint: float) -> None:
|
||||||
"""Set target temperature in degrees Celsius."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def get_temperature(self) -> float:
|
def get_temperature(self) -> float:
|
||||||
"""Get current actual temperature in degrees Celsius."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def get_setpoint(self) -> float:
|
def get_setpoint(self) -> float:
|
||||||
"""Get current temperature setpoint."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def is_stable(self) -> bool:
|
def is_stable(self) -> bool:
|
||||||
"""Check if temperature has stabilised at setpoint."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def wait_until_stable(
|
def wait_until_stable(
|
||||||
self,
|
self,
|
||||||
timeout: float = 300.0,
|
timeout: float = 300.0,
|
||||||
poll_interval: float = 1.0
|
poll_interval: float = 1.0
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""
|
"""[docstring]"""
|
||||||
Block until temperature stabilises or timeout.
|
pass
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if stable, False if timeout
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def set_ramp_rate(self, rate: float) -> None:
|
def set_ramp_rate(self, rate: float) -> None:
|
||||||
"""Set temperature ramp rate in degrees C per minute."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class IPowerSupply(Protocol):
|
class IPowerSupply(ABC):
|
||||||
"""Hardware abstraction for programmable power supplies."""
|
"""Hardware abstraction for programmable power supplies."""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def set_voltage(self, channel: int, voltage: float) -> None:
|
def set_voltage(self, channel: int, voltage: float) -> None:
|
||||||
"""Set output voltage for specified channel."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def get_voltage(self, channel: int) -> float:
|
def get_voltage(self, channel: int) -> float:
|
||||||
"""Get voltage setpoint for specified channel."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def set_current_limit(self, channel: int, current: float) -> None:
|
def set_current_limit(self, channel: int, current: float) -> None:
|
||||||
"""Set current limit for specified channel."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def get_current_limit(self, channel: int) -> float:
|
def get_current_limit(self, channel: int) -> float:
|
||||||
"""Get current limit for specified channel."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def measure_voltage(self, channel: int) -> float:
|
def measure_voltage(self, channel: int) -> float:
|
||||||
"""Measure actual output voltage."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def measure_current(self, channel: int) -> float:
|
def measure_current(self, channel: int) -> float:
|
||||||
"""Measure actual output current."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def enable_output(self, channel: int, enable: bool) -> None:
|
def enable_output(self, channel: int, enable: bool) -> None:
|
||||||
"""Enable or disable channel output."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def is_output_enabled(self, channel: int) -> bool:
|
def is_output_enabled(self, channel: int) -> bool:
|
||||||
"""Check if channel output is enabled."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class IMultimeter(Protocol):
|
class IMultimeter(ABC):
|
||||||
"""Hardware abstraction for digital multimeters."""
|
"""Hardware abstraction for digital multimeters."""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def measure_dc_voltage(self, range: str = "AUTO") -> float:
|
def measure_dc_voltage(self, range: str = "AUTO") -> float:
|
||||||
"""Measure DC voltage. Range: AUTO, 0.1, 1, 10, 100, 1000."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def measure_dc_current(self, range: str = "AUTO") -> float:
|
def measure_dc_current(self, range: str = "AUTO") -> float:
|
||||||
"""Measure DC current."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def measure_resistance(self, range: str = "AUTO") -> float:
|
def measure_resistance(self, range: str = "AUTO") -> float:
|
||||||
"""Measure resistance."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def set_integration_time(self, nplc: float) -> None:
|
def set_integration_time(self, nplc: float) -> None:
|
||||||
"""Set integration time in power line cycles (0.1 to 100)."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class ITestLogger(Protocol):
|
class ITestLogger(ABC):
|
||||||
"""Abstraction for test data logging."""
|
"""Abstraction for test data logging."""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def log_measurement(
|
def log_measurement(
|
||||||
self,
|
self,
|
||||||
parameter: str,
|
parameter: str,
|
||||||
@@ -560,9 +574,10 @@ class ITestLogger(Protocol):
|
|||||||
unit: str,
|
unit: str,
|
||||||
conditions: dict[str, float] | None = None
|
conditions: dict[str, float] | None = None
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Log a single measurement."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def log_result(
|
def log_result(
|
||||||
self,
|
self,
|
||||||
parameter: str,
|
parameter: str,
|
||||||
@@ -571,12 +586,13 @@ class ITestLogger(Protocol):
|
|||||||
lower_limit: float | None = None,
|
lower_limit: float | None = None,
|
||||||
upper_limit: float | None = None
|
upper_limit: float | None = None
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Log a test result with optional limits."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def log_event(self, message: str, level: str = "INFO") -> None:
|
def log_event(self, message: str, level: str = "INFO") -> None:
|
||||||
"""Log a test event or message."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
```
|
```
|
||||||
|
|
||||||
### 4.2 Transport Interface
|
### 4.2 Transport Interface
|
||||||
@@ -584,36 +600,42 @@ class ITestLogger(Protocol):
|
|||||||
```python
|
```python
|
||||||
# py_dvt_ate/instruments/transport/base.py
|
# py_dvt_ate/instruments/transport/base.py
|
||||||
|
|
||||||
from typing import Protocol
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
|
||||||
class Transport(Protocol):
|
class Transport(ABC):
|
||||||
"""Abstract transport interface for instrument communication."""
|
"""Abstract transport interface for instrument communication."""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def connect(self) -> None:
|
def connect(self) -> None:
|
||||||
"""Establish connection to instrument."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def disconnect(self) -> None:
|
def disconnect(self) -> None:
|
||||||
"""Close connection to instrument."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def write(self, command: str) -> None:
|
def write(self, command: str) -> None:
|
||||||
"""Send command to instrument."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def read(self, timeout: float | None = None) -> str:
|
def read(self, timeout: float | None = None) -> str:
|
||||||
"""Read response from instrument."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def query(self, command: str, timeout: float | None = None) -> str:
|
def query(self, command: str, timeout: float | None = None) -> str:
|
||||||
"""Send command and read response."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@abstractmethod
|
||||||
def is_connected(self) -> bool:
|
def is_connected(self) -> bool:
|
||||||
"""Check if connection is active."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
```
|
```
|
||||||
|
|
||||||
### 4.3 Test Interface
|
### 4.3 Test Interface
|
||||||
@@ -624,7 +646,7 @@ class Transport(Protocol):
|
|||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import Protocol
|
from abc import ABC, abstractmethod
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
|
|
||||||
@@ -675,22 +697,25 @@ class TestContext:
|
|||||||
config: dict
|
config: dict
|
||||||
|
|
||||||
|
|
||||||
class ITest(Protocol):
|
class ITest(ABC):
|
||||||
"""Interface for test implementations."""
|
"""Interface for test implementations."""
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@abstractmethod
|
||||||
def name(self) -> str:
|
def name(self) -> str:
|
||||||
"""Test name identifier."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@abstractmethod
|
||||||
def description(self) -> str:
|
def description(self) -> str:
|
||||||
"""Human-readable test description."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def execute(self, context: TestContext) -> TestStatus:
|
def execute(self, context: TestContext) -> TestStatus:
|
||||||
"""Execute the test, return status."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
```
|
```
|
||||||
|
|
||||||
### 4.4 Factory Interface
|
### 4.4 Factory Interface
|
||||||
@@ -1173,30 +1198,34 @@ Schema:
|
|||||||
```python
|
```python
|
||||||
# py_dvt_ate/data/repository.py (interface)
|
# py_dvt_ate/data/repository.py (interface)
|
||||||
|
|
||||||
from typing import Protocol
|
from abc import ABC, abstractmethod
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
|
|
||||||
class ITestRepository(Protocol):
|
class ITestRepository(ABC):
|
||||||
"""Repository interface for test data."""
|
"""Repository interface for test data."""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def create_run(
|
def create_run(
|
||||||
self,
|
self,
|
||||||
test_name: str,
|
test_name: str,
|
||||||
config: dict,
|
config: dict,
|
||||||
operator: str | None = None
|
operator: str | None = None
|
||||||
) -> UUID:
|
) -> UUID:
|
||||||
"""Create a new test run, return its ID."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def update_run_status(self, run_id: UUID, status: str) -> None:
|
def update_run_status(self, run_id: UUID, status: str) -> None:
|
||||||
"""Update test run status."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def complete_run(self, run_id: UUID, status: str) -> None:
|
def complete_run(self, run_id: UUID, status: str) -> None:
|
||||||
"""Mark test run as complete with final status."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def save_result(
|
def save_result(
|
||||||
self,
|
self,
|
||||||
run_id: UUID,
|
run_id: UUID,
|
||||||
@@ -1206,28 +1235,32 @@ class ITestRepository(Protocol):
|
|||||||
lower_limit: float | None = None,
|
lower_limit: float | None = None,
|
||||||
upper_limit: float | None = None
|
upper_limit: float | None = None
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Save a test result."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def save_measurements(
|
def save_measurements(
|
||||||
self,
|
self,
|
||||||
run_id: UUID,
|
run_id: UUID,
|
||||||
measurements: list["Measurement"]
|
measurements: list["Measurement"]
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Save batch of measurements to Parquet."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def get_run(self, run_id: UUID) -> "TestRun":
|
def get_run(self, run_id: UUID) -> "TestRun":
|
||||||
"""Get test run by ID."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def get_results(self, run_id: UUID) -> list["TestResult"]:
|
def get_results(self, run_id: UUID) -> list["TestResult"]:
|
||||||
"""Get all results for a test run."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def get_measurements_dataframe(self, run_id: UUID):
|
def get_measurements_dataframe(self, run_id: UUID):
|
||||||
"""Get measurements as pandas DataFrame."""
|
"""[docstring]"""
|
||||||
...
|
pass
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
"""py_dvt_ate: Coupled Physics DVT Simulation Platform."""
|
"""py_dvt_ate: Coupled Physics DVT Simulation Platform."""
|
||||||
|
|
||||||
__version__ = "0.1.0-alpha.3"
|
__version__ = "0.1.0-beta.1"
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
"""Command-line interface for py_dvt_ate."""
|
"""Command-line interface for py_dvt_ate."""
|
||||||
|
|
||||||
from typing import Annotated, Optional
|
from typing import Annotated
|
||||||
|
|
||||||
import typer
|
import typer
|
||||||
|
|
||||||
@@ -23,7 +23,7 @@ def version_callback(value: bool) -> None:
|
|||||||
@app.callback()
|
@app.callback()
|
||||||
def main(
|
def main(
|
||||||
version: Annotated[
|
version: Annotated[
|
||||||
Optional[bool],
|
bool | None,
|
||||||
typer.Option(
|
typer.Option(
|
||||||
"--version",
|
"--version",
|
||||||
"-v",
|
"-v",
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ import streamlit as st
|
|||||||
|
|
||||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||||
|
|
||||||
|
|
||||||
# History buffer size for charts
|
# History buffer size for charts
|
||||||
HISTORY_SIZE = 500
|
HISTORY_SIZE = 500
|
||||||
|
|
||||||
|
|||||||
@@ -7,3 +7,23 @@ This package provides everything needed to communicate with lab instruments:
|
|||||||
- Instrument drivers
|
- Instrument drivers
|
||||||
- Factory for creating configured instrument sets
|
- Factory for creating configured instrument sets
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.factory import (
|
||||||
|
InstrumentConfig,
|
||||||
|
InstrumentFactory,
|
||||||
|
InstrumentSet,
|
||||||
|
)
|
||||||
|
from py_dvt_ate.instruments.interfaces import (
|
||||||
|
IMultimeter,
|
||||||
|
IPowerSupply,
|
||||||
|
IThermalChamber,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"IThermalChamber",
|
||||||
|
"IPowerSupply",
|
||||||
|
"IMultimeter",
|
||||||
|
"InstrumentSet",
|
||||||
|
"InstrumentConfig",
|
||||||
|
"InstrumentFactory",
|
||||||
|
]
|
||||||
|
|||||||
@@ -3,3 +3,15 @@
|
|||||||
Each driver translates high-level operations into SCPI commands
|
Each driver translates high-level operations into SCPI commands
|
||||||
and handles responses from instruments.
|
and handles responses from instruments.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.drivers.base import BaseDriver
|
||||||
|
from py_dvt_ate.instruments.drivers.chamber import ThermalChamberDriver
|
||||||
|
from py_dvt_ate.instruments.drivers.multimeter import MultimeterDriver
|
||||||
|
from py_dvt_ate.instruments.drivers.power_supply import PowerSupplyDriver
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"BaseDriver",
|
||||||
|
"ThermalChamberDriver",
|
||||||
|
"PowerSupplyDriver",
|
||||||
|
"MultimeterDriver",
|
||||||
|
]
|
||||||
|
|||||||
197
src/py_dvt_ate/instruments/drivers/base.py
Normal file
197
src/py_dvt_ate/instruments/drivers/base.py
Normal file
@@ -0,0 +1,197 @@
|
|||||||
|
"""Base class for SCPI instrument drivers.
|
||||||
|
|
||||||
|
This module provides the foundation for implementing client-side instrument
|
||||||
|
drivers that communicate via SCPI commands over a transport layer.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from py_dvt_ate.instruments.transport.base import Transport
|
||||||
|
|
||||||
|
|
||||||
|
class BaseDriver:
|
||||||
|
"""Base class for SCPI instrument drivers.
|
||||||
|
|
||||||
|
Provides common functionality for communicating with instruments via
|
||||||
|
SCPI commands. Subclasses implement instrument-specific command methods.
|
||||||
|
|
||||||
|
All drivers depend on a Transport instance for low-level communication.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
transport: The transport layer for communication.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, transport: "Transport") -> None:
|
||||||
|
"""Initialise the driver with a transport layer.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
transport: Transport instance for communication (TCP, VISA, etc.).
|
||||||
|
"""
|
||||||
|
self.transport = transport
|
||||||
|
|
||||||
|
def connect(self) -> None:
|
||||||
|
"""Establish connection to the instrument.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If connection fails.
|
||||||
|
"""
|
||||||
|
self.transport.connect()
|
||||||
|
|
||||||
|
def disconnect(self) -> None:
|
||||||
|
"""Close connection to the instrument.
|
||||||
|
|
||||||
|
Safe to call multiple times (idempotent).
|
||||||
|
"""
|
||||||
|
self.transport.disconnect()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_connected(self) -> bool:
|
||||||
|
"""Check if connection is active.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if connected, False otherwise.
|
||||||
|
"""
|
||||||
|
return self.transport.is_connected
|
||||||
|
|
||||||
|
def write(self, command: str) -> None:
|
||||||
|
"""Send a SCPI command to the instrument.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI command string (without terminator).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If write fails.
|
||||||
|
"""
|
||||||
|
self.transport.write(command)
|
||||||
|
|
||||||
|
def query(self, command: str, timeout: float | None = None) -> str:
|
||||||
|
"""Send a SCPI query and read the response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI query string (without terminator).
|
||||||
|
timeout: Read timeout in seconds. None uses default.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Response string from instrument.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
return self.transport.query(command, timeout)
|
||||||
|
|
||||||
|
def query_float(self, command: str, timeout: float | None = None) -> float:
|
||||||
|
"""Send a SCPI query and parse response as float.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI query string.
|
||||||
|
timeout: Read timeout in seconds.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Parsed floating-point value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If communication fails.
|
||||||
|
ValueError: If response cannot be parsed as float.
|
||||||
|
"""
|
||||||
|
response = self.query(command, timeout)
|
||||||
|
try:
|
||||||
|
return float(response.strip())
|
||||||
|
except ValueError as err:
|
||||||
|
raise ValueError(f"Cannot parse '{response}' as float") from err
|
||||||
|
|
||||||
|
def query_int(self, command: str, timeout: float | None = None) -> int:
|
||||||
|
"""Send a SCPI query and parse response as integer.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI query string.
|
||||||
|
timeout: Read timeout in seconds.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Parsed integer value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If communication fails.
|
||||||
|
ValueError: If response cannot be parsed as integer.
|
||||||
|
"""
|
||||||
|
response = self.query(command, timeout)
|
||||||
|
try:
|
||||||
|
return int(response.strip())
|
||||||
|
except ValueError as err:
|
||||||
|
raise ValueError(f"Cannot parse '{response}' as int") from err
|
||||||
|
|
||||||
|
def query_bool(self, command: str, timeout: float | None = None) -> bool:
|
||||||
|
"""Send a SCPI query and parse response as boolean.
|
||||||
|
|
||||||
|
Interprets "1", "ON", "TRUE" as True; "0", "OFF", "FALSE" as False.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI query string.
|
||||||
|
timeout: Read timeout in seconds.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Parsed boolean value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If communication fails.
|
||||||
|
ValueError: If response cannot be parsed as boolean.
|
||||||
|
"""
|
||||||
|
response = self.query(command, timeout).strip().upper()
|
||||||
|
if response in ("1", "ON", "TRUE"):
|
||||||
|
return True
|
||||||
|
if response in ("0", "OFF", "FALSE"):
|
||||||
|
return False
|
||||||
|
raise ValueError(f"Cannot parse '{response}' as bool")
|
||||||
|
|
||||||
|
def identify(self) -> str:
|
||||||
|
"""Query instrument identification (*IDN?).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Identification string in format:
|
||||||
|
"Manufacturer,Model,SerialNumber,FirmwareVersion"
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
return self.query("*IDN?")
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
"""Reset instrument to default state (*RST).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
self.write("*RST")
|
||||||
|
|
||||||
|
def clear_status(self) -> None:
|
||||||
|
"""Clear instrument status (*CLS).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
self.write("*CLS")
|
||||||
|
|
||||||
|
def operation_complete(self) -> bool:
|
||||||
|
"""Query operation complete status (*OPC?).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if operation complete.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
response = self.query("*OPC?")
|
||||||
|
return response.strip() == "1"
|
||||||
142
src/py_dvt_ate/instruments/drivers/chamber.py
Normal file
142
src/py_dvt_ate/instruments/drivers/chamber.py
Normal file
@@ -0,0 +1,142 @@
|
|||||||
|
"""Thermal chamber SCPI driver.
|
||||||
|
|
||||||
|
This module implements a client-side driver for thermal chambers that
|
||||||
|
communicate via SCPI commands.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.drivers.base import BaseDriver
|
||||||
|
from py_dvt_ate.instruments.interfaces import IThermalChamber
|
||||||
|
|
||||||
|
|
||||||
|
class ThermalChamberDriver(BaseDriver, IThermalChamber):
|
||||||
|
"""SCPI driver for thermal chambers.
|
||||||
|
|
||||||
|
Provides high-level Python API for controlling thermal chambers via
|
||||||
|
SCPI commands. Implements the IThermalChamber interface.
|
||||||
|
|
||||||
|
SCPI Commands Used:
|
||||||
|
TEMP:SETPOINT <value> - Set target temperature (°C)
|
||||||
|
TEMP:SETPOINT? - Query current setpoint
|
||||||
|
TEMP:ACTUAL? - Query actual chamber temperature
|
||||||
|
TEMP:STAB? - Query stability (1=stable, 0=settling)
|
||||||
|
TEMP:RAMP <rate> - Set temperature ramp rate (°C/min)
|
||||||
|
TEMP:RAMP? - Query ramp rate
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> transport = TCPTransport("localhost", 5001)
|
||||||
|
>>> chamber = ThermalChamberDriver(transport)
|
||||||
|
>>> chamber.connect()
|
||||||
|
>>> chamber.set_temperature(85.0)
|
||||||
|
>>> chamber.wait_until_stable(timeout=600.0)
|
||||||
|
>>> temp = chamber.get_temperature()
|
||||||
|
"""
|
||||||
|
|
||||||
|
def set_temperature(self, setpoint: float) -> None:
|
||||||
|
"""Set the chamber temperature setpoint.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
setpoint: Target temperature in degrees Celsius.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If command fails.
|
||||||
|
"""
|
||||||
|
self.write(f"TEMP:SETPOINT {setpoint:.2f}")
|
||||||
|
|
||||||
|
def get_temperature(self) -> float:
|
||||||
|
"""Get the actual chamber temperature.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Current chamber temperature in degrees Celsius.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query_float("TEMP:ACTUAL?")
|
||||||
|
|
||||||
|
def get_setpoint(self) -> float:
|
||||||
|
"""Get the current temperature setpoint.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Current setpoint in degrees Celsius.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query_float("TEMP:SETPOINT?")
|
||||||
|
|
||||||
|
def is_stable(self) -> bool:
|
||||||
|
"""Check if chamber temperature is stable.
|
||||||
|
|
||||||
|
Temperature is considered stable when it has settled within
|
||||||
|
the instrument's configured stability threshold of the setpoint.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if temperature is stable, False if still settling.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query_bool("TEMP:STAB?")
|
||||||
|
|
||||||
|
def wait_until_stable(
|
||||||
|
self, timeout: float = 300.0, poll_interval: float = 1.0
|
||||||
|
) -> bool:
|
||||||
|
"""Wait until chamber temperature stabilises.
|
||||||
|
|
||||||
|
Polls the stability status at regular intervals until stable
|
||||||
|
or timeout is reached.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: Maximum time to wait in seconds. Default 300s (5 minutes).
|
||||||
|
poll_interval: Time between stability checks in seconds. Default 1s.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if temperature stabilised within timeout, False if timed out.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If communication fails.
|
||||||
|
ValueError: If timeout or poll_interval are negative.
|
||||||
|
"""
|
||||||
|
if timeout < 0:
|
||||||
|
raise ValueError("Timeout must be non-negative")
|
||||||
|
if poll_interval <= 0:
|
||||||
|
raise ValueError("Poll interval must be positive")
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
|
while time.time() - start_time < timeout:
|
||||||
|
if self.is_stable():
|
||||||
|
return True
|
||||||
|
time.sleep(poll_interval)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def set_ramp_rate(self, rate: float) -> None:
|
||||||
|
"""Set the temperature ramp rate.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rate: Ramp rate in degrees Celsius per minute.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If command fails.
|
||||||
|
"""
|
||||||
|
self.write(f"TEMP:RAMP {rate:.2f}")
|
||||||
|
|
||||||
|
def get_ramp_rate(self) -> float:
|
||||||
|
"""Get the current temperature ramp rate.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Ramp rate in degrees Celsius per minute.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query_float("TEMP:RAMP?")
|
||||||
158
src/py_dvt_ate/instruments/drivers/multimeter.py
Normal file
158
src/py_dvt_ate/instruments/drivers/multimeter.py
Normal file
@@ -0,0 +1,158 @@
|
|||||||
|
"""Multimeter SCPI driver.
|
||||||
|
|
||||||
|
This module implements a client-side driver for digital multimeters
|
||||||
|
that communicate via SCPI commands.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.drivers.base import BaseDriver
|
||||||
|
from py_dvt_ate.instruments.interfaces import IMultimeter
|
||||||
|
|
||||||
|
|
||||||
|
class MultimeterDriver(BaseDriver, IMultimeter):
|
||||||
|
"""SCPI driver for digital multimeters.
|
||||||
|
|
||||||
|
Provides high-level Python API for making measurements with DMMs via
|
||||||
|
SCPI commands. Implements the IMultimeter interface.
|
||||||
|
|
||||||
|
SCPI Commands Used:
|
||||||
|
MEAS:VOLT:DC? - Measure DC voltage
|
||||||
|
MEAS:CURR:DC? - Measure DC current
|
||||||
|
CONF:VOLT:DC - Configure for DC voltage measurement
|
||||||
|
CONF:CURR:DC - Configure for DC current measurement
|
||||||
|
CONF? - Query current configuration
|
||||||
|
READ? - Take measurement with current configuration
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> transport = TCPTransport("localhost", 5003)
|
||||||
|
>>> dmm = MultimeterDriver(transport)
|
||||||
|
>>> dmm.connect()
|
||||||
|
>>> voltage = dmm.measure_dc_voltage()
|
||||||
|
>>> current = dmm.measure_dc_current()
|
||||||
|
"""
|
||||||
|
|
||||||
|
def measure_dc_voltage(self, range: str = "AUTO") -> float:
|
||||||
|
"""Measure DC voltage.
|
||||||
|
|
||||||
|
Configures the meter for DC voltage and takes a measurement.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
range: Measurement range. Default "AUTO" for auto-ranging.
|
||||||
|
Note: Range parameter currently not supported by simulator.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured voltage in volts.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
# Note: Range parameter not yet implemented in virtual instrument
|
||||||
|
return self.query_float("MEAS:VOLT:DC?")
|
||||||
|
|
||||||
|
def measure_dc_current(self, range: str = "AUTO") -> float:
|
||||||
|
"""Measure DC current.
|
||||||
|
|
||||||
|
Configures the meter for DC current and takes a measurement.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
range: Measurement range. Default "AUTO" for auto-ranging.
|
||||||
|
Note: Range parameter currently not supported by simulator.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured current in amps.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
# Note: Range parameter not yet implemented in virtual instrument
|
||||||
|
return self.query_float("MEAS:CURR:DC?")
|
||||||
|
|
||||||
|
def measure_resistance(self, range: str = "AUTO") -> float:
|
||||||
|
"""Measure resistance.
|
||||||
|
|
||||||
|
Configures the meter for resistance and takes a measurement.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
range: Measurement range. Default "AUTO" for auto-ranging.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured resistance in ohms.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
NotImplementedError: If instrument does not support resistance.
|
||||||
|
"""
|
||||||
|
# Note: Resistance measurement not yet implemented in virtual instrument
|
||||||
|
raise NotImplementedError(
|
||||||
|
"Resistance measurement not yet supported by virtual instrument"
|
||||||
|
)
|
||||||
|
|
||||||
|
def set_integration_time(self, nplc: float) -> None:
|
||||||
|
"""Set the integration time.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
nplc: Integration time in number of power line cycles (NPLC).
|
||||||
|
Typical values: 0.02, 0.2, 1, 10, 100.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If command fails.
|
||||||
|
NotImplementedError: If instrument does not support integration time.
|
||||||
|
"""
|
||||||
|
# Note: Integration time not yet implemented in virtual instrument
|
||||||
|
raise NotImplementedError(
|
||||||
|
"Integration time setting not yet supported by virtual instrument"
|
||||||
|
)
|
||||||
|
|
||||||
|
def configure_dc_voltage(self) -> None:
|
||||||
|
"""Configure meter for DC voltage measurement.
|
||||||
|
|
||||||
|
Sets the measurement function without taking a measurement.
|
||||||
|
Use read() to take a measurement after configuring.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If command fails.
|
||||||
|
"""
|
||||||
|
self.write("CONF:VOLT:DC")
|
||||||
|
|
||||||
|
def configure_dc_current(self) -> None:
|
||||||
|
"""Configure meter for DC current measurement.
|
||||||
|
|
||||||
|
Sets the measurement function without taking a measurement.
|
||||||
|
Use read() to take a measurement after configuring.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If command fails.
|
||||||
|
"""
|
||||||
|
self.write("CONF:CURR:DC")
|
||||||
|
|
||||||
|
def get_configuration(self) -> str:
|
||||||
|
"""Get the current measurement configuration.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Configuration string (e.g., "VOLT:DC", "CURR:DC").
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query("CONF?").strip('"')
|
||||||
|
|
||||||
|
def read(self) -> float:
|
||||||
|
"""Take a measurement using the current configuration.
|
||||||
|
|
||||||
|
Must call configure_dc_voltage() or configure_dc_current() first
|
||||||
|
to set the measurement function.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured value (voltage in V or current in A).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query_float("READ?")
|
||||||
153
src/py_dvt_ate/instruments/drivers/power_supply.py
Normal file
153
src/py_dvt_ate/instruments/drivers/power_supply.py
Normal file
@@ -0,0 +1,153 @@
|
|||||||
|
"""Power supply SCPI driver.
|
||||||
|
|
||||||
|
This module implements a client-side driver for programmable power supplies
|
||||||
|
that communicate via SCPI commands.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.drivers.base import BaseDriver
|
||||||
|
from py_dvt_ate.instruments.interfaces import IPowerSupply
|
||||||
|
|
||||||
|
|
||||||
|
class PowerSupplyDriver(BaseDriver, IPowerSupply):
|
||||||
|
"""SCPI driver for programmable power supplies.
|
||||||
|
|
||||||
|
Provides high-level Python API for controlling power supplies via
|
||||||
|
SCPI commands. Implements the IPowerSupply interface.
|
||||||
|
|
||||||
|
Note: This driver assumes a single-channel instrument. The channel
|
||||||
|
parameter is accepted for interface compatibility but currently ignored.
|
||||||
|
|
||||||
|
SCPI Commands Used:
|
||||||
|
VOLT <value> - Set output voltage (V)
|
||||||
|
VOLT? - Query voltage setpoint
|
||||||
|
CURR <value> - Set current limit (A)
|
||||||
|
CURR? - Query current limit
|
||||||
|
OUTP <ON|OFF|1|0> - Enable/disable output
|
||||||
|
OUTP? - Query output state (1=on, 0=off)
|
||||||
|
MEAS:VOLT? - Measure actual output voltage
|
||||||
|
MEAS:CURR? - Measure actual output current
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> transport = TCPTransport("localhost", 5002)
|
||||||
|
>>> psu = PowerSupplyDriver(transport)
|
||||||
|
>>> psu.connect()
|
||||||
|
>>> psu.set_voltage(1, 3.3)
|
||||||
|
>>> psu.set_current_limit(1, 0.5)
|
||||||
|
>>> psu.enable_output(1, True)
|
||||||
|
>>> voltage = psu.measure_voltage(1)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def set_voltage(self, channel: int, voltage: float) -> None:
|
||||||
|
"""Set the output voltage setpoint.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Channel number (currently ignored, single channel assumed).
|
||||||
|
voltage: Target voltage in volts.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If command fails.
|
||||||
|
"""
|
||||||
|
self.write(f"VOLT {voltage:.3f}")
|
||||||
|
|
||||||
|
def get_voltage(self, channel: int) -> float:
|
||||||
|
"""Get the voltage setpoint.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Channel number (currently ignored, single channel assumed).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Current voltage setpoint in volts.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query_float("VOLT?")
|
||||||
|
|
||||||
|
def set_current_limit(self, channel: int, current: float) -> None:
|
||||||
|
"""Set the current limit.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Channel number (currently ignored, single channel assumed).
|
||||||
|
current: Current limit in amps.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If command fails.
|
||||||
|
"""
|
||||||
|
self.write(f"CURR {current:.3f}")
|
||||||
|
|
||||||
|
def get_current_limit(self, channel: int) -> float:
|
||||||
|
"""Get the current limit.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Channel number (currently ignored, single channel assumed).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Current limit in amps.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query_float("CURR?")
|
||||||
|
|
||||||
|
def measure_voltage(self, channel: int) -> float:
|
||||||
|
"""Measure the actual output voltage.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Channel number (currently ignored, single channel assumed).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured voltage in volts.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query_float("MEAS:VOLT?")
|
||||||
|
|
||||||
|
def measure_current(self, channel: int) -> float:
|
||||||
|
"""Measure the actual output current.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Channel number (currently ignored, single channel assumed).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured current in amps.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query_float("MEAS:CURR?")
|
||||||
|
|
||||||
|
def enable_output(self, channel: int, enable: bool) -> None:
|
||||||
|
"""Enable or disable the output.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Channel number (currently ignored, single channel assumed).
|
||||||
|
enable: True to enable output, False to disable.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If command fails.
|
||||||
|
"""
|
||||||
|
state = "ON" if enable else "OFF"
|
||||||
|
self.write(f"OUTP {state}")
|
||||||
|
|
||||||
|
def is_output_enabled(self, channel: int) -> bool:
|
||||||
|
"""Check if output is enabled.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Channel number (currently ignored, single channel assumed).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if output is enabled, False if disabled.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If query fails.
|
||||||
|
"""
|
||||||
|
return self.query_bool("OUTP?")
|
||||||
176
src/py_dvt_ate/instruments/factory.py
Normal file
176
src/py_dvt_ate/instruments/factory.py
Normal file
@@ -0,0 +1,176 @@
|
|||||||
|
"""Instrument factory for creating configured instrument sets.
|
||||||
|
|
||||||
|
This module provides a factory pattern for creating sets of instruments
|
||||||
|
based on configuration. It abstracts away the choice between simulated
|
||||||
|
and real hardware, allowing test code to be written once and run against
|
||||||
|
either backend.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Literal
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.interfaces import IMultimeter, IPowerSupply, IThermalChamber
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class InstrumentSet:
|
||||||
|
"""Container for a complete set of instruments.
|
||||||
|
|
||||||
|
Holds all instruments needed for DVT testing. All instruments implement
|
||||||
|
the interface protocols (IThermalChamber, IPowerSupply, IMultimeter),
|
||||||
|
allowing them to be simulated or real hardware.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
chamber: Thermal chamber for temperature control.
|
||||||
|
psu: Programmable power supply for DUT power.
|
||||||
|
dmm: Digital multimeter for precision measurements.
|
||||||
|
"""
|
||||||
|
|
||||||
|
chamber: IThermalChamber
|
||||||
|
psu: IPowerSupply
|
||||||
|
dmm: IMultimeter
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class InstrumentConfig:
|
||||||
|
"""Configuration for instrument connections.
|
||||||
|
|
||||||
|
Defines how to connect to instruments. The backend determines whether
|
||||||
|
to use simulated instruments (TCP connections to virtual instruments)
|
||||||
|
or real hardware (PyVISA connections).
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
backend: "simulator" for virtual instruments, "pyvisa" for real hardware.
|
||||||
|
|
||||||
|
Simulator Settings:
|
||||||
|
simulator_host: Hostname/IP of simulation server. Default "localhost".
|
||||||
|
chamber_port: TCP port for thermal chamber simulator. Default 5001.
|
||||||
|
psu_port: TCP port for power supply simulator. Default 5002.
|
||||||
|
dmm_port: TCP port for multimeter simulator. Default 5003.
|
||||||
|
|
||||||
|
PyVISA Settings (for real hardware):
|
||||||
|
chamber_visa: VISA resource string for thermal chamber (e.g., "TCPIP::192.168.1.10::INSTR").
|
||||||
|
psu_visa: VISA resource string for power supply.
|
||||||
|
dmm_visa: VISA resource string for multimeter.
|
||||||
|
"""
|
||||||
|
|
||||||
|
backend: Literal["simulator", "pyvisa"]
|
||||||
|
|
||||||
|
# Simulator settings
|
||||||
|
simulator_host: str = "localhost"
|
||||||
|
chamber_port: int = 5001
|
||||||
|
psu_port: int = 5002
|
||||||
|
dmm_port: int = 5003
|
||||||
|
|
||||||
|
# PyVISA settings (for real hardware)
|
||||||
|
chamber_visa: str | None = None
|
||||||
|
psu_visa: str | None = None
|
||||||
|
dmm_visa: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class InstrumentFactory:
|
||||||
|
"""Factory for creating instrument sets from configuration.
|
||||||
|
|
||||||
|
This factory encapsulates the creation logic for instrument sets,
|
||||||
|
hiding the complexity of instantiating transports and drivers based
|
||||||
|
on the chosen backend.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> config = InstrumentConfig(backend="simulator")
|
||||||
|
>>> instruments = InstrumentFactory.create(config)
|
||||||
|
>>> instruments.chamber.set_temperature(85.0)
|
||||||
|
>>> instruments.psu.set_voltage(1, 3.3)
|
||||||
|
>>> voltage = instruments.dmm.measure_dc_voltage()
|
||||||
|
"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(config: InstrumentConfig) -> InstrumentSet:
|
||||||
|
"""Create instrument set based on configuration.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config: Configuration specifying backend and connection details.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
InstrumentSet containing all configured instruments.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If backend is unknown or configuration is invalid.
|
||||||
|
ConnectionError: If unable to connect to instruments.
|
||||||
|
"""
|
||||||
|
if config.backend == "simulator":
|
||||||
|
return InstrumentFactory._create_simulated(config)
|
||||||
|
elif config.backend == "pyvisa":
|
||||||
|
return InstrumentFactory._create_pyvisa(config)
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Unknown backend: {config.backend}")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _create_simulated(config: InstrumentConfig) -> InstrumentSet:
|
||||||
|
"""Create simulated instruments connected via TCP.
|
||||||
|
|
||||||
|
Creates TCP transports for each virtual instrument and wraps them
|
||||||
|
in SCPI drivers. The simulation server must be running and listening
|
||||||
|
on the configured ports.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config: Configuration with simulator_host and port settings.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
InstrumentSet with simulated instruments.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If unable to connect to simulation server.
|
||||||
|
"""
|
||||||
|
from py_dvt_ate.instruments.drivers.chamber import ThermalChamberDriver
|
||||||
|
from py_dvt_ate.instruments.drivers.multimeter import MultimeterDriver
|
||||||
|
from py_dvt_ate.instruments.drivers.power_supply import PowerSupplyDriver
|
||||||
|
from py_dvt_ate.instruments.transport.tcp import TCPTransport
|
||||||
|
|
||||||
|
# Create transports for each instrument
|
||||||
|
chamber_transport = TCPTransport(config.simulator_host, config.chamber_port)
|
||||||
|
psu_transport = TCPTransport(config.simulator_host, config.psu_port)
|
||||||
|
dmm_transport = TCPTransport(config.simulator_host, config.dmm_port)
|
||||||
|
|
||||||
|
# Wrap transports in drivers
|
||||||
|
return InstrumentSet(
|
||||||
|
chamber=ThermalChamberDriver(chamber_transport),
|
||||||
|
psu=PowerSupplyDriver(psu_transport),
|
||||||
|
dmm=MultimeterDriver(dmm_transport),
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _create_pyvisa(config: InstrumentConfig) -> InstrumentSet:
|
||||||
|
"""Create PyVISA instruments for real hardware.
|
||||||
|
|
||||||
|
Creates VISA transports for each real instrument and wraps them
|
||||||
|
in SCPI drivers. Requires PyVISA to be installed and VISA resource
|
||||||
|
strings to be configured.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config: Configuration with chamber_visa, psu_visa, dmm_visa settings.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
InstrumentSet with real hardware instruments.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
NotImplementedError: PyVISA backend not yet implemented.
|
||||||
|
ValueError: If required VISA resource strings are missing.
|
||||||
|
"""
|
||||||
|
# Future implementation would use pyvisa.ResourceManager
|
||||||
|
# to create VISA transports:
|
||||||
|
#
|
||||||
|
# import pyvisa
|
||||||
|
# from py_dvt_ate.instruments.transport.visa import VISATransport
|
||||||
|
#
|
||||||
|
# rm = pyvisa.ResourceManager()
|
||||||
|
# chamber_transport = VISATransport(rm.open_resource(config.chamber_visa))
|
||||||
|
# psu_transport = VISATransport(rm.open_resource(config.psu_visa))
|
||||||
|
# dmm_transport = VISATransport(rm.open_resource(config.dmm_visa))
|
||||||
|
#
|
||||||
|
# return InstrumentSet(
|
||||||
|
# chamber=ThermalChamberDriver(chamber_transport),
|
||||||
|
# psu=PowerSupplyDriver(psu_transport),
|
||||||
|
# dmm=MultimeterDriver(dmm_transport),
|
||||||
|
# )
|
||||||
|
|
||||||
|
raise NotImplementedError("PyVISA backend not yet implemented")
|
||||||
362
src/py_dvt_ate/instruments/interfaces.py
Normal file
362
src/py_dvt_ate/instruments/interfaces.py
Normal file
@@ -0,0 +1,362 @@
|
|||||||
|
"""Instrument interface protocols.
|
||||||
|
|
||||||
|
This module defines the Hardware Abstraction Layer (HAL) interfaces for all
|
||||||
|
laboratory instruments used in DVT testing. These protocols allow test code
|
||||||
|
to be written against abstract interfaces rather than concrete implementations,
|
||||||
|
enabling seamless switching between simulated and real hardware.
|
||||||
|
|
||||||
|
The interfaces use ABC (Abstract Base Classes) for maximum type safety and
|
||||||
|
explicit interface implementation. All drivers must inherit from these base
|
||||||
|
classes and implement all abstract methods.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
|
||||||
|
class IThermalChamber(ABC):
|
||||||
|
"""Hardware abstraction for thermal chambers.
|
||||||
|
|
||||||
|
Defines the interface for controlling environmental temperature during
|
||||||
|
thermal characterisation tests. Implementations may be virtual instruments
|
||||||
|
(simulators) or real hardware drivers.
|
||||||
|
|
||||||
|
Temperature units are always degrees Celsius.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def set_temperature(self, setpoint: float) -> None:
|
||||||
|
"""Set the chamber temperature setpoint.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
setpoint: Target temperature in degrees Celsius.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If command fails or instrument reports error.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_temperature(self) -> float:
|
||||||
|
"""Get the actual chamber temperature.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Current chamber air temperature in degrees Celsius.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_setpoint(self) -> float:
|
||||||
|
"""Get the current temperature setpoint.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Current target temperature in degrees Celsius.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def is_stable(self) -> bool:
|
||||||
|
"""Check if chamber temperature is stable.
|
||||||
|
|
||||||
|
Temperature is considered stable when it has settled within
|
||||||
|
the instrument's configured stability threshold of the setpoint
|
||||||
|
for a minimum dwell time.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if temperature is stable, False if still settling.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def wait_until_stable(
|
||||||
|
self, timeout: float = 300.0, poll_interval: float = 1.0
|
||||||
|
) -> bool:
|
||||||
|
"""Wait until chamber temperature stabilises.
|
||||||
|
|
||||||
|
Polls the stability status at regular intervals until stable
|
||||||
|
or timeout is reached. This is a blocking call.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: Maximum time to wait in seconds. Default 300s (5 minutes).
|
||||||
|
poll_interval: Time between stability checks in seconds. Default 1s.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if temperature stabilised within timeout, False if timed out.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If communication fails.
|
||||||
|
ValueError: If timeout or poll_interval are invalid.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def set_ramp_rate(self, rate: float) -> None:
|
||||||
|
"""Set the temperature ramp rate.
|
||||||
|
|
||||||
|
Controls how quickly the chamber changes temperature when moving
|
||||||
|
to a new setpoint. Slower ramp rates reduce thermal shock to DUT.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rate: Ramp rate in degrees Celsius per minute.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If command fails or instrument reports error.
|
||||||
|
ValueError: If rate is negative or exceeds instrument limits.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class IPowerSupply(ABC):
|
||||||
|
"""Hardware abstraction for programmable power supplies.
|
||||||
|
|
||||||
|
Defines the interface for controlling DC power supplies during electrical
|
||||||
|
characterisation tests. Implementations may be virtual instruments
|
||||||
|
(simulators) or real hardware drivers.
|
||||||
|
|
||||||
|
Voltage units are always volts (V).
|
||||||
|
Current units are always amps (A).
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def set_voltage(self, channel: int, voltage: float) -> None:
|
||||||
|
"""Set the output voltage setpoint.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Output channel number (1-based indexing).
|
||||||
|
voltage: Target voltage in volts.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If command fails or instrument reports error.
|
||||||
|
ValueError: If channel is invalid or voltage out of range.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_voltage(self, channel: int) -> float:
|
||||||
|
"""Get the voltage setpoint.
|
||||||
|
|
||||||
|
Returns the programmed voltage, not the measured output voltage.
|
||||||
|
Use measure_voltage() to get the actual output voltage.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Output channel number (1-based indexing).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Current voltage setpoint in volts.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
ValueError: If channel is invalid.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def set_current_limit(self, channel: int, current: float) -> None:
|
||||||
|
"""Set the current limit.
|
||||||
|
|
||||||
|
The supply will operate in constant voltage mode until output current
|
||||||
|
reaches this limit, then transition to constant current mode.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Output channel number (1-based indexing).
|
||||||
|
current: Current limit in amps.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If command fails or instrument reports error.
|
||||||
|
ValueError: If channel is invalid or current out of range.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_current_limit(self, channel: int) -> float:
|
||||||
|
"""Get the current limit.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Output channel number (1-based indexing).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Current limit in amps.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
ValueError: If channel is invalid.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def measure_voltage(self, channel: int) -> float:
|
||||||
|
"""Measure the actual output voltage.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Output channel number (1-based indexing).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured output voltage in volts.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
ValueError: If channel is invalid.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def measure_current(self, channel: int) -> float:
|
||||||
|
"""Measure the actual output current.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Output channel number (1-based indexing).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured output current in amps.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
ValueError: If channel is invalid.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def enable_output(self, channel: int, enable: bool) -> None:
|
||||||
|
"""Enable or disable the output.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Output channel number (1-based indexing).
|
||||||
|
enable: True to enable output, False to disable.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If command fails or instrument reports error.
|
||||||
|
ValueError: If channel is invalid.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def is_output_enabled(self, channel: int) -> bool:
|
||||||
|
"""Check if output is enabled.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: Output channel number (1-based indexing).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if output is enabled, False if disabled.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
ValueError: If channel is invalid.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class IMultimeter(ABC):
|
||||||
|
"""Hardware abstraction for digital multimeters.
|
||||||
|
|
||||||
|
Defines the interface for making precision measurements with DMMs during
|
||||||
|
electrical characterisation tests. Implementations may be virtual instruments
|
||||||
|
(simulators) or real hardware drivers.
|
||||||
|
|
||||||
|
Voltage units are always volts (V).
|
||||||
|
Current units are always amps (A).
|
||||||
|
Resistance units are always ohms (Ω).
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def measure_dc_voltage(self, range: str = "AUTO") -> float:
|
||||||
|
"""Measure DC voltage.
|
||||||
|
|
||||||
|
Configures the meter for DC voltage and takes a measurement.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
range: Measurement range. Default "AUTO" for auto-ranging.
|
||||||
|
Specific ranges depend on instrument capabilities.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured voltage in volts.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
ValueError: If range is invalid for this instrument.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def measure_dc_current(self, range: str = "AUTO") -> float:
|
||||||
|
"""Measure DC current.
|
||||||
|
|
||||||
|
Configures the meter for DC current and takes a measurement.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
range: Measurement range. Default "AUTO" for auto-ranging.
|
||||||
|
Specific ranges depend on instrument capabilities.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured current in amps.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
ValueError: If range is invalid for this instrument.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def measure_resistance(self, range: str = "AUTO") -> float:
|
||||||
|
"""Measure resistance.
|
||||||
|
|
||||||
|
Configures the meter for resistance and takes a measurement.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
range: Measurement range. Default "AUTO" for auto-ranging.
|
||||||
|
Specific ranges depend on instrument capabilities.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Measured resistance in ohms.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If query fails or instrument reports error.
|
||||||
|
ValueError: If range is invalid for this instrument.
|
||||||
|
NotImplementedError: If instrument does not support resistance.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def set_integration_time(self, nplc: float) -> None:
|
||||||
|
"""Set the integration time.
|
||||||
|
|
||||||
|
Integration time affects measurement accuracy and speed. Higher
|
||||||
|
values (more power line cycles) provide better noise rejection
|
||||||
|
but take longer to measure.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
nplc: Integration time in number of power line cycles (NPLC).
|
||||||
|
Typical values: 0.02, 0.2, 1, 10, 100.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected to instrument.
|
||||||
|
IOError: If command fails or instrument reports error.
|
||||||
|
ValueError: If nplc is invalid for this instrument.
|
||||||
|
NotImplementedError: If instrument does not support integration time.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
@@ -1,6 +1,13 @@
|
|||||||
"""Transport layer for instrument communication.
|
"""Transport layer for instrument communication.
|
||||||
|
|
||||||
Provides connection abstractions for different backends:
|
Provides connection abstractions for different backends:
|
||||||
|
- TCP server for hosting SCPI instruments
|
||||||
- TCP sockets (for simulation server)
|
- TCP sockets (for simulation server)
|
||||||
- PyVISA (for real instruments)
|
- PyVISA (for real instruments)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.transport.base import Transport
|
||||||
|
from py_dvt_ate.instruments.transport.server import InstrumentServer, SCPIDevice
|
||||||
|
from py_dvt_ate.instruments.transport.tcp import TCPTransport
|
||||||
|
|
||||||
|
__all__ = ["Transport", "TCPTransport", "InstrumentServer", "SCPIDevice"]
|
||||||
|
|||||||
93
src/py_dvt_ate/instruments/transport/base.py
Normal file
93
src/py_dvt_ate/instruments/transport/base.py
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
"""Base transport interface for instrument communication."""
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
|
||||||
|
class Transport(ABC):
|
||||||
|
"""Abstract transport interface for instrument communication.
|
||||||
|
|
||||||
|
This abstract base class defines the interface that all transport
|
||||||
|
implementations (TCP, VISA, etc.) must implement. It provides basic
|
||||||
|
connection management and communication primitives for SCPI-based
|
||||||
|
instruments.
|
||||||
|
|
||||||
|
Implementations must inherit from this class and implement all abstract
|
||||||
|
methods.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def connect(self) -> None:
|
||||||
|
"""Establish connection to instrument.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If connection fails.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def disconnect(self) -> None:
|
||||||
|
"""Close connection to instrument.
|
||||||
|
|
||||||
|
Should be idempotent - safe to call multiple times.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def write(self, command: str) -> None:
|
||||||
|
"""Send command to instrument.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI command string to send (without terminator).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If write fails.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def read(self, timeout: float | None = None) -> str:
|
||||||
|
"""Read response from instrument.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: Read timeout in seconds. None uses default.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Response string from instrument (without terminator).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If read fails.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def query(self, command: str, timeout: float | None = None) -> str:
|
||||||
|
"""Send command and read response.
|
||||||
|
|
||||||
|
Convenience method combining write() and read().
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI command string to send.
|
||||||
|
timeout: Read timeout in seconds. None uses default.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Response string from instrument.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@property
|
||||||
|
@abstractmethod
|
||||||
|
def is_connected(self) -> bool:
|
||||||
|
"""Check if connection is active.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if connected, False otherwise.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
@@ -1,32 +1,56 @@
|
|||||||
"""Async TCP server for exposing virtual instruments over network.
|
"""Async TCP server for exposing instruments over network.
|
||||||
|
|
||||||
This module provides the InstrumentServer class that hosts virtual SCPI
|
This module provides the InstrumentServer class that hosts SCPI
|
||||||
instruments over TCP, allowing client applications to communicate using
|
instruments over TCP, allowing client applications to communicate using
|
||||||
standard SCPI commands over a network connection.
|
standard SCPI commands over a network connection.
|
||||||
|
|
||||||
|
This is a general-purpose server that works with any object implementing
|
||||||
|
the SCPIDevice protocol (having a process(command) -> str method).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING
|
from functools import partial
|
||||||
|
from typing import Protocol, runtime_checkable
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
__all__ = ["InstrumentServer", "SCPIDevice"]
|
||||||
from py_dvt_ate.simulation.virtual.base import BaseInstrument
|
|
||||||
|
|
||||||
# Re-export for type checking - actual import happens at runtime via registration
|
|
||||||
__all__ = ["InstrumentServer"]
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
|
class SCPIDevice(Protocol):
|
||||||
|
"""Protocol for SCPI-compatible devices.
|
||||||
|
|
||||||
|
Any object with a process method matching this signature can be
|
||||||
|
served by InstrumentServer.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def process(self, command: str) -> str:
|
||||||
|
"""Process a SCPI command and return the response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI command string to process.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Response string (may be empty for commands with no response).
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
class InstrumentServer:
|
class InstrumentServer:
|
||||||
"""Async TCP server hosting virtual SCPI instruments.
|
"""Async TCP server hosting SCPI instruments.
|
||||||
|
|
||||||
Each instrument is assigned a port. Clients connect via TCP and send
|
Each instrument is assigned a port. Clients connect via TCP and send
|
||||||
SCPI commands as newline-terminated strings. Responses are also
|
SCPI commands as newline-terminated strings. Responses are also
|
||||||
newline-terminated.
|
newline-terminated.
|
||||||
|
|
||||||
|
This server can host any device implementing the SCPIDevice protocol,
|
||||||
|
including both virtual instruments (simulators) and adapters for
|
||||||
|
real hardware.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
host: Host address to bind to.
|
host: Host address to bind to.
|
||||||
"""
|
"""
|
||||||
@@ -38,7 +62,7 @@ class InstrumentServer:
|
|||||||
host: Host address to bind to. Defaults to localhost.
|
host: Host address to bind to. Defaults to localhost.
|
||||||
"""
|
"""
|
||||||
self._host = host
|
self._host = host
|
||||||
self._instruments: dict[int, BaseInstrument] = {}
|
self._instruments: dict[int, SCPIDevice] = {}
|
||||||
self._servers: list[asyncio.Server] = []
|
self._servers: list[asyncio.Server] = []
|
||||||
self._running = False
|
self._running = False
|
||||||
|
|
||||||
@@ -52,12 +76,12 @@ class InstrumentServer:
|
|||||||
"""Check if server is currently running."""
|
"""Check if server is currently running."""
|
||||||
return self._running
|
return self._running
|
||||||
|
|
||||||
def register_instrument(self, port: int, instrument: BaseInstrument) -> None:
|
def register_instrument(self, port: int, instrument: SCPIDevice) -> None:
|
||||||
"""Register an instrument to be served on a specific port.
|
"""Register an instrument to be served on a specific port.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
port: TCP port number to serve the instrument on.
|
port: TCP port number to serve the instrument on.
|
||||||
instrument: Virtual instrument to serve.
|
instrument: SCPI device to serve (any object with process method).
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
ValueError: If port is already registered.
|
ValueError: If port is already registered.
|
||||||
@@ -76,7 +100,7 @@ class InstrumentServer:
|
|||||||
port,
|
port,
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_instrument(self, port: int) -> BaseInstrument | None:
|
def get_instrument(self, port: int) -> SCPIDevice | None:
|
||||||
"""Get the instrument registered on a port.
|
"""Get the instrument registered on a port.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -109,8 +133,9 @@ class InstrumentServer:
|
|||||||
self._running = True
|
self._running = True
|
||||||
|
|
||||||
for port, instrument in self._instruments.items():
|
for port, instrument in self._instruments.items():
|
||||||
|
handler = partial(self._handle_client, instrument=instrument, port=port)
|
||||||
server = await asyncio.start_server(
|
server = await asyncio.start_server(
|
||||||
lambda r, w, inst=instrument, p=port: self._handle_client(r, w, inst, p),
|
handler,
|
||||||
self._host,
|
self._host,
|
||||||
port,
|
port,
|
||||||
)
|
)
|
||||||
@@ -154,7 +179,7 @@ class InstrumentServer:
|
|||||||
self,
|
self,
|
||||||
reader: asyncio.StreamReader,
|
reader: asyncio.StreamReader,
|
||||||
writer: asyncio.StreamWriter,
|
writer: asyncio.StreamWriter,
|
||||||
instrument: BaseInstrument,
|
instrument: SCPIDevice,
|
||||||
port: int,
|
port: int,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Handle a client connection.
|
"""Handle a client connection.
|
||||||
@@ -193,7 +218,7 @@ class InstrumentServer:
|
|||||||
|
|
||||||
# Send response with newline terminator
|
# Send response with newline terminator
|
||||||
if response:
|
if response:
|
||||||
writer.write(f"{response}\n".encode("utf-8"))
|
writer.write(f"{response}\n".encode())
|
||||||
await writer.drain()
|
await writer.drain()
|
||||||
logger.debug("Port %d sent: %s", port, response)
|
logger.debug("Port %d sent: %s", port, response)
|
||||||
|
|
||||||
195
src/py_dvt_ate/instruments/transport/tcp.py
Normal file
195
src/py_dvt_ate/instruments/transport/tcp.py
Normal file
@@ -0,0 +1,195 @@
|
|||||||
|
"""TCP socket transport for instrument communication."""
|
||||||
|
|
||||||
|
import socket
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.transport.base import Transport
|
||||||
|
|
||||||
|
|
||||||
|
class TCPTransport(Transport):
|
||||||
|
"""TCP socket transport implementation.
|
||||||
|
|
||||||
|
Implements the Transport interface for communicating with SCPI
|
||||||
|
instruments over TCP/IP using newline-terminated messages.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
host: Hostname or IP address of the instrument.
|
||||||
|
port: TCP port number.
|
||||||
|
timeout: Default socket timeout in seconds.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: str,
|
||||||
|
port: int,
|
||||||
|
timeout: float = 5.0,
|
||||||
|
encoding: str = "utf-8",
|
||||||
|
) -> None:
|
||||||
|
"""Initialise TCP transport.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host: Hostname or IP address.
|
||||||
|
port: TCP port number.
|
||||||
|
timeout: Default socket timeout in seconds.
|
||||||
|
encoding: Text encoding for commands and responses.
|
||||||
|
"""
|
||||||
|
self._host = host
|
||||||
|
self._port = port
|
||||||
|
self._timeout = timeout
|
||||||
|
self._encoding = encoding
|
||||||
|
self._socket: socket.socket | None = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def host(self) -> str:
|
||||||
|
"""Get the host address."""
|
||||||
|
return self._host
|
||||||
|
|
||||||
|
@property
|
||||||
|
def port(self) -> int:
|
||||||
|
"""Get the port number."""
|
||||||
|
return self._port
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_connected(self) -> bool:
|
||||||
|
"""Check if connection is active.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if connected, False otherwise.
|
||||||
|
"""
|
||||||
|
return self._socket is not None
|
||||||
|
|
||||||
|
def connect(self) -> None:
|
||||||
|
"""Establish connection to instrument.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If connection fails or already connected.
|
||||||
|
"""
|
||||||
|
if self.is_connected:
|
||||||
|
raise ConnectionError("Already connected")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
self._socket.settimeout(self._timeout)
|
||||||
|
self._socket.connect((self._host, self._port))
|
||||||
|
except OSError as err:
|
||||||
|
self._socket = None
|
||||||
|
raise ConnectionError(
|
||||||
|
f"Failed to connect to {self._host}:{self._port}: {err}"
|
||||||
|
) from err
|
||||||
|
|
||||||
|
def disconnect(self) -> None:
|
||||||
|
"""Close connection to instrument.
|
||||||
|
|
||||||
|
Safe to call multiple times (idempotent).
|
||||||
|
"""
|
||||||
|
if self._socket is not None:
|
||||||
|
try:
|
||||||
|
self._socket.close()
|
||||||
|
except OSError:
|
||||||
|
pass # Ignore errors during close
|
||||||
|
finally:
|
||||||
|
self._socket = None
|
||||||
|
|
||||||
|
def write(self, command: str) -> None:
|
||||||
|
"""Send command to instrument.
|
||||||
|
|
||||||
|
Commands are sent with newline terminator appended.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI command string to send (without terminator).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
IOError: If write fails.
|
||||||
|
"""
|
||||||
|
if not self.is_connected or self._socket is None:
|
||||||
|
raise ConnectionError("Not connected")
|
||||||
|
|
||||||
|
try:
|
||||||
|
message = f"{command}\n".encode(self._encoding)
|
||||||
|
self._socket.sendall(message)
|
||||||
|
except OSError as err:
|
||||||
|
raise OSError(f"Write failed: {err}") from err
|
||||||
|
|
||||||
|
def read(self, timeout: float | None = None) -> str:
|
||||||
|
"""Read response from instrument.
|
||||||
|
|
||||||
|
Reads until newline terminator is received.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: Read timeout in seconds. None uses default.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Response string from instrument (without terminator).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If read fails.
|
||||||
|
"""
|
||||||
|
if not self.is_connected or self._socket is None:
|
||||||
|
raise ConnectionError("Not connected")
|
||||||
|
|
||||||
|
# Set timeout if specified
|
||||||
|
old_timeout = self._socket.gettimeout()
|
||||||
|
if timeout is not None:
|
||||||
|
self._socket.settimeout(timeout)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Read line by line (newline-terminated protocol)
|
||||||
|
response_bytes = b""
|
||||||
|
while True:
|
||||||
|
chunk = self._socket.recv(1)
|
||||||
|
if not chunk:
|
||||||
|
raise ConnectionError("Connection closed by remote host")
|
||||||
|
response_bytes += chunk
|
||||||
|
if chunk == b"\n":
|
||||||
|
break
|
||||||
|
|
||||||
|
# Decode and strip whitespace
|
||||||
|
return response_bytes.decode(self._encoding).strip()
|
||||||
|
|
||||||
|
except ConnectionError:
|
||||||
|
raise # Re-raise ConnectionError as-is
|
||||||
|
except TimeoutError as err:
|
||||||
|
raise TimeoutError("Read timeout") from err
|
||||||
|
except (OSError, UnicodeDecodeError) as err:
|
||||||
|
raise OSError(f"Read failed: {err}") from err
|
||||||
|
finally:
|
||||||
|
# Restore original timeout
|
||||||
|
if timeout is not None:
|
||||||
|
self._socket.settimeout(old_timeout)
|
||||||
|
|
||||||
|
def query(self, command: str, timeout: float | None = None) -> str:
|
||||||
|
"""Send command and read response.
|
||||||
|
|
||||||
|
Convenience method combining write() and read().
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: SCPI command string to send.
|
||||||
|
timeout: Read timeout in seconds. None uses default.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Response string from instrument.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConnectionError: If not connected.
|
||||||
|
TimeoutError: If read times out.
|
||||||
|
IOError: If communication fails.
|
||||||
|
"""
|
||||||
|
self.write(command)
|
||||||
|
return self.read(timeout)
|
||||||
|
|
||||||
|
def __enter__(self) -> "TCPTransport":
|
||||||
|
"""Context manager entry."""
|
||||||
|
self.connect()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *args: Any) -> None:
|
||||||
|
"""Context manager exit."""
|
||||||
|
self.disconnect()
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
"""String representation."""
|
||||||
|
status = "connected" if self.is_connected else "disconnected"
|
||||||
|
return f"TCPTransport({self._host}:{self._port}, {status})"
|
||||||
@@ -2,9 +2,10 @@
|
|||||||
|
|
||||||
Provides virtual instruments backed by a coupled thermal-electrical
|
Provides virtual instruments backed by a coupled thermal-electrical
|
||||||
physics engine. Used for development and testing without real hardware.
|
physics engine. Used for development and testing without real hardware.
|
||||||
|
|
||||||
|
Note: InstrumentServer has moved to py_dvt_ate.instruments.transport
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
||||||
from py_dvt_ate.simulation.tcp_server import InstrumentServer
|
|
||||||
|
|
||||||
__all__ = ["InstrumentServer", "ServerConfig", "SimulationServer"]
|
__all__ = ["ServerConfig", "SimulationServer"]
|
||||||
|
|||||||
@@ -1,15 +1,14 @@
|
|||||||
"""Base protocol for Device Under Test (DUT) models.
|
"""Base interface for Device Under Test (DUT) models.
|
||||||
|
|
||||||
Defines the interface that all DUT models must implement to integrate
|
Defines the interface that all DUT models must implement to integrate
|
||||||
with the physics engine.
|
with the physics engine.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import Protocol, runtime_checkable
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
class DUTModel(ABC):
|
||||||
class DUTModel(Protocol):
|
"""Abstract base class for DUT electrical/thermal models.
|
||||||
"""Protocol for DUT electrical/thermal models.
|
|
||||||
|
|
||||||
DUT models encapsulate the temperature-dependent electrical behaviour
|
DUT models encapsulate the temperature-dependent electrical behaviour
|
||||||
of a device, enabling realistic simulation of thermal-electrical coupling.
|
of a device, enabling realistic simulation of thermal-electrical coupling.
|
||||||
@@ -18,8 +17,12 @@ class DUTModel(Protocol):
|
|||||||
All voltage parameters are in volts.
|
All voltage parameters are in volts.
|
||||||
All current parameters are in amps.
|
All current parameters are in amps.
|
||||||
All power parameters are in watts.
|
All power parameters are in watts.
|
||||||
|
|
||||||
|
Implementations must inherit from this class and implement all abstract
|
||||||
|
methods.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def calculate_output_voltage(self, junction_temperature: float) -> float:
|
def calculate_output_voltage(self, junction_temperature: float) -> float:
|
||||||
"""Calculate the output voltage at the given junction temperature.
|
"""Calculate the output voltage at the given junction temperature.
|
||||||
|
|
||||||
@@ -29,8 +32,9 @@ class DUTModel(Protocol):
|
|||||||
Returns:
|
Returns:
|
||||||
Output voltage in volts.
|
Output voltage in volts.
|
||||||
"""
|
"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def calculate_quiescent_current(self, junction_temperature: float) -> float:
|
def calculate_quiescent_current(self, junction_temperature: float) -> float:
|
||||||
"""Calculate the quiescent current at the given junction temperature.
|
"""Calculate the quiescent current at the given junction temperature.
|
||||||
|
|
||||||
@@ -40,8 +44,9 @@ class DUTModel(Protocol):
|
|||||||
Returns:
|
Returns:
|
||||||
Quiescent current in amps.
|
Quiescent current in amps.
|
||||||
"""
|
"""
|
||||||
...
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
def calculate_power_dissipation(
|
def calculate_power_dissipation(
|
||||||
self,
|
self,
|
||||||
input_voltage: float,
|
input_voltage: float,
|
||||||
@@ -58,4 +63,4 @@ class DUTModel(Protocol):
|
|||||||
Returns:
|
Returns:
|
||||||
Power dissipation in watts.
|
Power dissipation in watts.
|
||||||
"""
|
"""
|
||||||
...
|
pass
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ and power dissipation calculations.
|
|||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from py_dvt_ate.simulation.physics.models.base import DUTModel
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class LDOParameters:
|
class LDOParameters:
|
||||||
@@ -35,7 +37,7 @@ class LDOParameters:
|
|||||||
REFERENCE_TEMPERATURE_C = 25.0
|
REFERENCE_TEMPERATURE_C = 25.0
|
||||||
|
|
||||||
|
|
||||||
class LDOModel:
|
class LDOModel(DUTModel):
|
||||||
"""Temperature-dependent LDO voltage regulator model.
|
"""Temperature-dependent LDO voltage regulator model.
|
||||||
|
|
||||||
Models the electrical behaviour of a linear voltage regulator with:
|
Models the electrical behaviour of a linear voltage regulator with:
|
||||||
@@ -44,7 +46,7 @@ class LDOModel:
|
|||||||
- Dropout voltage that increases with temperature
|
- Dropout voltage that increases with temperature
|
||||||
- Power dissipation from (Vin - Vout) × Iload + Vin × Iq
|
- Power dissipation from (Vin - Vout) × Iload + Vin × Iq
|
||||||
|
|
||||||
This class implements the DUTModel protocol.
|
This class implements the DUTModel interface.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@@ -194,7 +196,7 @@ class LDOModel:
|
|||||||
# Temperature ratio (reference is approximately 300K ≈ 27°C)
|
# Temperature ratio (reference is approximately 300K ≈ 27°C)
|
||||||
temp_ratio = t_kelvin / 300.0
|
temp_ratio = t_kelvin / 300.0
|
||||||
|
|
||||||
return self._params.dropout_voltage * (temp_ratio**1.5)
|
return float(self._params.dropout_voltage * (temp_ratio**1.5))
|
||||||
|
|
||||||
def is_in_dropout(self, junction_temperature: float) -> bool:
|
def is_in_dropout(self, junction_temperature: float) -> bool:
|
||||||
"""Check if the LDO is in dropout at current operating point.
|
"""Check if the LDO is in dropout at current operating point.
|
||||||
|
|||||||
@@ -11,8 +11,8 @@ import logging
|
|||||||
import signal
|
import signal
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.transport import InstrumentServer
|
||||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||||
from py_dvt_ate.simulation.tcp_server import InstrumentServer
|
|
||||||
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
|
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
|
||||||
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
|
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
|
||||||
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
|
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
|
||||||
|
|||||||
@@ -82,8 +82,8 @@ class ThermalChamberSim(BaseInstrument):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
setpoint = float(command.arguments[0])
|
setpoint = float(command.arguments[0])
|
||||||
except ValueError:
|
except ValueError as err:
|
||||||
raise ValueError(f"Invalid temperature value: {command.arguments[0]}")
|
raise ValueError(f"Invalid temperature value: {command.arguments[0]}") from err
|
||||||
|
|
||||||
self._setpoint = setpoint
|
self._setpoint = setpoint
|
||||||
if self._physics_engine is not None:
|
if self._physics_engine is not None:
|
||||||
|
|||||||
@@ -94,8 +94,8 @@ class PowerSupplySim(BaseInstrument):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
voltage = float(command.arguments[0])
|
voltage = float(command.arguments[0])
|
||||||
except ValueError:
|
except ValueError as err:
|
||||||
raise ValueError(f"Invalid voltage value: {command.arguments[0]}")
|
raise ValueError(f"Invalid voltage value: {command.arguments[0]}") from err
|
||||||
|
|
||||||
if voltage < 0:
|
if voltage < 0:
|
||||||
raise ValueError("Voltage cannot be negative")
|
raise ValueError("Voltage cannot be negative")
|
||||||
@@ -127,8 +127,8 @@ class PowerSupplySim(BaseInstrument):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
current = float(command.arguments[0])
|
current = float(command.arguments[0])
|
||||||
except ValueError:
|
except ValueError as err:
|
||||||
raise ValueError(f"Invalid current value: {command.arguments[0]}")
|
raise ValueError(f"Invalid current value: {command.arguments[0]}") from err
|
||||||
|
|
||||||
if current < 0:
|
if current < 0:
|
||||||
raise ValueError("Current limit cannot be negative")
|
raise ValueError("Current limit cannot be negative")
|
||||||
|
|||||||
@@ -9,12 +9,10 @@ import asyncio
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.transport import InstrumentServer
|
||||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||||
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
||||||
from py_dvt_ate.simulation.tcp_server import InstrumentServer
|
|
||||||
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
|
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
|
||||||
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
|
|
||||||
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio(loop_scope="function")
|
@pytest.mark.asyncio(loop_scope="function")
|
||||||
|
|||||||
346
tests/unit/test_drivers.py
Normal file
346
tests/unit/test_drivers.py
Normal file
@@ -0,0 +1,346 @@
|
|||||||
|
"""Unit tests for instrument drivers.
|
||||||
|
|
||||||
|
Tests SCPI command formatting and driver functionality using mock transports.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.drivers.base import BaseDriver
|
||||||
|
from py_dvt_ate.instruments.drivers.chamber import ThermalChamberDriver
|
||||||
|
from py_dvt_ate.instruments.drivers.multimeter import MultimeterDriver
|
||||||
|
from py_dvt_ate.instruments.drivers.power_supply import PowerSupplyDriver
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_transport():
|
||||||
|
"""Create a mock transport for testing."""
|
||||||
|
transport = MagicMock()
|
||||||
|
transport.is_connected = True
|
||||||
|
return transport
|
||||||
|
|
||||||
|
|
||||||
|
class TestBaseDriver:
|
||||||
|
"""Tests for BaseDriver base class."""
|
||||||
|
|
||||||
|
def test_connect(self, mock_transport):
|
||||||
|
"""Test connection establishment."""
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
driver.connect()
|
||||||
|
mock_transport.connect.assert_called_once()
|
||||||
|
|
||||||
|
def test_disconnect(self, mock_transport):
|
||||||
|
"""Test disconnection."""
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
driver.disconnect()
|
||||||
|
mock_transport.disconnect.assert_called_once()
|
||||||
|
|
||||||
|
def test_is_connected(self, mock_transport):
|
||||||
|
"""Test connection status check."""
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
assert driver.is_connected is True
|
||||||
|
|
||||||
|
def test_write(self, mock_transport):
|
||||||
|
"""Test SCPI command write."""
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
driver.write("VOLT 3.3")
|
||||||
|
mock_transport.write.assert_called_once_with("VOLT 3.3")
|
||||||
|
|
||||||
|
def test_query(self, mock_transport):
|
||||||
|
"""Test SCPI query."""
|
||||||
|
mock_transport.query.return_value = "3.300"
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
result = driver.query("VOLT?")
|
||||||
|
assert result == "3.300"
|
||||||
|
mock_transport.query.assert_called_once_with("VOLT?", None)
|
||||||
|
|
||||||
|
def test_query_float(self, mock_transport):
|
||||||
|
"""Test SCPI query with float parsing."""
|
||||||
|
mock_transport.query.return_value = "3.300"
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
result = driver.query_float("VOLT?")
|
||||||
|
assert result == 3.3
|
||||||
|
assert isinstance(result, float)
|
||||||
|
|
||||||
|
def test_query_float_invalid(self, mock_transport):
|
||||||
|
"""Test SCPI query with invalid float response."""
|
||||||
|
mock_transport.query.return_value = "INVALID"
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
with pytest.raises(ValueError, match="Cannot parse 'INVALID' as float"):
|
||||||
|
driver.query_float("VOLT?")
|
||||||
|
|
||||||
|
def test_query_int(self, mock_transport):
|
||||||
|
"""Test SCPI query with integer parsing."""
|
||||||
|
mock_transport.query.return_value = "42"
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
result = driver.query_int("COUNT?")
|
||||||
|
assert result == 42
|
||||||
|
assert isinstance(result, int)
|
||||||
|
|
||||||
|
def test_query_int_invalid(self, mock_transport):
|
||||||
|
"""Test SCPI query with invalid integer response."""
|
||||||
|
mock_transport.query.return_value = "3.14"
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
with pytest.raises(ValueError, match="Cannot parse '3.14' as int"):
|
||||||
|
driver.query_int("COUNT?")
|
||||||
|
|
||||||
|
def test_query_bool_true_variants(self, mock_transport):
|
||||||
|
"""Test SCPI query with boolean parsing - true variants."""
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
|
||||||
|
for value in ["1", "ON", "TRUE", "on", "true"]:
|
||||||
|
mock_transport.query.return_value = value
|
||||||
|
result = driver.query_bool("OUTP?")
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
def test_query_bool_false_variants(self, mock_transport):
|
||||||
|
"""Test SCPI query with boolean parsing - false variants."""
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
|
||||||
|
for value in ["0", "OFF", "FALSE", "off", "false"]:
|
||||||
|
mock_transport.query.return_value = value
|
||||||
|
result = driver.query_bool("OUTP?")
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_query_bool_invalid(self, mock_transport):
|
||||||
|
"""Test SCPI query with invalid boolean response."""
|
||||||
|
mock_transport.query.return_value = "MAYBE"
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
with pytest.raises(ValueError, match="Cannot parse 'MAYBE' as bool"):
|
||||||
|
driver.query_bool("OUTP?")
|
||||||
|
|
||||||
|
def test_identify(self, mock_transport):
|
||||||
|
"""Test instrument identification query."""
|
||||||
|
mock_transport.query.return_value = "Manufacturer,Model,SN123,1.0.0"
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
result = driver.identify()
|
||||||
|
assert result == "Manufacturer,Model,SN123,1.0.0"
|
||||||
|
mock_transport.query.assert_called_once_with("*IDN?", None)
|
||||||
|
|
||||||
|
def test_reset(self, mock_transport):
|
||||||
|
"""Test instrument reset command."""
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
driver.reset()
|
||||||
|
mock_transport.write.assert_called_once_with("*RST")
|
||||||
|
|
||||||
|
def test_clear_status(self, mock_transport):
|
||||||
|
"""Test clear status command."""
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
driver.clear_status()
|
||||||
|
mock_transport.write.assert_called_once_with("*CLS")
|
||||||
|
|
||||||
|
def test_operation_complete(self, mock_transport):
|
||||||
|
"""Test operation complete query."""
|
||||||
|
mock_transport.query.return_value = "1"
|
||||||
|
driver = BaseDriver(mock_transport)
|
||||||
|
result = driver.operation_complete()
|
||||||
|
assert result is True
|
||||||
|
mock_transport.query.assert_called_once_with("*OPC?", None)
|
||||||
|
|
||||||
|
|
||||||
|
class TestThermalChamberDriver:
|
||||||
|
"""Tests for ThermalChamberDriver."""
|
||||||
|
|
||||||
|
def test_set_temperature(self, mock_transport):
|
||||||
|
"""Test temperature setpoint command."""
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
driver.set_temperature(85.0)
|
||||||
|
mock_transport.write.assert_called_once_with("TEMP:SETPOINT 85.00")
|
||||||
|
|
||||||
|
def test_get_temperature(self, mock_transport):
|
||||||
|
"""Test temperature measurement query."""
|
||||||
|
mock_transport.query.return_value = "25.50"
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
temp = driver.get_temperature()
|
||||||
|
assert temp == 25.5
|
||||||
|
mock_transport.query.assert_called_once_with("TEMP:ACTUAL?", None)
|
||||||
|
|
||||||
|
def test_get_setpoint(self, mock_transport):
|
||||||
|
"""Test setpoint query."""
|
||||||
|
mock_transport.query.return_value = "85.00"
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
setpoint = driver.get_setpoint()
|
||||||
|
assert setpoint == 85.0
|
||||||
|
mock_transport.query.assert_called_once_with("TEMP:SETPOINT?", None)
|
||||||
|
|
||||||
|
def test_is_stable_true(self, mock_transport):
|
||||||
|
"""Test stability check - stable."""
|
||||||
|
mock_transport.query.return_value = "1"
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
assert driver.is_stable() is True
|
||||||
|
|
||||||
|
def test_is_stable_false(self, mock_transport):
|
||||||
|
"""Test stability check - not stable."""
|
||||||
|
mock_transport.query.return_value = "0"
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
assert driver.is_stable() is False
|
||||||
|
|
||||||
|
def test_wait_until_stable_immediate(self, mock_transport):
|
||||||
|
"""Test wait for stability - already stable."""
|
||||||
|
mock_transport.query.return_value = "1"
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
result = driver.wait_until_stable(timeout=5.0, poll_interval=0.1)
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
def test_wait_until_stable_timeout(self, mock_transport):
|
||||||
|
"""Test wait for stability - timeout."""
|
||||||
|
mock_transport.query.return_value = "0" # Never becomes stable
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
result = driver.wait_until_stable(timeout=0.2, poll_interval=0.1)
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_wait_until_stable_invalid_timeout(self, mock_transport):
|
||||||
|
"""Test wait with negative timeout."""
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
with pytest.raises(ValueError, match="Timeout must be non-negative"):
|
||||||
|
driver.wait_until_stable(timeout=-1.0)
|
||||||
|
|
||||||
|
def test_wait_until_stable_invalid_interval(self, mock_transport):
|
||||||
|
"""Test wait with non-positive poll interval."""
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
with pytest.raises(ValueError, match="Poll interval must be positive"):
|
||||||
|
driver.wait_until_stable(poll_interval=0.0)
|
||||||
|
|
||||||
|
def test_set_ramp_rate(self, mock_transport):
|
||||||
|
"""Test ramp rate command."""
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
driver.set_ramp_rate(5.0)
|
||||||
|
mock_transport.write.assert_called_once_with("TEMP:RAMP 5.00")
|
||||||
|
|
||||||
|
def test_get_ramp_rate(self, mock_transport):
|
||||||
|
"""Test ramp rate query."""
|
||||||
|
mock_transport.query.return_value = "5.00"
|
||||||
|
driver = ThermalChamberDriver(mock_transport)
|
||||||
|
rate = driver.get_ramp_rate()
|
||||||
|
assert rate == 5.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestPowerSupplyDriver:
|
||||||
|
"""Tests for PowerSupplyDriver."""
|
||||||
|
|
||||||
|
def test_set_voltage(self, mock_transport):
|
||||||
|
"""Test voltage setpoint command."""
|
||||||
|
driver = PowerSupplyDriver(mock_transport)
|
||||||
|
driver.set_voltage(1, 3.3)
|
||||||
|
mock_transport.write.assert_called_once_with("VOLT 3.300")
|
||||||
|
|
||||||
|
def test_get_voltage(self, mock_transport):
|
||||||
|
"""Test voltage setpoint query."""
|
||||||
|
mock_transport.query.return_value = "3.300"
|
||||||
|
driver = PowerSupplyDriver(mock_transport)
|
||||||
|
voltage = driver.get_voltage(1)
|
||||||
|
assert voltage == 3.3
|
||||||
|
|
||||||
|
def test_set_current_limit(self, mock_transport):
|
||||||
|
"""Test current limit command."""
|
||||||
|
driver = PowerSupplyDriver(mock_transport)
|
||||||
|
driver.set_current_limit(1, 0.5)
|
||||||
|
mock_transport.write.assert_called_once_with("CURR 0.500")
|
||||||
|
|
||||||
|
def test_get_current_limit(self, mock_transport):
|
||||||
|
"""Test current limit query."""
|
||||||
|
mock_transport.query.return_value = "0.500"
|
||||||
|
driver = PowerSupplyDriver(mock_transport)
|
||||||
|
current = driver.get_current_limit(1)
|
||||||
|
assert current == 0.5
|
||||||
|
|
||||||
|
def test_measure_voltage(self, mock_transport):
|
||||||
|
"""Test voltage measurement."""
|
||||||
|
mock_transport.query.return_value = "3.305"
|
||||||
|
driver = PowerSupplyDriver(mock_transport)
|
||||||
|
voltage = driver.measure_voltage(1)
|
||||||
|
assert voltage == 3.305
|
||||||
|
mock_transport.query.assert_called_once_with("MEAS:VOLT?", None)
|
||||||
|
|
||||||
|
def test_measure_current(self, mock_transport):
|
||||||
|
"""Test current measurement."""
|
||||||
|
mock_transport.query.return_value = "0.125"
|
||||||
|
driver = PowerSupplyDriver(mock_transport)
|
||||||
|
current = driver.measure_current(1)
|
||||||
|
assert current == 0.125
|
||||||
|
mock_transport.query.assert_called_once_with("MEAS:CURR?", None)
|
||||||
|
|
||||||
|
def test_enable_output_on(self, mock_transport):
|
||||||
|
"""Test enable output command."""
|
||||||
|
driver = PowerSupplyDriver(mock_transport)
|
||||||
|
driver.enable_output(1, True)
|
||||||
|
mock_transport.write.assert_called_once_with("OUTP ON")
|
||||||
|
|
||||||
|
def test_enable_output_off(self, mock_transport):
|
||||||
|
"""Test disable output command."""
|
||||||
|
driver = PowerSupplyDriver(mock_transport)
|
||||||
|
driver.enable_output(1, False)
|
||||||
|
mock_transport.write.assert_called_once_with("OUTP OFF")
|
||||||
|
|
||||||
|
def test_is_output_enabled_true(self, mock_transport):
|
||||||
|
"""Test output enabled query - enabled."""
|
||||||
|
mock_transport.query.return_value = "1"
|
||||||
|
driver = PowerSupplyDriver(mock_transport)
|
||||||
|
assert driver.is_output_enabled(1) is True
|
||||||
|
|
||||||
|
def test_is_output_enabled_false(self, mock_transport):
|
||||||
|
"""Test output enabled query - disabled."""
|
||||||
|
mock_transport.query.return_value = "0"
|
||||||
|
driver = PowerSupplyDriver(mock_transport)
|
||||||
|
assert driver.is_output_enabled(1) is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestMultimeterDriver:
|
||||||
|
"""Tests for MultimeterDriver."""
|
||||||
|
|
||||||
|
def test_measure_dc_voltage(self, mock_transport):
|
||||||
|
"""Test DC voltage measurement."""
|
||||||
|
mock_transport.query.return_value = "3.300000"
|
||||||
|
driver = MultimeterDriver(mock_transport)
|
||||||
|
voltage = driver.measure_dc_voltage()
|
||||||
|
assert voltage == 3.3
|
||||||
|
mock_transport.query.assert_called_once_with("MEAS:VOLT:DC?", None)
|
||||||
|
|
||||||
|
def test_measure_dc_current(self, mock_transport):
|
||||||
|
"""Test DC current measurement."""
|
||||||
|
mock_transport.query.return_value = "0.125000"
|
||||||
|
driver = MultimeterDriver(mock_transport)
|
||||||
|
current = driver.measure_dc_current()
|
||||||
|
assert current == 0.125
|
||||||
|
mock_transport.query.assert_called_once_with("MEAS:CURR:DC?", None)
|
||||||
|
|
||||||
|
def test_measure_resistance_not_implemented(self, mock_transport):
|
||||||
|
"""Test resistance measurement raises NotImplementedError."""
|
||||||
|
driver = MultimeterDriver(mock_transport)
|
||||||
|
with pytest.raises(NotImplementedError, match="Resistance measurement"):
|
||||||
|
driver.measure_resistance()
|
||||||
|
|
||||||
|
def test_set_integration_time_not_implemented(self, mock_transport):
|
||||||
|
"""Test integration time setting raises NotImplementedError."""
|
||||||
|
driver = MultimeterDriver(mock_transport)
|
||||||
|
with pytest.raises(NotImplementedError, match="Integration time"):
|
||||||
|
driver.set_integration_time(1.0)
|
||||||
|
|
||||||
|
def test_configure_dc_voltage(self, mock_transport):
|
||||||
|
"""Test configure for DC voltage."""
|
||||||
|
driver = MultimeterDriver(mock_transport)
|
||||||
|
driver.configure_dc_voltage()
|
||||||
|
mock_transport.write.assert_called_once_with("CONF:VOLT:DC")
|
||||||
|
|
||||||
|
def test_configure_dc_current(self, mock_transport):
|
||||||
|
"""Test configure for DC current."""
|
||||||
|
driver = MultimeterDriver(mock_transport)
|
||||||
|
driver.configure_dc_current()
|
||||||
|
mock_transport.write.assert_called_once_with("CONF:CURR:DC")
|
||||||
|
|
||||||
|
def test_get_configuration(self, mock_transport):
|
||||||
|
"""Test get current configuration."""
|
||||||
|
mock_transport.query.return_value = '"VOLT:DC"'
|
||||||
|
driver = MultimeterDriver(mock_transport)
|
||||||
|
config = driver.get_configuration()
|
||||||
|
assert config == "VOLT:DC"
|
||||||
|
mock_transport.query.assert_called_once_with("CONF?", None)
|
||||||
|
|
||||||
|
def test_read(self, mock_transport):
|
||||||
|
"""Test read measurement with current configuration."""
|
||||||
|
mock_transport.query.return_value = "3.300000"
|
||||||
|
driver = MultimeterDriver(mock_transport)
|
||||||
|
value = driver.read()
|
||||||
|
assert value == 3.3
|
||||||
|
mock_transport.query.assert_called_once_with("READ?", None)
|
||||||
273
tests/unit/test_instruments.py
Normal file
273
tests/unit/test_instruments.py
Normal file
@@ -0,0 +1,273 @@
|
|||||||
|
"""Unit tests for instrument interfaces and factory.
|
||||||
|
|
||||||
|
Tests the Hardware Abstraction Layer (HAL) interfaces and the factory
|
||||||
|
pattern for creating instrument sets.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments import (
|
||||||
|
IMultimeter,
|
||||||
|
IPowerSupply,
|
||||||
|
IThermalChamber,
|
||||||
|
InstrumentConfig,
|
||||||
|
InstrumentFactory,
|
||||||
|
InstrumentSet,
|
||||||
|
)
|
||||||
|
from py_dvt_ate.instruments.drivers import (
|
||||||
|
MultimeterDriver,
|
||||||
|
PowerSupplyDriver,
|
||||||
|
ThermalChamberDriver,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestInterfaceImplementation:
|
||||||
|
"""Test that drivers correctly implement the interface protocols."""
|
||||||
|
|
||||||
|
def test_thermal_chamber_implements_interface(self):
|
||||||
|
"""Verify ThermalChamberDriver implements IThermalChamber."""
|
||||||
|
# ABC inheritance ensures interface compliance at class definition time
|
||||||
|
assert issubclass(ThermalChamberDriver, IThermalChamber)
|
||||||
|
|
||||||
|
def test_power_supply_implements_interface(self):
|
||||||
|
"""Verify PowerSupplyDriver implements IPowerSupply."""
|
||||||
|
assert issubclass(PowerSupplyDriver, IPowerSupply)
|
||||||
|
|
||||||
|
def test_multimeter_implements_interface(self):
|
||||||
|
"""Verify MultimeterDriver implements IMultimeter."""
|
||||||
|
assert issubclass(MultimeterDriver, IMultimeter)
|
||||||
|
|
||||||
|
def test_thermal_chamber_has_all_methods(self):
|
||||||
|
"""Verify ThermalChamberDriver has all required methods."""
|
||||||
|
required_methods = [
|
||||||
|
"set_temperature",
|
||||||
|
"get_temperature",
|
||||||
|
"get_setpoint",
|
||||||
|
"is_stable",
|
||||||
|
"wait_until_stable",
|
||||||
|
"set_ramp_rate",
|
||||||
|
]
|
||||||
|
for method in required_methods:
|
||||||
|
assert hasattr(ThermalChamberDriver, method)
|
||||||
|
|
||||||
|
def test_power_supply_has_all_methods(self):
|
||||||
|
"""Verify PowerSupplyDriver has all required methods."""
|
||||||
|
required_methods = [
|
||||||
|
"set_voltage",
|
||||||
|
"get_voltage",
|
||||||
|
"set_current_limit",
|
||||||
|
"get_current_limit",
|
||||||
|
"measure_voltage",
|
||||||
|
"measure_current",
|
||||||
|
"enable_output",
|
||||||
|
"is_output_enabled",
|
||||||
|
]
|
||||||
|
for method in required_methods:
|
||||||
|
assert hasattr(PowerSupplyDriver, method)
|
||||||
|
|
||||||
|
def test_multimeter_has_all_methods(self):
|
||||||
|
"""Verify MultimeterDriver has all required methods."""
|
||||||
|
required_methods = [
|
||||||
|
"measure_dc_voltage",
|
||||||
|
"measure_dc_current",
|
||||||
|
"measure_resistance",
|
||||||
|
"set_integration_time",
|
||||||
|
]
|
||||||
|
for method in required_methods:
|
||||||
|
assert hasattr(MultimeterDriver, method)
|
||||||
|
|
||||||
|
|
||||||
|
class TestInstrumentSet:
|
||||||
|
"""Test the InstrumentSet dataclass."""
|
||||||
|
|
||||||
|
def test_instrument_set_creation(self):
|
||||||
|
"""Verify InstrumentSet can be created with mock instruments."""
|
||||||
|
from unittest.mock import Mock
|
||||||
|
|
||||||
|
# Create mock instruments that satisfy the interface
|
||||||
|
mock_chamber = Mock(spec=IThermalChamber)
|
||||||
|
mock_psu = Mock(spec=IPowerSupply)
|
||||||
|
mock_dmm = Mock(spec=IMultimeter)
|
||||||
|
|
||||||
|
instrument_set = InstrumentSet(
|
||||||
|
chamber=mock_chamber, psu=mock_psu, dmm=mock_dmm
|
||||||
|
)
|
||||||
|
|
||||||
|
assert instrument_set.chamber is mock_chamber
|
||||||
|
assert instrument_set.psu is mock_psu
|
||||||
|
assert instrument_set.dmm is mock_dmm
|
||||||
|
|
||||||
|
def test_instrument_set_type_annotations(self):
|
||||||
|
"""Verify InstrumentSet has correct type annotations."""
|
||||||
|
annotations = InstrumentSet.__annotations__
|
||||||
|
|
||||||
|
assert annotations["chamber"] == IThermalChamber
|
||||||
|
assert annotations["psu"] == IPowerSupply
|
||||||
|
assert annotations["dmm"] == IMultimeter
|
||||||
|
|
||||||
|
|
||||||
|
class TestInstrumentConfig:
|
||||||
|
"""Test the InstrumentConfig dataclass."""
|
||||||
|
|
||||||
|
def test_config_defaults_simulator(self):
|
||||||
|
"""Verify default configuration for simulator backend."""
|
||||||
|
config = InstrumentConfig(backend="simulator")
|
||||||
|
|
||||||
|
assert config.backend == "simulator"
|
||||||
|
assert config.simulator_host == "localhost"
|
||||||
|
assert config.chamber_port == 5001
|
||||||
|
assert config.psu_port == 5002
|
||||||
|
assert config.dmm_port == 5003
|
||||||
|
assert config.chamber_visa is None
|
||||||
|
assert config.psu_visa is None
|
||||||
|
assert config.dmm_visa is None
|
||||||
|
|
||||||
|
def test_config_custom_ports(self):
|
||||||
|
"""Verify configuration accepts custom port settings."""
|
||||||
|
config = InstrumentConfig(
|
||||||
|
backend="simulator",
|
||||||
|
simulator_host="192.168.1.100",
|
||||||
|
chamber_port=6001,
|
||||||
|
psu_port=6002,
|
||||||
|
dmm_port=6003,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert config.simulator_host == "192.168.1.100"
|
||||||
|
assert config.chamber_port == 6001
|
||||||
|
assert config.psu_port == 6002
|
||||||
|
assert config.dmm_port == 6003
|
||||||
|
|
||||||
|
def test_config_pyvisa_backend(self):
|
||||||
|
"""Verify configuration for PyVISA backend."""
|
||||||
|
config = InstrumentConfig(
|
||||||
|
backend="pyvisa",
|
||||||
|
chamber_visa="TCPIP::192.168.1.10::INSTR",
|
||||||
|
psu_visa="TCPIP::192.168.1.11::INSTR",
|
||||||
|
dmm_visa="TCPIP::192.168.1.12::INSTR",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert config.backend == "pyvisa"
|
||||||
|
assert config.chamber_visa == "TCPIP::192.168.1.10::INSTR"
|
||||||
|
assert config.psu_visa == "TCPIP::192.168.1.11::INSTR"
|
||||||
|
assert config.dmm_visa == "TCPIP::192.168.1.12::INSTR"
|
||||||
|
|
||||||
|
|
||||||
|
class TestInstrumentFactory:
|
||||||
|
"""Test the InstrumentFactory."""
|
||||||
|
|
||||||
|
def test_factory_rejects_unknown_backend(self):
|
||||||
|
"""Verify factory raises error for unknown backend."""
|
||||||
|
config = InstrumentConfig(backend="invalid") # type: ignore
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Unknown backend: invalid"):
|
||||||
|
InstrumentFactory.create(config)
|
||||||
|
|
||||||
|
def test_factory_pyvisa_not_implemented(self):
|
||||||
|
"""Verify PyVISA backend raises NotImplementedError."""
|
||||||
|
config = InstrumentConfig(backend="pyvisa")
|
||||||
|
|
||||||
|
with pytest.raises(NotImplementedError, match="PyVISA backend not yet"):
|
||||||
|
InstrumentFactory.create(config)
|
||||||
|
|
||||||
|
def test_factory_creates_instrument_set(self):
|
||||||
|
"""Verify factory creates InstrumentSet with correct structure."""
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
config = InstrumentConfig(backend="simulator")
|
||||||
|
|
||||||
|
# Mock the transports and drivers to avoid actual connections
|
||||||
|
# Patch where they're imported FROM, not where they're used
|
||||||
|
with (
|
||||||
|
patch(
|
||||||
|
"py_dvt_ate.instruments.transport.tcp.TCPTransport"
|
||||||
|
) as mock_tcp_transport,
|
||||||
|
patch(
|
||||||
|
"py_dvt_ate.instruments.drivers.chamber.ThermalChamberDriver"
|
||||||
|
) as mock_chamber,
|
||||||
|
patch(
|
||||||
|
"py_dvt_ate.instruments.drivers.power_supply.PowerSupplyDriver"
|
||||||
|
) as mock_psu,
|
||||||
|
patch(
|
||||||
|
"py_dvt_ate.instruments.drivers.multimeter.MultimeterDriver"
|
||||||
|
) as mock_dmm,
|
||||||
|
):
|
||||||
|
# Create mock instrument instances
|
||||||
|
mock_chamber_instance = Mock(spec=IThermalChamber)
|
||||||
|
mock_psu_instance = Mock(spec=IPowerSupply)
|
||||||
|
mock_dmm_instance = Mock(spec=IMultimeter)
|
||||||
|
|
||||||
|
mock_chamber.return_value = mock_chamber_instance
|
||||||
|
mock_psu.return_value = mock_psu_instance
|
||||||
|
mock_dmm.return_value = mock_dmm_instance
|
||||||
|
|
||||||
|
instrument_set = InstrumentFactory.create(config)
|
||||||
|
|
||||||
|
# Verify InstrumentSet was created
|
||||||
|
assert isinstance(instrument_set, InstrumentSet)
|
||||||
|
|
||||||
|
# Verify transports were created with correct parameters
|
||||||
|
assert mock_tcp_transport.call_count == 3
|
||||||
|
mock_tcp_transport.assert_any_call("localhost", 5001) # chamber
|
||||||
|
mock_tcp_transport.assert_any_call("localhost", 5002) # psu
|
||||||
|
mock_tcp_transport.assert_any_call("localhost", 5003) # dmm
|
||||||
|
|
||||||
|
# Verify drivers were created
|
||||||
|
assert mock_chamber.call_count == 1
|
||||||
|
assert mock_psu.call_count == 1
|
||||||
|
assert mock_dmm.call_count == 1
|
||||||
|
|
||||||
|
# Verify InstrumentSet contains the mock instances
|
||||||
|
assert instrument_set.chamber is mock_chamber_instance
|
||||||
|
assert instrument_set.psu is mock_psu_instance
|
||||||
|
assert instrument_set.dmm is mock_dmm_instance
|
||||||
|
|
||||||
|
def test_factory_uses_custom_ports(self):
|
||||||
|
"""Verify factory uses custom port configuration."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
config = InstrumentConfig(
|
||||||
|
backend="simulator",
|
||||||
|
simulator_host="testserver",
|
||||||
|
chamber_port=7001,
|
||||||
|
psu_port=7002,
|
||||||
|
dmm_port=7003,
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"py_dvt_ate.instruments.transport.tcp.TCPTransport"
|
||||||
|
) as mock_tcp_transport:
|
||||||
|
InstrumentFactory.create(config)
|
||||||
|
|
||||||
|
# Verify custom host and ports were used
|
||||||
|
mock_tcp_transport.assert_any_call("testserver", 7001)
|
||||||
|
mock_tcp_transport.assert_any_call("testserver", 7002)
|
||||||
|
mock_tcp_transport.assert_any_call("testserver", 7003)
|
||||||
|
|
||||||
|
def test_factory_returns_correct_types(self):
|
||||||
|
"""Verify factory returns instruments implementing correct interfaces."""
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
config = InstrumentConfig(backend="simulator")
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("py_dvt_ate.instruments.transport.tcp.TCPTransport"),
|
||||||
|
patch(
|
||||||
|
"py_dvt_ate.instruments.drivers.chamber.ThermalChamberDriver"
|
||||||
|
) as mock_chamber,
|
||||||
|
patch(
|
||||||
|
"py_dvt_ate.instruments.drivers.power_supply.PowerSupplyDriver"
|
||||||
|
) as mock_psu,
|
||||||
|
patch(
|
||||||
|
"py_dvt_ate.instruments.drivers.multimeter.MultimeterDriver"
|
||||||
|
) as mock_dmm,
|
||||||
|
):
|
||||||
|
# Make the mocks subclasses of the interfaces
|
||||||
|
mock_chamber.return_value = Mock(spec=IThermalChamber)
|
||||||
|
mock_psu.return_value = Mock(spec=IPowerSupply)
|
||||||
|
mock_dmm.return_value = Mock(spec=IMultimeter)
|
||||||
|
|
||||||
|
instrument_set = InstrumentFactory.create(config)
|
||||||
|
|
||||||
|
# Verify returned instruments satisfy the interface specs
|
||||||
|
# (Mock with spec=Interface makes isinstance checks work)
|
||||||
|
assert isinstance(instrument_set, InstrumentSet)
|
||||||
@@ -63,7 +63,7 @@ class TestThermalState:
|
|||||||
|
|
||||||
# Should not raise
|
# Should not raise
|
||||||
hash(state)
|
hash(state)
|
||||||
{state} # Can be added to a set
|
_ = {state} # Can be added to a set
|
||||||
|
|
||||||
|
|
||||||
class TestElectricalState:
|
class TestElectricalState:
|
||||||
|
|||||||
263
tests/unit/test_transport.py
Normal file
263
tests/unit/test_transport.py
Normal file
@@ -0,0 +1,263 @@
|
|||||||
|
"""Unit tests for transport layer."""
|
||||||
|
|
||||||
|
import socket
|
||||||
|
from unittest.mock import MagicMock, Mock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from py_dvt_ate.instruments.transport.tcp import TCPTransport
|
||||||
|
|
||||||
|
|
||||||
|
class TestTCPTransport:
|
||||||
|
"""Tests for TCPTransport class."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def transport(self) -> TCPTransport:
|
||||||
|
"""Create transport instance for tests."""
|
||||||
|
return TCPTransport("localhost", 5025, timeout=1.0)
|
||||||
|
|
||||||
|
def test_creation(self, transport: TCPTransport) -> None:
|
||||||
|
"""Test TCPTransport can be created with valid parameters."""
|
||||||
|
assert transport.host == "localhost"
|
||||||
|
assert transport.port == 5025
|
||||||
|
assert not transport.is_connected
|
||||||
|
|
||||||
|
def test_repr(self, transport: TCPTransport) -> None:
|
||||||
|
"""Test string representation."""
|
||||||
|
assert "localhost:5025" in repr(transport)
|
||||||
|
assert "disconnected" in repr(transport)
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_connect_success(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test successful connection."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
|
||||||
|
assert transport.is_connected
|
||||||
|
mock_socket_class.assert_called_once_with(
|
||||||
|
socket.AF_INET, socket.SOCK_STREAM
|
||||||
|
)
|
||||||
|
mock_sock.settimeout.assert_called_once_with(1.0)
|
||||||
|
mock_sock.connect.assert_called_once_with(("localhost", 5025))
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_connect_already_connected(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test connecting when already connected raises error."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
|
||||||
|
with pytest.raises(ConnectionError, match="Already connected"):
|
||||||
|
transport.connect()
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_connect_failure(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test connection failure raises ConnectionError."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
mock_sock.connect.side_effect = OSError("Connection refused")
|
||||||
|
|
||||||
|
with pytest.raises(ConnectionError, match="Failed to connect"):
|
||||||
|
transport.connect()
|
||||||
|
|
||||||
|
assert not transport.is_connected
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_disconnect(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test disconnect closes socket."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
transport.disconnect()
|
||||||
|
|
||||||
|
mock_sock.close.assert_called_once()
|
||||||
|
assert not transport.is_connected
|
||||||
|
|
||||||
|
def test_disconnect_when_not_connected(self, transport: TCPTransport) -> None:
|
||||||
|
"""Test disconnect is idempotent."""
|
||||||
|
transport.disconnect() # Should not raise
|
||||||
|
assert not transport.is_connected
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_write(self, mock_socket_class: Mock, transport: TCPTransport) -> None:
|
||||||
|
"""Test write sends command with newline."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
transport.write("*IDN?")
|
||||||
|
|
||||||
|
mock_sock.sendall.assert_called_once_with(b"*IDN?\n")
|
||||||
|
|
||||||
|
def test_write_not_connected(self, transport: TCPTransport) -> None:
|
||||||
|
"""Test write when not connected raises error."""
|
||||||
|
with pytest.raises(ConnectionError, match="Not connected"):
|
||||||
|
transport.write("*IDN?")
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_write_failure(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test write failure raises IOError."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
mock_sock.sendall.side_effect = OSError("Write failed")
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
|
||||||
|
with pytest.raises(OSError, match="Write failed"):
|
||||||
|
transport.write("*IDN?")
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_read(self, mock_socket_class: Mock, transport: TCPTransport) -> None:
|
||||||
|
"""Test read receives response until newline."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
|
||||||
|
# Simulate receiving "OK\n" byte by byte
|
||||||
|
mock_sock.recv.side_effect = [b"O", b"K", b"\n"]
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
response = transport.read()
|
||||||
|
|
||||||
|
assert response == "OK"
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_read_with_timeout(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test read with custom timeout."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
mock_sock.gettimeout.return_value = 1.0
|
||||||
|
|
||||||
|
# Simulate receiving "OK\n"
|
||||||
|
mock_sock.recv.side_effect = [b"O", b"K", b"\n"]
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
response = transport.read(timeout=2.0)
|
||||||
|
|
||||||
|
assert response == "OK"
|
||||||
|
# Verify timeout was changed and restored
|
||||||
|
assert mock_sock.settimeout.call_count == 3 # connect + custom + restore
|
||||||
|
mock_sock.settimeout.assert_any_call(2.0)
|
||||||
|
mock_sock.settimeout.assert_any_call(1.0)
|
||||||
|
|
||||||
|
def test_read_not_connected(self, transport: TCPTransport) -> None:
|
||||||
|
"""Test read when not connected raises error."""
|
||||||
|
with pytest.raises(ConnectionError, match="Not connected"):
|
||||||
|
transport.read()
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_read_timeout(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test read timeout raises TimeoutError."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
mock_sock.recv.side_effect = TimeoutError("Timed out")
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
|
||||||
|
with pytest.raises(TimeoutError, match="Read timeout"):
|
||||||
|
transport.read()
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_read_connection_closed(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test read when connection closed raises error."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
mock_sock.recv.return_value = b"" # Empty means connection closed
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
|
||||||
|
with pytest.raises(ConnectionError, match="Connection closed"):
|
||||||
|
transport.read()
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_query(self, mock_socket_class: Mock, transport: TCPTransport) -> None:
|
||||||
|
"""Test query combines write and read."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
|
||||||
|
# Simulate receiving "Test Device\n"
|
||||||
|
mock_sock.recv.side_effect = [
|
||||||
|
b"T",
|
||||||
|
b"e",
|
||||||
|
b"s",
|
||||||
|
b"t",
|
||||||
|
b" ",
|
||||||
|
b"D",
|
||||||
|
b"e",
|
||||||
|
b"v",
|
||||||
|
b"i",
|
||||||
|
b"c",
|
||||||
|
b"e",
|
||||||
|
b"\n",
|
||||||
|
]
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
response = transport.query("*IDN?")
|
||||||
|
|
||||||
|
assert response == "Test Device"
|
||||||
|
mock_sock.sendall.assert_called_once_with(b"*IDN?\n")
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_query_with_timeout(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test query with custom timeout."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
mock_sock.gettimeout.return_value = 1.0
|
||||||
|
|
||||||
|
mock_sock.recv.side_effect = [b"O", b"K", b"\n"]
|
||||||
|
|
||||||
|
transport.connect()
|
||||||
|
response = transport.query("*IDN?", timeout=3.0)
|
||||||
|
|
||||||
|
assert response == "OK"
|
||||||
|
mock_sock.settimeout.assert_any_call(3.0)
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_context_manager(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test context manager connects and disconnects."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
|
||||||
|
with transport:
|
||||||
|
assert transport.is_connected
|
||||||
|
|
||||||
|
mock_sock.close.assert_called_once()
|
||||||
|
assert not transport.is_connected
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
def test_context_manager_with_exception(
|
||||||
|
self, mock_socket_class: Mock, transport: TCPTransport
|
||||||
|
) -> None:
|
||||||
|
"""Test context manager disconnects even on exception."""
|
||||||
|
mock_sock = MagicMock()
|
||||||
|
mock_socket_class.return_value = mock_sock
|
||||||
|
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
with transport:
|
||||||
|
raise ValueError("Test error")
|
||||||
|
|
||||||
|
mock_sock.close.assert_called_once()
|
||||||
|
assert not transport.is_connected
|
||||||
Reference in New Issue
Block a user