25 Commits

Author SHA1 Message Date
9e9c0ae0e5 Release v0.1.0-alpha.3
Some checks failed
CI / Lint (push) Failing after 4s
CI / Type Check (push) Failing after 17s
CI / Test (push) Successful in 9s
CI / Release (push) Has been cancelled
2025-06-02 22:56:53 +00:00
a742d57a6f Add TCP server integration tests
Test connection handling, multiple clients, instrument access across
all three virtual instruments, physics engine integration, and error
handling. Update pytest-asyncio config for v1.x compatibility.
2025-05-30 22:59:33 +00:00
2d358062f4 Add simulation server entry point
Create SimulationServer that wires physics engine to all virtual
instruments and exposes them over TCP. Add 'serve' CLI command to
start the server with configurable ports and physics rate.
2025-05-30 19:31:01 +00:00
1a489b9106 Implement TCP client handling
Add async client connection handling with:
- Multiple concurrent connections per port
- Line-based SCPI protocol (newline terminated)
- start(), stop(), and serve_forever() methods
- Proper connection lifecycle and error handling
2025-05-24 13:48:10 +00:00
f9e59da32b Add async TCP server foundation
Create InstrumentServer class with asyncio for hosting virtual SCPI
instruments over TCP. Supports registering instruments on specific
ports with port-to-instrument mapping.
2025-05-22 21:32:38 +00:00
a4c01c856d Add multimeter simulator tests
Comprehensive test coverage for MultimeterSim including MEAS:VOLT:DC,
MEAS:CURR:DC, CONF, and READ commands. Tests both standalone operation
and physics engine integration including temperature-dependent measurements.
2025-05-21 22:55:40 +00:00
144e80f87a Add multimeter simulator
Implement SCPI-based virtual DMM with DC voltage and current measurement.
Supports MEAS, CONF, and READ commands. Integrates with physics engine
for DUT output measurements.
2025-05-16 23:48:11 +00:00
e811b21082 Add power supply simulator tests
Comprehensive test coverage for PowerSupplySim including VOLT, CURR,
OUTP, and MEAS commands. Tests both standalone operation and physics
engine integration.
2025-05-12 17:29:00 +00:00
9a88a35cc5 Add power supply simulator
Implement SCPI-based virtual power supply with voltage/current control
and output enable commands. Integrates with physics engine for DUT
input voltage simulation.
2025-05-09 20:21:07 +00:00
b31324a42a Add thermal chamber simulator tests
Tests for ThermalChamberSim SCPI command responses:
- Basic IEEE 488.2 commands (*IDN?, *RST, *OPC?)
- TEMP:SETPOINT set/query
- TEMP:ACTUAL? query
- TEMP:STAB? stability query
- Physics engine integration tests
2025-05-05 21:00:43 +00:00
008134844d Implement thermal chamber SCPI commands
- TEMP:SETPOINT: Set/query target temperature
- TEMP:ACTUAL?: Query actual chamber temperature from physics engine
- TEMP:STAB?: Query temperature stability (within 0.5°C threshold)
2025-05-04 19:34:48 +00:00
ae85948539 Add thermal chamber simulator stub
Defines ThermalChamberSim class with stub SCPI command handlers for
TEMP:SETPOINT, TEMP:ACTUAL?, and TEMP:STAB? commands.
2025-05-02 23:33:16 +00:00
bccb8cc420 Add base instrument class
Provides SCPI command parsing and dispatch mechanism for virtual
instruments. Includes IEEE 488.2 common commands (*IDN?, *RST, *CLS,
*OPC) and abstract methods for instrument-specific implementations.
2025-04-28 19:24:24 +00:00
510e1ba683 Add SCPI parser tests
Comprehensive test suite for SCPI command parsing:
- SCPICommand dataclass tests (creation, keyword property)
- Parser tests for queries, commands, arguments
- IEEE 488.2 common command tests (*IDN?, *RST, etc.)
- Edge cases (whitespace, empty strings)
- Instrument-specific command tests

Also fixed bug where is_query was determined from command string
ending rather than header ending (handles queries with arguments).
2025-04-21 13:10:50 +00:00
5e69085875 Implement SCPI parser
Adds SCPIParser class with parse() method that handles:
- IEEE 488.2 common commands (*IDN?, *RST, etc.)
- Query commands (ending with '?')
- Commands with comma-separated arguments
- Whitespace stripping
2025-04-21 12:03:55 +00:00
5053399851 Add SCPI command dataclass
Defines SCPICommand dataclass for parsed SCPI commands with:
- header: command header (e.g., "TEMP:SETPOINT")
- arguments: list of command arguments
- is_query: whether command is a query
- keyword property: header without trailing '?'
2025-04-16 23:08:32 +00:00
d54ada18b2 Remove fragment from sidebar controls (not supported)
Sidebar controls cannot be in a fragment. Brief blank on
slider change is a Streamlit limitation.
2025-04-15 21:25:23 +00:00
252c329562 Put sidebar controls in fragment to prevent page blanking
Both controls and display are now fragments, so slider
changes don't trigger full page reruns.
2025-04-13 18:47:37 +00:00
6e7da7f382 Use st.fragment for smooth dashboard updates
Replace st.rerun() with @st.fragment decorator to prevent
full page reloads and eliminate UI greying out.
2025-04-08 13:08:15 +00:00
75e0a1cc25 Fix dashboard simulation speed with time multiplier
- Add time multiplier control (1× to 100× speed)
- Calculate steps based on real elapsed time
- Add 50ms delay to prevent UI thrashing
- Display current speed in Sim Time metric
2025-04-05 17:58:41 +00:00
1c0d2ead54 Release v0.1.0-alpha.2
Some checks failed
CI / Test (push) Successful in 9s
CI / Release (push) Has been cancelled
CI / Lint (push) Failing after 4s
CI / Type Check (push) Failing after 17s
2025-04-03 21:20:13 +00:00
2b78a75f51 Add self-heating visualisation 2025-03-29 17:04:13 +00:00
15c9033153 Add interactive physics controls 2025-03-24 15:02:51 +00:00
0ab1181ec4 Add physics visualisation panel 2025-03-24 14:20:53 +00:00
bb3129e69b Add Streamlit dashboard skeleton 2025-03-18 14:24:17 +00:00
22 changed files with 3050 additions and 4 deletions

View File

@@ -7,6 +7,40 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.1.0-alpha.3] - 2025-12-02
### Added
- Async TCP server for exposing virtual instruments over network
- InstrumentServer class with multi-port, multi-client support
- Line-based SCPI protocol (newline-terminated commands/responses)
- SimulationServer wiring physics engine to all virtual instruments
- CLI `serve` command to start simulation server with configurable ports
- Integration tests for TCP server and instrument connectivity
### Infrastructure
- SCPI foundation (Sprint 5): command parser with IEEE 488.2 support
- Virtual instrument base class with command dispatch
- Thermal chamber simulator (TEMP:SETPOINT, TEMP:ACTUAL?, TEMP:STAB?)
- Power supply simulator (VOLT, CURR, OUTP, MEAS commands)
- Multimeter simulator (MEAS:VOLT:DC?, MEAS:CURR:DC?, CONF, READ?)
## [0.1.0-alpha.2] - 2025-12-02
### Added
- Streamlit dashboard for interactive physics visualisation
- Real-time temperature charts (chamber, case, junction)
- Current state metrics display (voltages, currents, power, temperatures)
- Interactive controls in sidebar:
- Temperature setpoint slider (-40°C to 125°C)
- Input voltage slider (0-12V)
- Load current slider (0-500mA)
- Output enable toggle
- Start/Stop/Reset simulation buttons
- Self-heating demonstration panel with:
- Junction-case and case-ambient temperature rise display
- Power dissipation chart
- Thermal coupling explanation
## [0.1.0-alpha.1] - 2025-12-02 ## [0.1.0-alpha.1] - 2025-12-02
### Added ### Added
@@ -42,7 +76,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
| 0.1.0 | TBD | MVP Complete | | 0.1.0 | TBD | MVP Complete |
| 0.1.0-beta.2 | TBD | First DVT test runs | | 0.1.0-beta.2 | TBD | First DVT test runs |
| 0.1.0-beta.1 | TBD | HAL complete | | 0.1.0-beta.1 | TBD | HAL complete |
| 0.1.0-alpha.3 | TBD | Network ready | | 0.1.0-alpha.3 | 2025-12-02 | Network ready |
| 0.1.0-alpha.2 | TBD | Visual demo | | 0.1.0-alpha.2 | 2025-12-02 | Visual demo |
| 0.1.0-alpha.1 | 2025-12-02 | Physics engine | | 0.1.0-alpha.1 | 2025-12-02 | Physics engine |
| 0.0.1 | 2025-12-01 | Project scaffolding | | 0.0.1 | 2025-12-01 | Project scaffolding |

View File

@@ -86,5 +86,8 @@ ignore_missing_imports = true
[tool.pytest.ini_options] [tool.pytest.ini_options]
testpaths = ["tests"] testpaths = ["tests"]
asyncio_mode = "auto"
addopts = "-v --tb=short" addopts = "-v --tb=short"
[tool.pytest-asyncio]
mode = "auto"
default_fixture_loop_scope = "function"

View File

@@ -1,3 +1,3 @@
"""py_dvt_ate: Coupled Physics DVT Simulation Platform.""" """py_dvt_ate: Coupled Physics DVT Simulation Platform."""
__version__ = "0.1.0-alpha.1" __version__ = "0.1.0-alpha.3"

View File

@@ -36,5 +36,52 @@ def main(
"""py-dvt-ate: Coupled Physics DVT Simulation Platform.""" """py-dvt-ate: Coupled Physics DVT Simulation Platform."""
@app.command()
def serve(
host: Annotated[
str,
typer.Option("--host", "-h", help="Host address to bind to."),
] = "127.0.0.1",
chamber_port: Annotated[
int,
typer.Option("--chamber-port", help="Port for thermal chamber instrument."),
] = 5000,
psu_port: Annotated[
int,
typer.Option("--psu-port", help="Port for power supply instrument."),
] = 5001,
dmm_port: Annotated[
int,
typer.Option("--dmm-port", help="Port for multimeter instrument."),
] = 5002,
physics_rate: Annotated[
float,
typer.Option("--physics-rate", help="Physics engine update rate in Hz."),
] = 100.0,
) -> None:
"""Start the simulation server with virtual instruments.
Runs a TCP server hosting virtual SCPI instruments connected to a
shared physics engine. Each instrument listens on its own port.
"""
from py_dvt_ate.simulation.server import main as run_server
typer.echo(f"Starting simulation server on {host}...")
typer.echo(f" Thermal chamber: port {chamber_port}")
typer.echo(f" Power supply: port {psu_port}")
typer.echo(f" Multimeter: port {dmm_port}")
typer.echo(f" Physics rate: {physics_rate} Hz")
typer.echo("")
typer.echo("Press Ctrl+C to stop.")
run_server(
host=host,
chamber_port=chamber_port,
psu_port=psu_port,
dmm_port=dmm_port,
physics_rate=physics_rate,
)
if __name__ == "__main__": if __name__ == "__main__":
app() app()

View File

@@ -3,3 +3,7 @@
Provides visualisation of instrument status, test progress, Provides visualisation of instrument status, test progress,
and historical results. and historical results.
""" """
from py_dvt_ate.app.dashboard.app import main
__all__ = ["main"]

View File

@@ -0,0 +1,321 @@
"""Streamlit dashboard application for physics simulation visualisation.
This module provides an interactive dashboard for visualising the physics
engine directly, demonstrating thermal-electrical coupling in real-time.
"""
import time
from collections import deque
from dataclasses import dataclass, field
import streamlit as st
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
# History buffer size for charts
HISTORY_SIZE = 500
@dataclass
class SimulationHistory:
"""Stores time series data for visualisation."""
time: deque[float] = field(default_factory=lambda: deque(maxlen=HISTORY_SIZE))
chamber_temp: deque[float] = field(
default_factory=lambda: deque(maxlen=HISTORY_SIZE)
)
case_temp: deque[float] = field(default_factory=lambda: deque(maxlen=HISTORY_SIZE))
junction_temp: deque[float] = field(
default_factory=lambda: deque(maxlen=HISTORY_SIZE)
)
output_voltage: deque[float] = field(
default_factory=lambda: deque(maxlen=HISTORY_SIZE)
)
power_dissipation: deque[float] = field(
default_factory=lambda: deque(maxlen=HISTORY_SIZE)
)
def init_session_state() -> None:
"""Initialise Streamlit session state."""
if "engine" not in st.session_state:
st.session_state.engine = PhysicsEngine(update_rate_hz=100.0)
if "history" not in st.session_state:
st.session_state.history = SimulationHistory()
if "running" not in st.session_state:
st.session_state.running = False
if "last_update" not in st.session_state:
st.session_state.last_update = time.time()
# Note: time_multiplier, temp_setpoint, input_voltage, output_enabled,
# load_current are managed by their respective widgets via keys
def step_simulation() -> None:
"""Advance the simulation based on elapsed real time and multiplier."""
engine: PhysicsEngine = st.session_state.engine
history: SimulationHistory = st.session_state.history
multiplier: float = st.session_state.get("time_multiplier", 10)
# Calculate how much simulation time to advance
current_time = time.time()
elapsed_real = current_time - st.session_state.last_update
st.session_state.last_update = current_time
# Simulation time to advance (capped to prevent huge jumps)
sim_time_to_advance = min(elapsed_real * multiplier, 2.0)
# Calculate number of steps needed
steps = int(sim_time_to_advance / engine.dt)
steps = max(1, min(steps, 1000)) # Clamp between 1 and 1000 steps
for _ in range(steps):
engine.step()
# Record current state in history
thermal = engine.get_thermal_state()
electrical = engine.get_electrical_state()
history.time.append(thermal.timestamp)
history.chamber_temp.append(thermal.chamber_temperature)
history.case_temp.append(thermal.case_temperature)
history.junction_temp.append(thermal.junction_temperature)
history.output_voltage.append(electrical.output_voltage)
history.power_dissipation.append(electrical.power_dissipation)
def sync_engine_from_session_state() -> None:
"""Sync engine parameters from session state (called by fragment)."""
engine: PhysicsEngine = st.session_state.engine
engine.set_chamber_setpoint(st.session_state.get("temp_setpoint", 25.0))
engine.set_input_voltage(st.session_state.get("input_voltage", 5.0))
engine.set_output_enabled(st.session_state.get("output_enabled", False))
engine.set_load_current(st.session_state.get("load_current", 100.0) / 1000.0)
def display_controls() -> None:
"""Display simulation control panel in sidebar."""
st.sidebar.header("Simulation Controls")
# Start/Stop button
if st.session_state.running:
if st.sidebar.button(
"Stop Simulation", type="primary", use_container_width=True
):
st.session_state.running = False
else:
if st.sidebar.button(
"Start Simulation", type="primary", use_container_width=True
):
st.session_state.running = True
st.session_state.last_update = time.time()
# Reset button
if st.sidebar.button("Reset", use_container_width=True):
st.session_state.engine = PhysicsEngine(update_rate_hz=100.0)
st.session_state.history = SimulationHistory()
st.session_state.running = False
st.session_state.last_update = time.time()
st.sidebar.divider()
# Time multiplier
st.sidebar.subheader("Simulation Speed")
st.sidebar.select_slider(
"Time Multiplier",
options=[1, 2, 5, 10, 20, 50, 100],
value=10,
format_func=lambda x: f"{x}x",
key="time_multiplier",
)
st.sidebar.caption(
f"1 real second = {st.session_state.get('time_multiplier', 10)} simulation seconds"
)
st.sidebar.divider()
# Temperature setpoint
st.sidebar.subheader("Thermal Chamber")
st.sidebar.slider(
"Temperature Setpoint (C)",
min_value=-40.0,
max_value=125.0,
value=25.0,
step=5.0,
key="temp_setpoint",
)
st.sidebar.divider()
# Power supply controls
st.sidebar.subheader("Power Supply")
st.sidebar.slider(
"Input Voltage (V)",
min_value=0.0,
max_value=12.0,
value=5.0,
step=0.1,
key="input_voltage",
)
st.sidebar.toggle(
"Output Enabled",
value=False,
key="output_enabled",
)
st.sidebar.divider()
# Load controls
st.sidebar.subheader("Electronic Load")
st.sidebar.slider(
"Load Current (mA)",
min_value=0.0,
max_value=500.0,
value=100.0,
step=10.0,
key="load_current",
)
@st.fragment(run_every=0.1)
def simulation_display() -> None:
"""Fragment that displays and updates simulation state."""
engine: PhysicsEngine = st.session_state.engine
history: SimulationHistory = st.session_state.history
# Sync engine parameters from UI controls
sync_engine_from_session_state()
# Step simulation if running
if st.session_state.running:
step_simulation()
# Get current state
thermal = engine.get_thermal_state()
electrical = engine.get_electrical_state()
# Current state metrics
st.subheader("Current State")
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Chamber Temp", f"{thermal.chamber_temperature:.2f} C")
with col2:
st.metric("Case Temp", f"{thermal.case_temperature:.2f} C")
with col3:
st.metric("Junction Temp", f"{thermal.junction_temperature:.2f} C")
with col4:
st.metric("Output Voltage", f"{electrical.output_voltage:.4f} V")
col5, col6, col7, col8 = st.columns(4)
with col5:
st.metric("Input Voltage", f"{electrical.input_voltage:.2f} V")
with col6:
st.metric("Load Current", f"{electrical.load_current * 1000:.1f} mA")
with col7:
st.metric("Power Diss.", f"{electrical.power_dissipation * 1000:.2f} mW")
with col8:
status = "Running" if st.session_state.running else "Stopped"
st.metric(
"Sim Time",
f"{engine.simulation_time:.1f} s",
delta=f"{status} @ {st.session_state.get('time_multiplier', 10):.0f}x",
)
# Temperature chart
st.subheader("Temperature History")
if len(history.time) < 2:
st.info("Start the simulation to see temperature data")
else:
chart_data = {
"Time (s)": list(history.time),
"Chamber": list(history.chamber_temp),
"Case": list(history.case_temp),
"Junction": list(history.junction_temp),
}
st.line_chart(
chart_data,
x="Time (s)",
y=["Chamber", "Case", "Junction"],
color=["#1f77b4", "#ff7f0e", "#d62728"],
)
# Self-heating demonstration
st.subheader("Self-Heating Demonstration")
delta_t_jc = thermal.junction_temperature - thermal.case_temperature
delta_t_ca = thermal.case_temperature - thermal.chamber_temperature
col1, col2 = st.columns(2)
with col1:
st.markdown("#### Self-Heating Analysis")
st.markdown(
f"""
| Parameter | Value |
|-----------|-------|
| Junction-Case Rise (dT_jc) | **{delta_t_jc:.2f} C** |
| Case-Ambient Rise (dT_ca) | **{delta_t_ca:.2f} C** |
| Power Dissipation | {electrical.power_dissipation * 1000:.1f} mW |
| theta_jc (junction-case) | 15 C/W |
| theta_ca (case-ambient) | 5 C/W |
"""
)
st.markdown(
"""
**Thermal Coupling:** The junction temperature rises above the case
temperature due to power dissipation. This is governed by:
`T_junction = T_case + P_diss x theta_jc`
Try increasing the load current or input voltage to see
self-heating effects!
"""
)
with col2:
st.markdown("#### Power Dissipation")
if len(history.time) < 2:
st.info("Start the simulation to see power data")
else:
power_data = {
"Time (s)": list(history.time),
"Power (mW)": [p * 1000 for p in history.power_dissipation],
}
st.line_chart(
power_data,
x="Time (s)",
y="Power (mW)",
color="#2ca02c",
)
def main() -> None:
"""Main entry point for the Streamlit dashboard."""
st.set_page_config(
page_title="py-dvt-ate Virtual Lab Bench",
page_icon="🔬",
layout="wide",
)
st.title("py-dvt-ate Virtual Lab Bench")
st.markdown(
"""
Interactive physics simulation demonstrating coupled thermal-electrical
behaviour of an LDO voltage regulator.
"""
)
init_session_state()
# Sidebar controls (static - doesn't need fragment)
display_controls()
# Dynamic simulation display (uses fragment for smooth updates)
simulation_display()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,87 @@
"""SCPI command parsing.
This module provides SCPI (Standard Commands for Programmable Instruments)
command parsing for instrument communication. It handles IEEE 488.2 common
commands (*IDN?, *RST, etc.) and instrument-specific commands.
"""
from dataclasses import dataclass
@dataclass
class SCPICommand:
"""Parsed SCPI command.
Attributes:
header: The command header (e.g., "TEMP:SETPOINT" or "*IDN").
arguments: List of command arguments (e.g., ["85.0"]).
is_query: True if the command ends with '?' (query command).
"""
header: str
arguments: list[str]
is_query: bool
@property
def keyword(self) -> str:
"""Return the command keyword without '?'.
For query commands like "TEMP:SETPOINT?", returns "TEMP:SETPOINT".
For regular commands like "VOLT", returns "VOLT".
"""
return self.header.rstrip("?")
class SCPIParser:
"""Parse SCPI command strings.
Handles both IEEE 488.2 common commands (e.g., *IDN?, *RST) and
instrument-specific commands (e.g., VOLT 3.3, TEMP:SETPOINT?).
Examples:
>>> parser = SCPIParser()
>>> cmd = parser.parse("*IDN?")
>>> cmd.header, cmd.is_query
('*IDN?', True)
>>> cmd = parser.parse("VOLT 3.3")
>>> cmd.header, cmd.arguments
('VOLT', ['3.3'])
"""
def parse(self, command_string: str) -> SCPICommand:
"""Parse a SCPI command string.
Args:
command_string: The raw SCPI command string to parse.
Returns:
SCPICommand with parsed header, arguments, and query flag.
Examples:
"*IDN?" -> SCPICommand("*IDN?", [], True)
"VOLT 3.3" -> SCPICommand("VOLT", ["3.3"], False)
"TEMP:SETPOINT?" -> SCPICommand("TEMP:SETPOINT?", [], True)
"CONF:VOLT:DC 10,0.001" -> SCPICommand("CONF:VOLT:DC", ["10", "0.001"], False)
"""
command_string = command_string.strip()
if not command_string:
return SCPICommand(header="", arguments=[], is_query=False)
# Split into header and arguments on first whitespace
parts = command_string.split(None, 1)
header = parts[0]
arguments: list[str] = []
if len(parts) > 1:
# Parse comma-separated arguments
arg_string = parts[1]
arguments = [arg.strip() for arg in arg_string.split(",")]
# Query is determined by whether the header ends with '?'
is_query = header.endswith("?")
return SCPICommand(
header=header,
arguments=arguments,
is_query=is_query,
)

View File

@@ -3,3 +3,8 @@
Provides virtual instruments backed by a coupled thermal-electrical Provides virtual instruments backed by a coupled thermal-electrical
physics engine. Used for development and testing without real hardware. physics engine. Used for development and testing without real hardware.
""" """
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
from py_dvt_ate.simulation.tcp_server import InstrumentServer
__all__ = ["InstrumentServer", "ServerConfig", "SimulationServer"]

View File

@@ -0,0 +1,240 @@
"""Simulation server entry point.
This module provides the main entry point for running the simulation server
with all virtual instruments wired to a shared physics engine.
"""
from __future__ import annotations
import asyncio
import logging
import signal
from dataclasses import dataclass
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.tcp_server import InstrumentServer
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
logger = logging.getLogger(__name__)
@dataclass
class ServerConfig:
"""Configuration for the simulation server.
Attributes:
host: Host address to bind to.
chamber_port: Port for thermal chamber instrument.
psu_port: Port for power supply instrument.
dmm_port: Port for multimeter instrument.
physics_rate_hz: Physics engine update rate in Hz.
"""
host: str = "127.0.0.1"
chamber_port: int = 5000
psu_port: int = 5001
dmm_port: int = 5002
physics_rate_hz: float = 100.0
class SimulationServer:
"""Complete simulation server with physics engine and instruments.
Creates a physics engine and wires it to all virtual instruments,
then exposes them over TCP for client access.
"""
def __init__(self, config: ServerConfig | None = None) -> None:
"""Initialise the simulation server.
Args:
config: Server configuration. Uses defaults if not provided.
"""
self._config = config or ServerConfig()
self._physics_engine: PhysicsEngine | None = None
self._instrument_server: InstrumentServer | None = None
self._physics_task: asyncio.Task[None] | None = None
self._running = False
@property
def is_running(self) -> bool:
"""Check if server is currently running."""
return self._running
@property
def physics_engine(self) -> PhysicsEngine | None:
"""Get the physics engine instance."""
return self._physics_engine
def _setup(self) -> None:
"""Create and wire up all components."""
# Create physics engine
self._physics_engine = PhysicsEngine(
update_rate_hz=self._config.physics_rate_hz
)
# Create instruments connected to physics engine
chamber = ThermalChamberSim(self._physics_engine)
psu = PowerSupplySim(self._physics_engine)
dmm = MultimeterSim(self._physics_engine)
# Create TCP server and register instruments
self._instrument_server = InstrumentServer(host=self._config.host)
self._instrument_server.register_instrument(self._config.chamber_port, chamber)
self._instrument_server.register_instrument(self._config.psu_port, psu)
self._instrument_server.register_instrument(self._config.dmm_port, dmm)
logger.info(
"Simulation server configured: chamber=%d, psu=%d, dmm=%d",
self._config.chamber_port,
self._config.psu_port,
self._config.dmm_port,
)
async def _run_physics(self) -> None:
"""Run the physics engine simulation loop."""
if self._physics_engine is None:
return
dt = self._physics_engine.dt
while self._running:
self._physics_engine.step()
# Sleep for the physics timestep
await asyncio.sleep(dt)
async def start(self) -> None:
"""Start the simulation server.
Sets up all components and starts the TCP server and physics engine.
Raises:
RuntimeError: If server is already running.
"""
if self._running:
raise RuntimeError("Server is already running")
self._setup()
self._running = True
# Start TCP server
if self._instrument_server is not None:
await self._instrument_server.start()
# Start physics engine loop
self._physics_task = asyncio.create_task(self._run_physics())
logger.info("Simulation server started")
async def stop(self) -> None:
"""Stop the simulation server."""
if not self._running:
return
self._running = False
# Cancel physics loop
if self._physics_task is not None:
self._physics_task.cancel()
try:
await self._physics_task
except asyncio.CancelledError:
pass
self._physics_task = None
# Stop TCP server
if self._instrument_server is not None:
await self._instrument_server.stop()
self._instrument_server = None
self._physics_engine = None
logger.info("Simulation server stopped")
async def serve_forever(self) -> None:
"""Start the server and run until cancelled."""
await self.start()
try:
# Wait for the physics task (which runs until cancelled)
if self._physics_task is not None:
await self._physics_task
except asyncio.CancelledError:
pass
finally:
await self.stop()
async def run_server(config: ServerConfig | None = None) -> None:
"""Run the simulation server with signal handling.
This is the main entry point for running the server. It sets up
signal handlers for graceful shutdown.
Args:
config: Server configuration. Uses defaults if not provided.
"""
server = SimulationServer(config)
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
stop_event = asyncio.Event()
def signal_handler() -> None:
logger.info("Shutdown signal received")
stop_event.set()
# Register signal handlers (Unix-style, may not work on all Windows)
try:
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, signal_handler)
except NotImplementedError:
# Windows doesn't support add_signal_handler
pass
try:
await server.start()
logger.info("Simulation server running. Press Ctrl+C to stop.")
# Wait for stop signal
await stop_event.wait()
except KeyboardInterrupt:
logger.info("Keyboard interrupt received")
finally:
await server.stop()
def main(
host: str = "127.0.0.1",
chamber_port: int = 5000,
psu_port: int = 5001,
dmm_port: int = 5002,
physics_rate: float = 100.0,
) -> None:
"""Run the simulation server from command line.
Args:
host: Host address to bind to.
chamber_port: Port for thermal chamber.
psu_port: Port for power supply.
dmm_port: Port for multimeter.
physics_rate: Physics engine update rate in Hz.
"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
config = ServerConfig(
host=host,
chamber_port=chamber_port,
psu_port=psu_port,
dmm_port=dmm_port,
physics_rate_hz=physics_rate,
)
asyncio.run(run_server(config))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,212 @@
"""Async TCP server for exposing virtual instruments over network.
This module provides the InstrumentServer class that hosts virtual SCPI
instruments over TCP, allowing client applications to communicate using
standard SCPI commands over a network connection.
"""
from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from py_dvt_ate.simulation.virtual.base import BaseInstrument
# Re-export for type checking - actual import happens at runtime via registration
__all__ = ["InstrumentServer"]
logger = logging.getLogger(__name__)
class InstrumentServer:
"""Async TCP server hosting virtual SCPI instruments.
Each instrument is assigned a port. Clients connect via TCP and send
SCPI commands as newline-terminated strings. Responses are also
newline-terminated.
Attributes:
host: Host address to bind to.
"""
def __init__(self, host: str = "127.0.0.1") -> None:
"""Initialise the instrument server.
Args:
host: Host address to bind to. Defaults to localhost.
"""
self._host = host
self._instruments: dict[int, BaseInstrument] = {}
self._servers: list[asyncio.Server] = []
self._running = False
@property
def host(self) -> str:
"""Get the host address."""
return self._host
@property
def is_running(self) -> bool:
"""Check if server is currently running."""
return self._running
def register_instrument(self, port: int, instrument: BaseInstrument) -> None:
"""Register an instrument to be served on a specific port.
Args:
port: TCP port number to serve the instrument on.
instrument: Virtual instrument to serve.
Raises:
ValueError: If port is already registered.
RuntimeError: If server is already running.
"""
if self._running:
raise RuntimeError("Cannot register instruments while server is running")
if port in self._instruments:
raise ValueError(f"Port {port} is already registered")
self._instruments[port] = instrument
logger.info(
"Registered %s on port %d",
instrument.__class__.__name__,
port,
)
def get_instrument(self, port: int) -> BaseInstrument | None:
"""Get the instrument registered on a port.
Args:
port: Port number to look up.
Returns:
Registered instrument, or None if port not registered.
"""
return self._instruments.get(port)
@property
def registered_ports(self) -> list[int]:
"""Get list of registered port numbers."""
return list(self._instruments.keys())
async def start(self) -> None:
"""Start the server and begin listening on all registered ports.
Creates a TCP server for each registered instrument port.
Raises:
RuntimeError: If server is already running or no instruments registered.
"""
if self._running:
raise RuntimeError("Server is already running")
if not self._instruments:
raise RuntimeError("No instruments registered")
self._running = True
for port, instrument in self._instruments.items():
server = await asyncio.start_server(
lambda r, w, inst=instrument, p=port: self._handle_client(r, w, inst, p),
self._host,
port,
)
self._servers.append(server)
logger.info(
"Started server for %s on %s:%d",
instrument.__class__.__name__,
self._host,
port,
)
async def stop(self) -> None:
"""Stop the server and close all connections."""
if not self._running:
return
for server in self._servers:
server.close()
await server.wait_closed()
self._servers.clear()
self._running = False
logger.info("Server stopped")
async def serve_forever(self) -> None:
"""Start the server and run until cancelled.
This is a convenience method that starts the server and blocks
until the server is stopped or cancelled.
"""
await self.start()
try:
# Keep running until cancelled
await asyncio.gather(
*[server.serve_forever() for server in self._servers]
)
finally:
await self.stop()
async def _handle_client(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
instrument: BaseInstrument,
port: int,
) -> None:
"""Handle a client connection.
Reads SCPI commands (newline-terminated), processes them through
the instrument, and sends back responses (newline-terminated).
Args:
reader: Stream reader for incoming data.
writer: Stream writer for outgoing data.
instrument: The instrument to process commands.
port: Port number for logging.
"""
addr = writer.get_extra_info("peername")
logger.info("Client connected to port %d from %s", port, addr)
try:
while True:
# Read until newline (SCPI line terminator)
data = await reader.readline()
if not data:
# Client disconnected
break
# Decode and strip whitespace
command = data.decode("utf-8").strip()
if not command:
continue
logger.debug("Port %d received: %s", port, command)
# Process command through instrument
response = instrument.process(command)
# Send response with newline terminator
if response:
writer.write(f"{response}\n".encode("utf-8"))
await writer.drain()
logger.debug("Port %d sent: %s", port, response)
except asyncio.CancelledError:
logger.debug("Client handler cancelled for port %d", port)
except ConnectionResetError:
logger.debug("Client connection reset on port %d", port)
except Exception as e:
logger.error("Error handling client on port %d: %s", port, e)
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
logger.info("Client disconnected from port %d", port)

View File

@@ -0,0 +1,156 @@
"""Base class for virtual instrument simulators.
This module provides the foundation for implementing SCPI-based virtual
instruments that can be exposed over TCP for hardware abstraction testing.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import TYPE_CHECKING
from py_dvt_ate.instruments.scpi import SCPICommand, SCPIParser
if TYPE_CHECKING:
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
# Type alias for command handlers
CommandHandler = Callable[[SCPICommand], str]
class BaseInstrument(ABC):
"""Abstract base class for virtual SCPI instruments.
Provides common functionality for SCPI command parsing and dispatch.
Subclasses should register command handlers using the register_command
method or by overriding _setup_commands.
Attributes:
manufacturer: Instrument manufacturer name for *IDN? response.
model: Instrument model name for *IDN? response.
serial_number: Instrument serial number for *IDN? response.
firmware_version: Firmware version for *IDN? response.
"""
manufacturer: str = "PyDVTATE"
model: str = "Virtual Instrument"
serial_number: str = "SIM001"
firmware_version: str = "1.0.0"
def __init__(self, physics_engine: PhysicsEngine | None = None) -> None:
"""Initialise the base instrument.
Args:
physics_engine: Reference to physics engine for simulation state.
May be None for standalone operation.
"""
self._physics_engine = physics_engine
self._parser = SCPIParser()
self._handlers: dict[str, CommandHandler] = {}
self._setup_common_commands()
self._setup_commands()
def _setup_common_commands(self) -> None:
"""Register IEEE 488.2 common commands."""
self.register_command("*IDN", self._handle_idn)
self.register_command("*RST", self._handle_rst)
self.register_command("*CLS", self._handle_cls)
self.register_command("*OPC", self._handle_opc)
@abstractmethod
def _setup_commands(self) -> None:
"""Register instrument-specific command handlers.
Subclasses must implement this method to register their
SCPI command handlers using register_command().
"""
def register_command(self, keyword: str, handler: CommandHandler) -> None:
"""Register a handler for a SCPI command keyword.
Args:
keyword: The command keyword (e.g., "TEMP:SETPOINT").
For commands that support both set and query forms,
register the base keyword without '?'.
handler: Callable that takes SCPICommand and returns response string.
"""
self._handlers[keyword.upper()] = handler
def process(self, command_string: str) -> str:
"""Process a SCPI command string and return the response.
Args:
command_string: Raw SCPI command string to process.
Returns:
Response string. Empty string for commands with no response.
Error string starting with "ERROR:" for invalid commands.
"""
command = self._parser.parse(command_string)
if not command.header:
return ""
# Look up handler by keyword (without '?' suffix)
keyword = command.keyword.upper()
handler = self._handlers.get(keyword)
if handler is None:
return f"ERROR: Unknown command '{keyword}'"
try:
return handler(command)
except ValueError as e:
return f"ERROR: {e}"
except Exception as e:
return f"ERROR: Internal error - {e}"
def _handle_idn(self, command: SCPICommand) -> str:
"""Handle *IDN? identification query.
Returns:
Comma-separated identification string.
"""
if not command.is_query:
return "ERROR: *IDN is query only"
return f"{self.manufacturer},{self.model},{self.serial_number},{self.firmware_version}"
def _handle_rst(self, command: SCPICommand) -> str:
"""Handle *RST reset command.
Returns:
Empty string (no response for reset).
"""
if command.is_query:
return "ERROR: *RST is command only"
self.reset()
return ""
def _handle_cls(self, command: SCPICommand) -> str:
"""Handle *CLS clear status command.
Returns:
Empty string (no response for clear).
"""
if command.is_query:
return "ERROR: *CLS is command only"
return ""
def _handle_opc(self, command: SCPICommand) -> str:
"""Handle *OPC operation complete command/query.
Returns:
"1" for query, empty string for command.
"""
if command.is_query:
return "1"
return ""
@abstractmethod
def reset(self) -> None:
"""Reset instrument to default state.
Subclasses must implement this to define reset behaviour.
"""

View File

@@ -0,0 +1,143 @@
"""Virtual thermal chamber simulator.
This module implements a SCPI-based virtual thermal chamber that interfaces
with the physics engine to provide realistic temperature control simulation.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from py_dvt_ate.instruments.scpi import SCPICommand
from py_dvt_ate.simulation.virtual.base import BaseInstrument
if TYPE_CHECKING:
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
class ThermalChamberSim(BaseInstrument):
"""Virtual thermal chamber simulator.
Simulates a thermal chamber with SCPI control interface. The chamber
temperature behaviour is driven by the physics engine.
SCPI Commands:
TEMP:SETPOINT <value> - Set target temperature in degrees C
TEMP:SETPOINT? - Query current setpoint
TEMP:ACTUAL? - Query actual chamber temperature
TEMP:STAB? - Query temperature stability (1=stable, 0=settling)
Attributes:
manufacturer: "PyDVTATE"
model: "TC-SIM-001"
"""
manufacturer = "PyDVTATE"
model = "TC-SIM-001"
serial_number = "TCSIM001"
firmware_version = "1.0.0"
# Stability threshold in degrees C
STABILITY_THRESHOLD = 0.5
def __init__(self, physics_engine: PhysicsEngine | None = None) -> None:
"""Initialise the thermal chamber simulator.
Args:
physics_engine: Reference to physics engine for temperature state.
"""
self._setpoint = 25.0 # Default setpoint
super().__init__(physics_engine)
def _setup_commands(self) -> None:
"""Register thermal chamber SCPI commands."""
self.register_command("TEMP:SETPOINT", self._handle_temp_setpoint)
self.register_command("TEMP:ACTUAL", self._handle_temp_actual)
self.register_command("TEMP:STAB", self._handle_temp_stab)
def reset(self) -> None:
"""Reset chamber to default state."""
self._setpoint = 25.0
if self._physics_engine is not None:
self._physics_engine.set_chamber_setpoint(self._setpoint)
def _handle_temp_setpoint(self, command: SCPICommand) -> str:
"""Handle TEMP:SETPOINT command/query.
Args:
command: Parsed SCPI command.
Returns:
Setpoint value for query, empty string for set command.
Raises:
ValueError: If setpoint argument is invalid.
"""
if command.is_query:
return f"{self._setpoint:.2f}"
# Set command requires one argument
if not command.arguments:
raise ValueError("TEMP:SETPOINT requires a value")
try:
setpoint = float(command.arguments[0])
except ValueError:
raise ValueError(f"Invalid temperature value: {command.arguments[0]}")
self._setpoint = setpoint
if self._physics_engine is not None:
self._physics_engine.set_chamber_setpoint(setpoint)
return ""
def _handle_temp_actual(self, command: SCPICommand) -> str:
"""Handle TEMP:ACTUAL? query.
Args:
command: Parsed SCPI command.
Returns:
Actual chamber temperature.
Raises:
ValueError: If used as command (not query).
"""
if not command.is_query:
raise ValueError("TEMP:ACTUAL is query only")
if self._physics_engine is None:
# Return setpoint if no physics engine connected
return f"{self._setpoint:.2f}"
thermal_state = self._physics_engine.get_thermal_state()
return f"{thermal_state.chamber_temperature:.2f}"
def _handle_temp_stab(self, command: SCPICommand) -> str:
"""Handle TEMP:STAB? stability query.
Temperature is considered stable when the actual chamber temperature
is within STABILITY_THRESHOLD of the setpoint.
Args:
command: Parsed SCPI command.
Returns:
"1" if stable, "0" if settling.
Raises:
ValueError: If used as command (not query).
"""
if not command.is_query:
raise ValueError("TEMP:STAB is query only")
if self._physics_engine is None:
# Assume stable if no physics engine connected
return "1"
thermal_state = self._physics_engine.get_thermal_state()
error = abs(thermal_state.chamber_temperature - self._setpoint)
if error <= self.STABILITY_THRESHOLD:
return "1"
return "0"

View File

@@ -0,0 +1,213 @@
"""Virtual digital multimeter (DMM) simulator.
This module implements a SCPI-based virtual multimeter that interfaces
with the physics engine to measure DUT electrical parameters.
"""
from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING
from py_dvt_ate.instruments.scpi import SCPICommand
from py_dvt_ate.simulation.virtual.base import BaseInstrument
if TYPE_CHECKING:
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
class MeasurementFunction(Enum):
"""Available measurement functions."""
VOLTAGE_DC = "VOLT:DC"
CURRENT_DC = "CURR:DC"
class MultimeterSim(BaseInstrument):
"""Virtual digital multimeter simulator.
Simulates a digital multimeter with SCPI control interface. The DMM
measures DUT output voltage and load current via the physics engine.
SCPI Commands:
MEAS:VOLT:DC? - Measure DC voltage (shortcut)
MEAS:CURR:DC? - Measure DC current (shortcut)
CONF:VOLT:DC - Configure for DC voltage measurement
CONF:CURR:DC - Configure for DC current measurement
CONF? - Query current configuration
READ? - Take measurement with current configuration
Attributes:
manufacturer: "PyDVTATE"
model: "DMM-SIM-001"
"""
manufacturer = "PyDVTATE"
model = "DMM-SIM-001"
serial_number = "DMMSIM001"
firmware_version = "1.0.0"
def __init__(self, physics_engine: PhysicsEngine | None = None) -> None:
"""Initialise the multimeter simulator.
Args:
physics_engine: Reference to physics engine for measurement values.
"""
self._function = MeasurementFunction.VOLTAGE_DC
super().__init__(physics_engine)
def _setup_commands(self) -> None:
"""Register multimeter SCPI commands."""
self.register_command("MEAS:VOLT:DC", self._handle_meas_volt_dc)
self.register_command("MEAS:CURR:DC", self._handle_meas_curr_dc)
self.register_command("CONF:VOLT:DC", self._handle_conf_volt_dc)
self.register_command("CONF:CURR:DC", self._handle_conf_curr_dc)
self.register_command("CONF", self._handle_conf)
self.register_command("READ", self._handle_read)
def reset(self) -> None:
"""Reset multimeter to default state."""
self._function = MeasurementFunction.VOLTAGE_DC
def _handle_meas_volt_dc(self, command: SCPICommand) -> str:
"""Handle MEAS:VOLT:DC? query.
Configures for DC voltage and takes measurement in one command.
Args:
command: Parsed SCPI command.
Returns:
Measured DC voltage.
Raises:
ValueError: If used as command (not query).
"""
if not command.is_query:
raise ValueError("MEAS:VOLT:DC is query only")
self._function = MeasurementFunction.VOLTAGE_DC
return self._measure_voltage_dc()
def _handle_meas_curr_dc(self, command: SCPICommand) -> str:
"""Handle MEAS:CURR:DC? query.
Configures for DC current and takes measurement in one command.
Args:
command: Parsed SCPI command.
Returns:
Measured DC current.
Raises:
ValueError: If used as command (not query).
"""
if not command.is_query:
raise ValueError("MEAS:CURR:DC is query only")
self._function = MeasurementFunction.CURRENT_DC
return self._measure_current_dc()
def _handle_conf_volt_dc(self, command: SCPICommand) -> str:
"""Handle CONF:VOLT:DC command.
Configures multimeter for DC voltage measurement.
Args:
command: Parsed SCPI command.
Returns:
Empty string (no response for configuration).
Raises:
ValueError: If used as query.
"""
if command.is_query:
raise ValueError("CONF:VOLT:DC is command only")
self._function = MeasurementFunction.VOLTAGE_DC
return ""
def _handle_conf_curr_dc(self, command: SCPICommand) -> str:
"""Handle CONF:CURR:DC command.
Configures multimeter for DC current measurement.
Args:
command: Parsed SCPI command.
Returns:
Empty string (no response for configuration).
Raises:
ValueError: If used as query.
"""
if command.is_query:
raise ValueError("CONF:CURR:DC is command only")
self._function = MeasurementFunction.CURRENT_DC
return ""
def _handle_conf(self, command: SCPICommand) -> str:
"""Handle CONF? query.
Args:
command: Parsed SCPI command.
Returns:
Current measurement configuration.
Raises:
ValueError: If used as command without subcommand.
"""
if not command.is_query:
raise ValueError("CONF requires a function (e.g., CONF:VOLT:DC)")
return f'"{self._function.value}"'
def _handle_read(self, command: SCPICommand) -> str:
"""Handle READ? query.
Takes measurement using current configuration.
Args:
command: Parsed SCPI command.
Returns:
Measured value.
Raises:
ValueError: If used as command (not query).
"""
if not command.is_query:
raise ValueError("READ is query only")
if self._function == MeasurementFunction.VOLTAGE_DC:
return self._measure_voltage_dc()
else:
return self._measure_current_dc()
def _measure_voltage_dc(self) -> str:
"""Measure DC voltage from physics engine.
Returns:
Formatted voltage reading.
"""
if self._physics_engine is None:
return "0.000000"
electrical_state = self._physics_engine.get_electrical_state()
return f"{electrical_state.output_voltage:.6f}"
def _measure_current_dc(self) -> str:
"""Measure DC current from physics engine.
Returns:
Formatted current reading.
"""
if self._physics_engine is None:
return "0.000000"
electrical_state = self._physics_engine.get_electrical_state()
return f"{electrical_state.load_current:.6f}"

View File

@@ -0,0 +1,222 @@
"""Virtual power supply simulator.
This module implements a SCPI-based virtual power supply that interfaces
with the physics engine to provide realistic power supply simulation.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from py_dvt_ate.instruments.scpi import SCPICommand
from py_dvt_ate.simulation.virtual.base import BaseInstrument
if TYPE_CHECKING:
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
class PowerSupplySim(BaseInstrument):
"""Virtual power supply simulator.
Simulates a programmable DC power supply with SCPI control interface.
The power supply provides input voltage to the DUT via the physics engine.
SCPI Commands:
VOLT <value> - Set output voltage in volts
VOLT? - Query voltage setpoint
CURR <value> - Set current limit in amps
CURR? - Query current limit
OUTP <ON|OFF|1|0> - Enable/disable output
OUTP? - Query output state (1=on, 0=off)
MEAS:VOLT? - Measure actual output voltage
MEAS:CURR? - Measure actual output current
Attributes:
manufacturer: "PyDVTATE"
model: "PS-SIM-001"
"""
manufacturer = "PyDVTATE"
model = "PS-SIM-001"
serial_number = "PSSIM001"
firmware_version = "1.0.0"
# Default values
DEFAULT_VOLTAGE = 0.0
DEFAULT_CURRENT_LIMIT = 1.0
def __init__(self, physics_engine: PhysicsEngine | None = None) -> None:
"""Initialise the power supply simulator.
Args:
physics_engine: Reference to physics engine for electrical state.
"""
self._voltage_setpoint = self.DEFAULT_VOLTAGE
self._current_limit = self.DEFAULT_CURRENT_LIMIT
self._output_enabled = False
super().__init__(physics_engine)
def _setup_commands(self) -> None:
"""Register power supply SCPI commands."""
self.register_command("VOLT", self._handle_volt)
self.register_command("CURR", self._handle_curr)
self.register_command("OUTP", self._handle_outp)
self.register_command("MEAS:VOLT", self._handle_meas_volt)
self.register_command("MEAS:CURR", self._handle_meas_curr)
def reset(self) -> None:
"""Reset power supply to default state."""
self._voltage_setpoint = self.DEFAULT_VOLTAGE
self._current_limit = self.DEFAULT_CURRENT_LIMIT
self._output_enabled = False
if self._physics_engine is not None:
self._physics_engine.set_input_voltage(0.0)
self._physics_engine.set_output_enabled(False)
def _handle_volt(self, command: SCPICommand) -> str:
"""Handle VOLT command/query.
Args:
command: Parsed SCPI command.
Returns:
Voltage setpoint for query, empty string for set command.
Raises:
ValueError: If voltage argument is invalid.
"""
if command.is_query:
return f"{self._voltage_setpoint:.3f}"
if not command.arguments:
raise ValueError("VOLT requires a value")
try:
voltage = float(command.arguments[0])
except ValueError:
raise ValueError(f"Invalid voltage value: {command.arguments[0]}")
if voltage < 0:
raise ValueError("Voltage cannot be negative")
self._voltage_setpoint = voltage
if self._physics_engine is not None and self._output_enabled:
self._physics_engine.set_input_voltage(voltage)
return ""
def _handle_curr(self, command: SCPICommand) -> str:
"""Handle CURR command/query.
Args:
command: Parsed SCPI command.
Returns:
Current limit for query, empty string for set command.
Raises:
ValueError: If current argument is invalid.
"""
if command.is_query:
return f"{self._current_limit:.3f}"
if not command.arguments:
raise ValueError("CURR requires a value")
try:
current = float(command.arguments[0])
except ValueError:
raise ValueError(f"Invalid current value: {command.arguments[0]}")
if current < 0:
raise ValueError("Current limit cannot be negative")
self._current_limit = current
return ""
def _handle_outp(self, command: SCPICommand) -> str:
"""Handle OUTP command/query.
Args:
command: Parsed SCPI command.
Returns:
"1" or "0" for query, empty string for set command.
Raises:
ValueError: If output argument is invalid.
"""
if command.is_query:
return "1" if self._output_enabled else "0"
if not command.arguments:
raise ValueError("OUTP requires a value (ON, OFF, 1, or 0)")
arg = command.arguments[0].upper()
if arg in ("ON", "1"):
self._output_enabled = True
elif arg in ("OFF", "0"):
self._output_enabled = False
else:
raise ValueError(f"Invalid output state: {command.arguments[0]}")
if self._physics_engine is not None:
self._physics_engine.set_output_enabled(self._output_enabled)
if self._output_enabled:
self._physics_engine.set_input_voltage(self._voltage_setpoint)
else:
self._physics_engine.set_input_voltage(0.0)
return ""
def _handle_meas_volt(self, command: SCPICommand) -> str:
"""Handle MEAS:VOLT? query.
Args:
command: Parsed SCPI command.
Returns:
Measured output voltage.
Raises:
ValueError: If used as command (not query).
"""
if not command.is_query:
raise ValueError("MEAS:VOLT is query only")
if not self._output_enabled:
return "0.000"
if self._physics_engine is None:
return f"{self._voltage_setpoint:.3f}"
electrical_state = self._physics_engine.get_electrical_state()
return f"{electrical_state.input_voltage:.3f}"
def _handle_meas_curr(self, command: SCPICommand) -> str:
"""Handle MEAS:CURR? query.
Args:
command: Parsed SCPI command.
Returns:
Measured output current.
Raises:
ValueError: If used as command (not query).
"""
if not command.is_query:
raise ValueError("MEAS:CURR is query only")
if not self._output_enabled:
return "0.000"
if self._physics_engine is None:
return "0.000"
electrical_state = self._physics_engine.get_electrical_state()
# Total current is load current + quiescent current
total_current = electrical_state.load_current + electrical_state.quiescent_current
return f"{total_current:.3f}"

View File

@@ -1 +1,8 @@
"""pytest fixtures for py_dvt_ate tests.""" """pytest fixtures for py_dvt_ate tests."""
import pytest
def pytest_configure(config: pytest.Config) -> None:
"""Configure pytest markers."""
config.addinivalue_line("markers", "asyncio: mark test as async")

View File

@@ -0,0 +1 @@
"""Integration tests for py_dvt_ate."""

View File

@@ -0,0 +1 @@
"""Configuration for integration tests."""

View File

@@ -0,0 +1,274 @@
"""Integration tests for TCP server.
Tests the InstrumentServer and SimulationServer with actual TCP connections.
"""
from __future__ import annotations
import asyncio
import pytest
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
from py_dvt_ate.simulation.tcp_server import InstrumentServer
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
@pytest.mark.asyncio(loop_scope="function")
class TestInstrumentServer:
"""Tests for InstrumentServer TCP functionality."""
@pytest.fixture
def physics_engine(self) -> PhysicsEngine:
"""Create a physics engine for testing."""
return PhysicsEngine(update_rate_hz=100.0)
@pytest.fixture
def server(self, physics_engine: PhysicsEngine) -> InstrumentServer:
"""Create an instrument server with a thermal chamber."""
server = InstrumentServer(host="127.0.0.1")
chamber = ThermalChamberSim(physics_engine)
server.register_instrument(15000, chamber)
return server
async def test_server_start_stop(self, server: InstrumentServer) -> None:
"""Test server can start and stop."""
assert not server.is_running
await server.start()
assert server.is_running
await server.stop()
assert not server.is_running
async def test_client_connection(self, server: InstrumentServer) -> None:
"""Test client can connect and send command."""
await server.start()
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
# Send *IDN? query
writer.write(b"*IDN?\n")
await writer.drain()
# Read response
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
assert b"PyDVTATE" in response
assert b"TC-SIM-001" in response
writer.close()
await writer.wait_closed()
finally:
await server.stop()
async def test_multiple_commands(self, server: InstrumentServer) -> None:
"""Test sending multiple commands in sequence."""
await server.start()
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
# Set temperature setpoint
writer.write(b"TEMP:SETPOINT 85.0\n")
await writer.drain()
# Query setpoint
writer.write(b"TEMP:SETPOINT?\n")
await writer.drain()
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
assert b"85.00" in response
# Query actual temperature
writer.write(b"TEMP:ACTUAL?\n")
await writer.drain()
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
# Should return a valid float
temp = float(response.decode().strip())
assert -50 <= temp <= 200
writer.close()
await writer.wait_closed()
finally:
await server.stop()
async def test_concurrent_connections(
self, physics_engine: PhysicsEngine
) -> None:
"""Test multiple concurrent client connections."""
server = InstrumentServer(host="127.0.0.1")
chamber = ThermalChamberSim(physics_engine)
server.register_instrument(15001, chamber)
await server.start()
try:
# Connect two clients simultaneously
reader1, writer1 = await asyncio.open_connection("127.0.0.1", 15001)
reader2, writer2 = await asyncio.open_connection("127.0.0.1", 15001)
# Send command from client 1
writer1.write(b"*IDN?\n")
await writer1.drain()
response1 = await asyncio.wait_for(reader1.readline(), timeout=2.0)
# Send command from client 2
writer2.write(b"*IDN?\n")
await writer2.drain()
response2 = await asyncio.wait_for(reader2.readline(), timeout=2.0)
# Both should get valid responses
assert b"TC-SIM-001" in response1
assert b"TC-SIM-001" in response2
writer1.close()
writer2.close()
await writer1.wait_closed()
await writer2.wait_closed()
finally:
await server.stop()
@pytest.mark.asyncio(loop_scope="function")
class TestSimulationServer:
"""Tests for complete SimulationServer."""
async def test_simulation_server_start_stop(self) -> None:
"""Test simulation server lifecycle."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16000,
psu_port=16001,
dmm_port=16002,
physics_rate_hz=100.0,
)
server = SimulationServer(config)
assert not server.is_running
await server.start()
assert server.is_running
assert server.physics_engine is not None
await server.stop()
assert not server.is_running
async def test_all_instruments_accessible(self) -> None:
"""Test all three instruments are accessible over TCP."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16100,
psu_port=16101,
dmm_port=16102,
)
server = SimulationServer(config)
await server.start()
try:
# Test thermal chamber
r, w = await asyncio.open_connection("127.0.0.1", 16100)
w.write(b"*IDN?\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"TC-SIM-001" in resp
w.close()
await w.wait_closed()
# Test power supply
r, w = await asyncio.open_connection("127.0.0.1", 16101)
w.write(b"*IDN?\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"PS-SIM-001" in resp
w.close()
await w.wait_closed()
# Test multimeter
r, w = await asyncio.open_connection("127.0.0.1", 16102)
w.write(b"*IDN?\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"DMM-SIM-001" in resp
w.close()
await w.wait_closed()
finally:
await server.stop()
async def test_physics_engine_integration(self) -> None:
"""Test instruments share physics engine state."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16200,
psu_port=16201,
dmm_port=16202,
)
server = SimulationServer(config)
await server.start()
try:
# Connect to power supply and enable output
psu_r, psu_w = await asyncio.open_connection("127.0.0.1", 16201)
psu_w.write(b"VOLT 5.0\n")
await psu_w.drain()
psu_w.write(b"OUTP ON\n")
await psu_w.drain()
# Run a few physics steps
await asyncio.sleep(0.1)
# Query voltage from power supply
psu_w.write(b"MEAS:VOLT?\n")
await psu_w.drain()
psu_resp = await asyncio.wait_for(psu_r.readline(), timeout=2.0)
psu_voltage = float(psu_resp.decode().strip())
# Connect to DMM and measure DUT output
dmm_r, dmm_w = await asyncio.open_connection("127.0.0.1", 16202)
dmm_w.write(b"MEAS:VOLT:DC?\n")
await dmm_w.drain()
dmm_resp = await asyncio.wait_for(dmm_r.readline(), timeout=2.0)
dmm_voltage = float(dmm_resp.decode().strip())
# PSU should show input voltage (5V)
assert 4.9 <= psu_voltage <= 5.1
# DMM should show DUT output voltage (LDO regulated ~3.3V)
assert 3.0 <= dmm_voltage <= 3.5
psu_w.close()
dmm_w.close()
await psu_w.wait_closed()
await dmm_w.wait_closed()
finally:
await server.stop()
async def test_error_handling(self) -> None:
"""Test invalid commands return errors."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16300,
psu_port=16301,
dmm_port=16302,
)
server = SimulationServer(config)
await server.start()
try:
r, w = await asyncio.open_connection("127.0.0.1", 16300)
# Send invalid command
w.write(b"INVALID:COMMAND\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"ERROR" in resp
w.close()
await w.wait_closed()
finally:
await server.stop()

View File

@@ -0,0 +1,306 @@
"""Unit tests for multimeter simulator."""
import pytest
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
class TestMultimeterSimBasic:
"""Tests for MultimeterSim without physics engine."""
@pytest.fixture
def dmm(self) -> MultimeterSim:
"""Create multimeter instance without physics engine."""
return MultimeterSim()
def test_creation(self, dmm: MultimeterSim) -> None:
"""Test multimeter can be created."""
assert dmm is not None
assert dmm.model == "DMM-SIM-001"
assert dmm.manufacturer == "PyDVTATE"
def test_idn_query(self, dmm: MultimeterSim) -> None:
"""Test *IDN? returns identification string."""
response = dmm.process("*IDN?")
assert "PyDVTATE" in response
assert "DMM-SIM-001" in response
def test_rst_command(self, dmm: MultimeterSim) -> None:
"""Test *RST resets to defaults."""
# Set non-default config
dmm.process("CONF:CURR:DC")
assert dmm.process("CONF?") == '"CURR:DC"'
# Reset
response = dmm.process("*RST")
assert response == ""
# Check defaults restored
assert dmm.process("CONF?") == '"VOLT:DC"'
def test_opc_query(self, dmm: MultimeterSim) -> None:
"""Test *OPC? returns 1."""
response = dmm.process("*OPC?")
assert response == "1"
def test_unknown_command(self, dmm: MultimeterSim) -> None:
"""Test unknown command returns error."""
response = dmm.process("INVALID:CMD")
assert response.startswith("ERROR:")
assert "Unknown command" in response
class TestMultimeterMeasVoltDC:
"""Tests for MEAS:VOLT:DC command."""
@pytest.fixture
def dmm(self) -> MultimeterSim:
"""Create multimeter instance without physics engine."""
return MultimeterSim()
def test_meas_volt_dc_query_no_engine(self, dmm: MultimeterSim) -> None:
"""Test MEAS:VOLT:DC? returns 0 without physics engine."""
response = dmm.process("MEAS:VOLT:DC?")
assert response == "0.000000"
def test_meas_volt_dc_sets_config(self, dmm: MultimeterSim) -> None:
"""Test MEAS:VOLT:DC? sets configuration to VOLT:DC."""
dmm.process("CONF:CURR:DC")
dmm.process("MEAS:VOLT:DC?")
assert dmm.process("CONF?") == '"VOLT:DC"'
def test_meas_volt_dc_as_command_fails(self, dmm: MultimeterSim) -> None:
"""Test MEAS:VOLT:DC (without ?) returns error."""
response = dmm.process("MEAS:VOLT:DC 1.0")
assert response.startswith("ERROR:")
assert "query only" in response
class TestMultimeterMeasCurrDC:
"""Tests for MEAS:CURR:DC command."""
@pytest.fixture
def dmm(self) -> MultimeterSim:
"""Create multimeter instance without physics engine."""
return MultimeterSim()
def test_meas_curr_dc_query_no_engine(self, dmm: MultimeterSim) -> None:
"""Test MEAS:CURR:DC? returns 0 without physics engine."""
response = dmm.process("MEAS:CURR:DC?")
assert response == "0.000000"
def test_meas_curr_dc_sets_config(self, dmm: MultimeterSim) -> None:
"""Test MEAS:CURR:DC? sets configuration to CURR:DC."""
dmm.process("MEAS:CURR:DC?")
assert dmm.process("CONF?") == '"CURR:DC"'
def test_meas_curr_dc_as_command_fails(self, dmm: MultimeterSim) -> None:
"""Test MEAS:CURR:DC (without ?) returns error."""
response = dmm.process("MEAS:CURR:DC 0.1")
assert response.startswith("ERROR:")
assert "query only" in response
class TestMultimeterConf:
"""Tests for CONF commands."""
@pytest.fixture
def dmm(self) -> MultimeterSim:
"""Create multimeter instance without physics engine."""
return MultimeterSim()
def test_conf_query_default(self, dmm: MultimeterSim) -> None:
"""Test CONF? returns default configuration."""
response = dmm.process("CONF?")
assert response == '"VOLT:DC"'
def test_conf_volt_dc(self, dmm: MultimeterSim) -> None:
"""Test CONF:VOLT:DC sets voltage measurement mode."""
dmm.process("CONF:CURR:DC")
response = dmm.process("CONF:VOLT:DC")
assert response == ""
assert dmm.process("CONF?") == '"VOLT:DC"'
def test_conf_volt_dc_as_query_fails(self, dmm: MultimeterSim) -> None:
"""Test CONF:VOLT:DC? returns error."""
response = dmm.process("CONF:VOLT:DC?")
assert response.startswith("ERROR:")
assert "command only" in response
def test_conf_curr_dc(self, dmm: MultimeterSim) -> None:
"""Test CONF:CURR:DC sets current measurement mode."""
response = dmm.process("CONF:CURR:DC")
assert response == ""
assert dmm.process("CONF?") == '"CURR:DC"'
def test_conf_curr_dc_as_query_fails(self, dmm: MultimeterSim) -> None:
"""Test CONF:CURR:DC? returns error."""
response = dmm.process("CONF:CURR:DC?")
assert response.startswith("ERROR:")
assert "command only" in response
def test_conf_as_command_fails(self, dmm: MultimeterSim) -> None:
"""Test CONF without subcommand returns error."""
response = dmm.process("CONF")
assert response.startswith("ERROR:")
class TestMultimeterRead:
"""Tests for READ command."""
@pytest.fixture
def dmm(self) -> MultimeterSim:
"""Create multimeter instance without physics engine."""
return MultimeterSim()
def test_read_query_volt_mode(self, dmm: MultimeterSim) -> None:
"""Test READ? returns voltage when configured for voltage."""
dmm.process("CONF:VOLT:DC")
response = dmm.process("READ?")
assert response == "0.000000"
def test_read_query_curr_mode(self, dmm: MultimeterSim) -> None:
"""Test READ? returns current when configured for current."""
dmm.process("CONF:CURR:DC")
response = dmm.process("READ?")
assert response == "0.000000"
def test_read_as_command_fails(self, dmm: MultimeterSim) -> None:
"""Test READ (without ?) returns error."""
response = dmm.process("READ")
assert response.startswith("ERROR:")
assert "query only" in response
class TestMultimeterWithPhysicsEngine:
"""Tests for MultimeterSim with physics engine integration."""
@pytest.fixture
def engine(self) -> PhysicsEngine:
"""Create physics engine instance."""
return PhysicsEngine(update_rate_hz=100.0)
@pytest.fixture
def dmm(self, engine: PhysicsEngine) -> MultimeterSim:
"""Create multimeter instance with physics engine."""
return MultimeterSim(physics_engine=engine)
def test_meas_volt_dc_returns_engine_voltage(
self, dmm: MultimeterSim, engine: PhysicsEngine
) -> None:
"""Test MEAS:VOLT:DC? returns physics engine output voltage."""
engine.set_input_voltage(5.0)
engine.set_output_enabled(True)
engine.step()
response = dmm.process("MEAS:VOLT:DC?")
# LDO model outputs ~3.3V nominal
voltage = float(response)
assert voltage > 3.0
assert voltage < 4.0
def test_meas_volt_dc_returns_zero_when_disabled(
self, dmm: MultimeterSim, engine: PhysicsEngine
) -> None:
"""Test MEAS:VOLT:DC? returns 0 when DUT output disabled."""
engine.set_input_voltage(5.0)
engine.set_output_enabled(False)
engine.step()
response = dmm.process("MEAS:VOLT:DC?")
assert response == "0.000000"
def test_meas_curr_dc_returns_engine_current(
self, dmm: MultimeterSim, engine: PhysicsEngine
) -> None:
"""Test MEAS:CURR:DC? returns physics engine load current."""
engine.set_input_voltage(5.0)
engine.set_load_current(0.1)
engine.set_output_enabled(True)
engine.step()
response = dmm.process("MEAS:CURR:DC?")
assert float(response) == pytest.approx(0.1, abs=0.001)
def test_meas_curr_dc_returns_zero_when_disabled(
self, dmm: MultimeterSim, engine: PhysicsEngine
) -> None:
"""Test MEAS:CURR:DC? returns 0 when DUT output disabled."""
engine.set_input_voltage(5.0)
engine.set_load_current(0.1)
engine.set_output_enabled(False)
engine.step()
response = dmm.process("MEAS:CURR:DC?")
assert response == "0.000000"
def test_read_uses_configured_function(
self, dmm: MultimeterSim, engine: PhysicsEngine
) -> None:
"""Test READ? respects configured measurement function."""
engine.set_input_voltage(5.0)
engine.set_load_current(0.1)
engine.set_output_enabled(True)
engine.step()
# Configure for current
dmm.process("CONF:CURR:DC")
response = dmm.process("READ?")
# Should return current, not voltage
assert float(response) == pytest.approx(0.1, abs=0.001)
def test_reset_restores_voltage_mode(
self, dmm: MultimeterSim, engine: PhysicsEngine
) -> None:
"""Test *RST restores default voltage measurement mode."""
dmm.process("CONF:CURR:DC")
dmm.process("*RST")
assert dmm.process("CONF?") == '"VOLT:DC"'
def test_voltage_changes_with_temperature(
self, dmm: MultimeterSim, engine: PhysicsEngine
) -> None:
"""Test measured voltage changes with DUT temperature."""
engine.set_input_voltage(5.0)
engine.set_output_enabled(True)
engine.step()
# Measure at initial temperature
response1 = dmm.process("MEAS:VOLT:DC?")
v1 = float(response1)
# Change chamber temperature and let settle
engine.set_chamber_setpoint(85.0)
for _ in range(5000): # Let temperature settle somewhat
engine.step()
# Measure at elevated temperature
response2 = dmm.process("MEAS:VOLT:DC?")
v2 = float(response2)
# Output voltage should have changed (LDO has tempco)
assert v1 != v2

View File

@@ -0,0 +1,352 @@
"""Unit tests for power supply simulator."""
import pytest
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
class TestPowerSupplySimBasic:
"""Tests for PowerSupplySim without physics engine."""
@pytest.fixture
def psu(self) -> PowerSupplySim:
"""Create power supply instance without physics engine."""
return PowerSupplySim()
def test_creation(self, psu: PowerSupplySim) -> None:
"""Test power supply can be created."""
assert psu is not None
assert psu.model == "PS-SIM-001"
assert psu.manufacturer == "PyDVTATE"
def test_idn_query(self, psu: PowerSupplySim) -> None:
"""Test *IDN? returns identification string."""
response = psu.process("*IDN?")
assert "PyDVTATE" in response
assert "PS-SIM-001" in response
def test_rst_command(self, psu: PowerSupplySim) -> None:
"""Test *RST resets to defaults."""
# Set non-default values
psu.process("VOLT 12.0")
psu.process("CURR 2.0")
psu.process("OUTP ON")
# Reset
response = psu.process("*RST")
assert response == ""
# Check defaults restored
assert psu.process("VOLT?") == "0.000"
assert psu.process("CURR?") == "1.000"
assert psu.process("OUTP?") == "0"
def test_opc_query(self, psu: PowerSupplySim) -> None:
"""Test *OPC? returns 1."""
response = psu.process("*OPC?")
assert response == "1"
def test_unknown_command(self, psu: PowerSupplySim) -> None:
"""Test unknown command returns error."""
response = psu.process("INVALID:CMD")
assert response.startswith("ERROR:")
assert "Unknown command" in response
class TestPowerSupplyVoltage:
"""Tests for VOLT command."""
@pytest.fixture
def psu(self) -> PowerSupplySim:
"""Create power supply instance without physics engine."""
return PowerSupplySim()
def test_volt_query_default(self, psu: PowerSupplySim) -> None:
"""Test VOLT? returns default value."""
response = psu.process("VOLT?")
assert response == "0.000"
def test_volt_set(self, psu: PowerSupplySim) -> None:
"""Test VOLT sets value."""
response = psu.process("VOLT 12.5")
assert response == ""
assert psu.process("VOLT?") == "12.500"
def test_volt_set_decimal(self, psu: PowerSupplySim) -> None:
"""Test VOLT accepts decimal values."""
psu.process("VOLT 3.3")
assert psu.process("VOLT?") == "3.300"
def test_volt_set_negative_fails(self, psu: PowerSupplySim) -> None:
"""Test VOLT rejects negative values."""
response = psu.process("VOLT -5.0")
assert response.startswith("ERROR:")
assert "negative" in response
def test_volt_set_invalid_value(self, psu: PowerSupplySim) -> None:
"""Test VOLT with invalid value returns error."""
response = psu.process("VOLT abc")
assert response.startswith("ERROR:")
assert "Invalid voltage" in response
def test_volt_set_no_argument(self, psu: PowerSupplySim) -> None:
"""Test VOLT without argument returns error."""
response = psu.process("VOLT")
assert response.startswith("ERROR:")
assert "requires a value" in response
class TestPowerSupplyCurrent:
"""Tests for CURR command."""
@pytest.fixture
def psu(self) -> PowerSupplySim:
"""Create power supply instance without physics engine."""
return PowerSupplySim()
def test_curr_query_default(self, psu: PowerSupplySim) -> None:
"""Test CURR? returns default value."""
response = psu.process("CURR?")
assert response == "1.000"
def test_curr_set(self, psu: PowerSupplySim) -> None:
"""Test CURR sets value."""
response = psu.process("CURR 0.5")
assert response == ""
assert psu.process("CURR?") == "0.500"
def test_curr_set_negative_fails(self, psu: PowerSupplySim) -> None:
"""Test CURR rejects negative values."""
response = psu.process("CURR -1.0")
assert response.startswith("ERROR:")
assert "negative" in response
def test_curr_set_invalid_value(self, psu: PowerSupplySim) -> None:
"""Test CURR with invalid value returns error."""
response = psu.process("CURR xyz")
assert response.startswith("ERROR:")
assert "Invalid current" in response
def test_curr_set_no_argument(self, psu: PowerSupplySim) -> None:
"""Test CURR without argument returns error."""
response = psu.process("CURR")
assert response.startswith("ERROR:")
assert "requires a value" in response
class TestPowerSupplyOutput:
"""Tests for OUTP command."""
@pytest.fixture
def psu(self) -> PowerSupplySim:
"""Create power supply instance without physics engine."""
return PowerSupplySim()
def test_outp_query_default(self, psu: PowerSupplySim) -> None:
"""Test OUTP? returns default value (off)."""
response = psu.process("OUTP?")
assert response == "0"
def test_outp_set_on(self, psu: PowerSupplySim) -> None:
"""Test OUTP ON enables output."""
response = psu.process("OUTP ON")
assert response == ""
assert psu.process("OUTP?") == "1"
def test_outp_set_1(self, psu: PowerSupplySim) -> None:
"""Test OUTP 1 enables output."""
psu.process("OUTP 1")
assert psu.process("OUTP?") == "1"
def test_outp_set_off(self, psu: PowerSupplySim) -> None:
"""Test OUTP OFF disables output."""
psu.process("OUTP ON")
psu.process("OUTP OFF")
assert psu.process("OUTP?") == "0"
def test_outp_set_0(self, psu: PowerSupplySim) -> None:
"""Test OUTP 0 disables output."""
psu.process("OUTP ON")
psu.process("OUTP 0")
assert psu.process("OUTP?") == "0"
def test_outp_set_invalid(self, psu: PowerSupplySim) -> None:
"""Test OUTP with invalid value returns error."""
response = psu.process("OUTP MAYBE")
assert response.startswith("ERROR:")
assert "Invalid output state" in response
def test_outp_set_no_argument(self, psu: PowerSupplySim) -> None:
"""Test OUTP without argument returns error."""
response = psu.process("OUTP")
assert response.startswith("ERROR:")
assert "requires a value" in response
class TestPowerSupplyMeasurement:
"""Tests for MEAS commands."""
@pytest.fixture
def psu(self) -> PowerSupplySim:
"""Create power supply instance without physics engine."""
return PowerSupplySim()
def test_meas_volt_when_off(self, psu: PowerSupplySim) -> None:
"""Test MEAS:VOLT? returns 0 when output is off."""
psu.process("VOLT 12.0")
response = psu.process("MEAS:VOLT?")
assert response == "0.000"
def test_meas_volt_when_on_no_engine(self, psu: PowerSupplySim) -> None:
"""Test MEAS:VOLT? returns setpoint when on without engine."""
psu.process("VOLT 12.0")
psu.process("OUTP ON")
response = psu.process("MEAS:VOLT?")
assert response == "12.000"
def test_meas_volt_as_command_fails(self, psu: PowerSupplySim) -> None:
"""Test MEAS:VOLT (without ?) returns error."""
response = psu.process("MEAS:VOLT 5.0")
assert response.startswith("ERROR:")
assert "query only" in response
def test_meas_curr_when_off(self, psu: PowerSupplySim) -> None:
"""Test MEAS:CURR? returns 0 when output is off."""
response = psu.process("MEAS:CURR?")
assert response == "0.000"
def test_meas_curr_when_on_no_engine(self, psu: PowerSupplySim) -> None:
"""Test MEAS:CURR? returns 0 when on without engine."""
psu.process("OUTP ON")
response = psu.process("MEAS:CURR?")
assert response == "0.000"
def test_meas_curr_as_command_fails(self, psu: PowerSupplySim) -> None:
"""Test MEAS:CURR (without ?) returns error."""
response = psu.process("MEAS:CURR 0.1")
assert response.startswith("ERROR:")
assert "query only" in response
class TestPowerSupplyWithPhysicsEngine:
"""Tests for PowerSupplySim with physics engine integration."""
@pytest.fixture
def engine(self) -> PhysicsEngine:
"""Create physics engine instance."""
return PhysicsEngine(update_rate_hz=100.0)
@pytest.fixture
def psu(self, engine: PhysicsEngine) -> PowerSupplySim:
"""Create power supply instance with physics engine."""
return PowerSupplySim(physics_engine=engine)
def test_outp_on_enables_engine_output(
self, psu: PowerSupplySim, engine: PhysicsEngine
) -> None:
"""Test OUTP ON enables physics engine output."""
psu.process("VOLT 5.0")
psu.process("OUTP ON")
assert engine.is_output_enabled is True
def test_outp_off_disables_engine_output(
self, psu: PowerSupplySim, engine: PhysicsEngine
) -> None:
"""Test OUTP OFF disables physics engine output."""
psu.process("OUTP ON")
psu.process("OUTP OFF")
assert engine.is_output_enabled is False
def test_volt_updates_engine_when_on(
self, psu: PowerSupplySim, engine: PhysicsEngine
) -> None:
"""Test VOLT updates engine input voltage when output is on."""
psu.process("OUTP ON")
psu.process("VOLT 5.0")
electrical = engine.get_electrical_state()
assert electrical.input_voltage == pytest.approx(5.0)
def test_volt_does_not_update_engine_when_off(
self, psu: PowerSupplySim, engine: PhysicsEngine
) -> None:
"""Test VOLT does not update engine when output is off."""
psu.process("VOLT 5.0")
electrical = engine.get_electrical_state()
assert electrical.input_voltage == pytest.approx(0.0)
def test_meas_volt_returns_engine_voltage(
self, psu: PowerSupplySim, engine: PhysicsEngine
) -> None:
"""Test MEAS:VOLT? returns physics engine voltage."""
psu.process("VOLT 5.0")
psu.process("OUTP ON")
response = psu.process("MEAS:VOLT?")
assert response == "5.000"
def test_meas_curr_returns_engine_current(
self, psu: PowerSupplySim, engine: PhysicsEngine
) -> None:
"""Test MEAS:CURR? returns total current from engine."""
psu.process("VOLT 5.0")
psu.process("OUTP ON")
engine.set_load_current(0.1)
# Step engine to allow calculations
engine.step()
response = psu.process("MEAS:CURR?")
# Should include load current + quiescent current
assert float(response) > 0.0
def test_reset_disables_engine_output(
self, psu: PowerSupplySim, engine: PhysicsEngine
) -> None:
"""Test *RST disables physics engine output."""
psu.process("VOLT 5.0")
psu.process("OUTP ON")
psu.process("*RST")
assert engine.is_output_enabled is False
def test_reset_sets_engine_voltage_zero(
self, psu: PowerSupplySim, engine: PhysicsEngine
) -> None:
"""Test *RST sets physics engine voltage to zero."""
psu.process("VOLT 5.0")
psu.process("OUTP ON")
psu.process("*RST")
electrical = engine.get_electrical_state()
assert electrical.input_voltage == pytest.approx(0.0)

View File

@@ -0,0 +1,203 @@
"""Unit tests for SCPI command parsing."""
import pytest
from py_dvt_ate.instruments.scpi import SCPICommand, SCPIParser
class TestSCPICommand:
"""Tests for the SCPICommand dataclass."""
def test_creation(self) -> None:
"""Test SCPICommand can be created with valid values."""
cmd = SCPICommand(
header="VOLT",
arguments=["3.3"],
is_query=False,
)
assert cmd.header == "VOLT"
assert cmd.arguments == ["3.3"]
assert cmd.is_query is False
def test_keyword_for_command(self) -> None:
"""Test keyword property for regular command."""
cmd = SCPICommand(header="VOLT", arguments=["3.3"], is_query=False)
assert cmd.keyword == "VOLT"
def test_keyword_for_query(self) -> None:
"""Test keyword property strips '?' from query."""
cmd = SCPICommand(header="VOLT?", arguments=[], is_query=True)
assert cmd.keyword == "VOLT"
def test_keyword_for_nested_command(self) -> None:
"""Test keyword property for nested SCPI command."""
cmd = SCPICommand(header="TEMP:SETPOINT?", arguments=[], is_query=True)
assert cmd.keyword == "TEMP:SETPOINT"
class TestSCPIParser:
"""Tests for the SCPIParser class."""
@pytest.fixture
def parser(self) -> SCPIParser:
"""Create parser instance for tests."""
return SCPIParser()
def test_parse_simple_query(self, parser: SCPIParser) -> None:
"""Test parsing simple query command."""
cmd = parser.parse("*IDN?")
assert cmd.header == "*IDN?"
assert cmd.arguments == []
assert cmd.is_query is True
assert cmd.keyword == "*IDN"
def test_parse_simple_command(self, parser: SCPIParser) -> None:
"""Test parsing simple command without arguments."""
cmd = parser.parse("*RST")
assert cmd.header == "*RST"
assert cmd.arguments == []
assert cmd.is_query is False
assert cmd.keyword == "*RST"
def test_parse_command_with_single_argument(self, parser: SCPIParser) -> None:
"""Test parsing command with single numeric argument."""
cmd = parser.parse("VOLT 3.3")
assert cmd.header == "VOLT"
assert cmd.arguments == ["3.3"]
assert cmd.is_query is False
def test_parse_command_with_multiple_arguments(self, parser: SCPIParser) -> None:
"""Test parsing command with comma-separated arguments."""
cmd = parser.parse("CONF:VOLT:DC 10,0.001")
assert cmd.header == "CONF:VOLT:DC"
assert cmd.arguments == ["10", "0.001"]
assert cmd.is_query is False
def test_parse_nested_scpi_command(self, parser: SCPIParser) -> None:
"""Test parsing nested SCPI command hierarchy."""
cmd = parser.parse("TEMP:SETPOINT 85.0")
assert cmd.header == "TEMP:SETPOINT"
assert cmd.arguments == ["85.0"]
assert cmd.is_query is False
assert cmd.keyword == "TEMP:SETPOINT"
def test_parse_nested_scpi_query(self, parser: SCPIParser) -> None:
"""Test parsing nested SCPI query."""
cmd = parser.parse("TEMP:SETPOINT?")
assert cmd.header == "TEMP:SETPOINT?"
assert cmd.arguments == []
assert cmd.is_query is True
def test_parse_ieee_common_commands(self, parser: SCPIParser) -> None:
"""Test parsing IEEE 488.2 common commands."""
# Identity query
cmd = parser.parse("*IDN?")
assert cmd.is_query is True
assert cmd.keyword == "*IDN"
# Reset
cmd = parser.parse("*RST")
assert cmd.is_query is False
assert cmd.keyword == "*RST"
# Clear status
cmd = parser.parse("*CLS")
assert cmd.is_query is False
assert cmd.keyword == "*CLS"
# Operation complete query
cmd = parser.parse("*OPC?")
assert cmd.is_query is True
assert cmd.keyword == "*OPC"
def test_parse_strips_whitespace(self, parser: SCPIParser) -> None:
"""Test parser strips leading and trailing whitespace."""
cmd = parser.parse(" VOLT 3.3 ")
assert cmd.header == "VOLT"
assert cmd.arguments == ["3.3"]
def test_parse_strips_argument_whitespace(self, parser: SCPIParser) -> None:
"""Test parser strips whitespace from arguments."""
cmd = parser.parse("CONF:VOLT:DC 10 , 0.001 ")
assert cmd.arguments == ["10", "0.001"]
def test_parse_empty_string(self, parser: SCPIParser) -> None:
"""Test parsing empty string returns empty command."""
cmd = parser.parse("")
assert cmd.header == ""
assert cmd.arguments == []
assert cmd.is_query is False
def test_parse_whitespace_only(self, parser: SCPIParser) -> None:
"""Test parsing whitespace-only string returns empty command."""
cmd = parser.parse(" ")
assert cmd.header == ""
assert cmd.arguments == []
assert cmd.is_query is False
def test_parse_output_on_off(self, parser: SCPIParser) -> None:
"""Test parsing output enable/disable commands."""
cmd_on = parser.parse("OUTP ON")
assert cmd_on.arguments == ["ON"]
cmd_off = parser.parse("OUTP OFF")
assert cmd_off.arguments == ["OFF"]
cmd_1 = parser.parse("OUTP 1")
assert cmd_1.arguments == ["1"]
cmd_0 = parser.parse("OUTP 0")
assert cmd_0.arguments == ["0"]
def test_parse_channel_select(self, parser: SCPIParser) -> None:
"""Test parsing channel selection commands."""
cmd = parser.parse("INST:SEL CH1")
assert cmd.header == "INST:SEL"
assert cmd.arguments == ["CH1"]
def test_parse_measurement_query(self, parser: SCPIParser) -> None:
"""Test parsing measurement query commands."""
cmd = parser.parse("MEAS:VOLT:DC?")
assert cmd.header == "MEAS:VOLT:DC?"
assert cmd.is_query is True
assert cmd.keyword == "MEAS:VOLT:DC"
def test_parse_measurement_with_range(self, parser: SCPIParser) -> None:
"""Test parsing measurement query with range argument."""
cmd = parser.parse("MEAS:VOLT:DC? AUTO")
assert cmd.header == "MEAS:VOLT:DC?"
assert cmd.arguments == ["AUTO"]
assert cmd.is_query is True
def test_parse_system_error_query(self, parser: SCPIParser) -> None:
"""Test parsing system error query."""
cmd = parser.parse("SYST:ERR?")
assert cmd.header == "SYST:ERR?"
assert cmd.is_query is True
assert cmd.keyword == "SYST:ERR"
def test_parse_nplc_setting(self, parser: SCPIParser) -> None:
"""Test parsing NPLC (integration time) command."""
cmd = parser.parse("SENS:VOLT:DC:NPLC 10")
assert cmd.header == "SENS:VOLT:DC:NPLC"
assert cmd.arguments == ["10"]
assert cmd.is_query is False

View File

@@ -0,0 +1,215 @@
"""Unit tests for thermal chamber simulator."""
import pytest
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
class TestThermalChamberSimBasic:
"""Tests for ThermalChamberSim without physics engine."""
@pytest.fixture
def chamber(self) -> ThermalChamberSim:
"""Create chamber instance without physics engine."""
return ThermalChamberSim()
def test_creation(self, chamber: ThermalChamberSim) -> None:
"""Test chamber can be created."""
assert chamber is not None
assert chamber.model == "TC-SIM-001"
assert chamber.manufacturer == "PyDVTATE"
def test_idn_query(self, chamber: ThermalChamberSim) -> None:
"""Test *IDN? returns identification string."""
response = chamber.process("*IDN?")
assert "PyDVTATE" in response
assert "TC-SIM-001" in response
def test_rst_command(self, chamber: ThermalChamberSim) -> None:
"""Test *RST resets to defaults."""
# Set non-default value
chamber.process("TEMP:SETPOINT 85.0")
assert chamber.process("TEMP:SETPOINT?") == "85.00"
# Reset
response = chamber.process("*RST")
assert response == ""
assert chamber.process("TEMP:SETPOINT?") == "25.00"
def test_opc_query(self, chamber: ThermalChamberSim) -> None:
"""Test *OPC? returns 1."""
response = chamber.process("*OPC?")
assert response == "1"
def test_unknown_command(self, chamber: ThermalChamberSim) -> None:
"""Test unknown command returns error."""
response = chamber.process("INVALID:CMD")
assert response.startswith("ERROR:")
assert "Unknown command" in response
class TestThermalChamberSetpoint:
"""Tests for TEMP:SETPOINT command."""
@pytest.fixture
def chamber(self) -> ThermalChamberSim:
"""Create chamber instance without physics engine."""
return ThermalChamberSim()
def test_setpoint_query_default(self, chamber: ThermalChamberSim) -> None:
"""Test TEMP:SETPOINT? returns default value."""
response = chamber.process("TEMP:SETPOINT?")
assert response == "25.00"
def test_setpoint_set(self, chamber: ThermalChamberSim) -> None:
"""Test TEMP:SETPOINT sets value."""
response = chamber.process("TEMP:SETPOINT 85.0")
assert response == ""
assert chamber.process("TEMP:SETPOINT?") == "85.00"
def test_setpoint_set_negative(self, chamber: ThermalChamberSim) -> None:
"""Test TEMP:SETPOINT accepts negative values."""
chamber.process("TEMP:SETPOINT -40.0")
assert chamber.process("TEMP:SETPOINT?") == "-40.00"
def test_setpoint_set_invalid_value(self, chamber: ThermalChamberSim) -> None:
"""Test TEMP:SETPOINT with invalid value returns error."""
response = chamber.process("TEMP:SETPOINT abc")
assert response.startswith("ERROR:")
assert "Invalid temperature" in response
def test_setpoint_set_no_argument(self, chamber: ThermalChamberSim) -> None:
"""Test TEMP:SETPOINT without argument returns error."""
response = chamber.process("TEMP:SETPOINT")
assert response.startswith("ERROR:")
assert "requires a value" in response
class TestThermalChamberActual:
"""Tests for TEMP:ACTUAL? query."""
@pytest.fixture
def chamber(self) -> ThermalChamberSim:
"""Create chamber instance without physics engine."""
return ThermalChamberSim()
def test_actual_query_without_engine(self, chamber: ThermalChamberSim) -> None:
"""Test TEMP:ACTUAL? returns setpoint when no physics engine."""
chamber.process("TEMP:SETPOINT 50.0")
response = chamber.process("TEMP:ACTUAL?")
# Without physics engine, returns setpoint
assert response == "50.00"
def test_actual_as_command_fails(self, chamber: ThermalChamberSim) -> None:
"""Test TEMP:ACTUAL (without ?) returns error."""
response = chamber.process("TEMP:ACTUAL 25.0")
assert response.startswith("ERROR:")
assert "query only" in response
class TestThermalChamberStability:
"""Tests for TEMP:STAB? query."""
@pytest.fixture
def chamber(self) -> ThermalChamberSim:
"""Create chamber instance without physics engine."""
return ThermalChamberSim()
def test_stab_query_without_engine(self, chamber: ThermalChamberSim) -> None:
"""Test TEMP:STAB? returns 1 when no physics engine."""
response = chamber.process("TEMP:STAB?")
# Without physics engine, assume stable
assert response == "1"
def test_stab_as_command_fails(self, chamber: ThermalChamberSim) -> None:
"""Test TEMP:STAB (without ?) returns error."""
response = chamber.process("TEMP:STAB 1")
assert response.startswith("ERROR:")
assert "query only" in response
class TestThermalChamberWithPhysicsEngine:
"""Tests for ThermalChamberSim with physics engine integration."""
@pytest.fixture
def engine(self) -> PhysicsEngine:
"""Create physics engine instance."""
return PhysicsEngine(update_rate_hz=100.0)
@pytest.fixture
def chamber(self, engine: PhysicsEngine) -> ThermalChamberSim:
"""Create chamber instance with physics engine."""
return ThermalChamberSim(physics_engine=engine)
def test_setpoint_updates_engine(
self, chamber: ThermalChamberSim, engine: PhysicsEngine
) -> None:
"""Test TEMP:SETPOINT updates physics engine."""
chamber.process("TEMP:SETPOINT 85.0")
# Step the engine and check thermal state
thermal = engine.get_thermal_state()
# Initial chamber temp is 25, will start moving towards 85
assert thermal.chamber_temperature == pytest.approx(25.0, abs=0.1)
# After many steps, should approach setpoint
for _ in range(10000): # 100 seconds at 100Hz
engine.step()
thermal = engine.get_thermal_state()
# Should be closer to setpoint (but not quite there due to time constant)
assert thermal.chamber_temperature > 80.0
def test_actual_query_returns_engine_temperature(
self, chamber: ThermalChamberSim, engine: PhysicsEngine
) -> None:
"""Test TEMP:ACTUAL? returns physics engine temperature."""
response = chamber.process("TEMP:ACTUAL?")
# Should match initial chamber temperature
assert response == "25.00"
def test_stability_when_at_setpoint(
self, chamber: ThermalChamberSim, engine: PhysicsEngine
) -> None:
"""Test TEMP:STAB? returns 1 when at setpoint."""
# Default setpoint is 25, engine starts at 25
response = chamber.process("TEMP:STAB?")
assert response == "1"
def test_stability_when_settling(
self, chamber: ThermalChamberSim, engine: PhysicsEngine
) -> None:
"""Test TEMP:STAB? returns 0 when settling."""
# Set new setpoint far from current temperature
chamber.process("TEMP:SETPOINT 85.0")
# Step once to ensure engine updates
engine.step()
# Should not be stable yet
response = chamber.process("TEMP:STAB?")
assert response == "0"
def test_reset_updates_engine(
self, chamber: ThermalChamberSim, engine: PhysicsEngine
) -> None:
"""Test *RST resets both chamber and engine setpoint."""
chamber.process("TEMP:SETPOINT 85.0")
chamber.process("*RST")
# Check setpoint is back to default
assert chamber.process("TEMP:SETPOINT?") == "25.00"