Compare commits
20 Commits
v0.1.0-alp
...
v0.1.0-alp
| Author | SHA1 | Date | |
|---|---|---|---|
| 9e9c0ae0e5 | |||
| a742d57a6f | |||
| 2d358062f4 | |||
| 1a489b9106 | |||
| f9e59da32b | |||
| a4c01c856d | |||
| 144e80f87a | |||
| e811b21082 | |||
| 9a88a35cc5 | |||
| b31324a42a | |||
| 008134844d | |||
| ae85948539 | |||
| bccb8cc420 | |||
| 510e1ba683 | |||
| 5e69085875 | |||
| 5053399851 | |||
| d54ada18b2 | |||
| 252c329562 | |||
| 6e7da7f382 | |||
| 75e0a1cc25 |
19
CHANGELOG.md
19
CHANGELOG.md
@@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.1.0-alpha.3] - 2025-12-02
|
||||
|
||||
### Added
|
||||
- Async TCP server for exposing virtual instruments over network
|
||||
- InstrumentServer class with multi-port, multi-client support
|
||||
- Line-based SCPI protocol (newline-terminated commands/responses)
|
||||
- SimulationServer wiring physics engine to all virtual instruments
|
||||
- CLI `serve` command to start simulation server with configurable ports
|
||||
- Integration tests for TCP server and instrument connectivity
|
||||
|
||||
### Infrastructure
|
||||
- SCPI foundation (Sprint 5): command parser with IEEE 488.2 support
|
||||
- Virtual instrument base class with command dispatch
|
||||
- Thermal chamber simulator (TEMP:SETPOINT, TEMP:ACTUAL?, TEMP:STAB?)
|
||||
- Power supply simulator (VOLT, CURR, OUTP, MEAS commands)
|
||||
- Multimeter simulator (MEAS:VOLT:DC?, MEAS:CURR:DC?, CONF, READ?)
|
||||
|
||||
## [0.1.0-alpha.2] - 2025-12-02
|
||||
|
||||
### Added
|
||||
@@ -59,7 +76,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
| 0.1.0 | TBD | MVP Complete |
|
||||
| 0.1.0-beta.2 | TBD | First DVT test runs |
|
||||
| 0.1.0-beta.1 | TBD | HAL complete |
|
||||
| 0.1.0-alpha.3 | TBD | Network ready |
|
||||
| 0.1.0-alpha.3 | 2025-12-02 | Network ready |
|
||||
| 0.1.0-alpha.2 | 2025-12-02 | Visual demo |
|
||||
| 0.1.0-alpha.1 | 2025-12-02 | Physics engine |
|
||||
| 0.0.1 | 2025-12-01 | Project scaffolding |
|
||||
|
||||
@@ -86,5 +86,8 @@ ignore_missing_imports = true
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = ["tests"]
|
||||
asyncio_mode = "auto"
|
||||
addopts = "-v --tb=short"
|
||||
|
||||
[tool.pytest-asyncio]
|
||||
mode = "auto"
|
||||
default_fixture_loop_scope = "function"
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""py_dvt_ate: Coupled Physics DVT Simulation Platform."""
|
||||
|
||||
__version__ = "0.1.0-alpha.2"
|
||||
__version__ = "0.1.0-alpha.3"
|
||||
|
||||
@@ -36,5 +36,52 @@ def main(
|
||||
"""py-dvt-ate: Coupled Physics DVT Simulation Platform."""
|
||||
|
||||
|
||||
@app.command()
|
||||
def serve(
|
||||
host: Annotated[
|
||||
str,
|
||||
typer.Option("--host", "-h", help="Host address to bind to."),
|
||||
] = "127.0.0.1",
|
||||
chamber_port: Annotated[
|
||||
int,
|
||||
typer.Option("--chamber-port", help="Port for thermal chamber instrument."),
|
||||
] = 5000,
|
||||
psu_port: Annotated[
|
||||
int,
|
||||
typer.Option("--psu-port", help="Port for power supply instrument."),
|
||||
] = 5001,
|
||||
dmm_port: Annotated[
|
||||
int,
|
||||
typer.Option("--dmm-port", help="Port for multimeter instrument."),
|
||||
] = 5002,
|
||||
physics_rate: Annotated[
|
||||
float,
|
||||
typer.Option("--physics-rate", help="Physics engine update rate in Hz."),
|
||||
] = 100.0,
|
||||
) -> None:
|
||||
"""Start the simulation server with virtual instruments.
|
||||
|
||||
Runs a TCP server hosting virtual SCPI instruments connected to a
|
||||
shared physics engine. Each instrument listens on its own port.
|
||||
"""
|
||||
from py_dvt_ate.simulation.server import main as run_server
|
||||
|
||||
typer.echo(f"Starting simulation server on {host}...")
|
||||
typer.echo(f" Thermal chamber: port {chamber_port}")
|
||||
typer.echo(f" Power supply: port {psu_port}")
|
||||
typer.echo(f" Multimeter: port {dmm_port}")
|
||||
typer.echo(f" Physics rate: {physics_rate} Hz")
|
||||
typer.echo("")
|
||||
typer.echo("Press Ctrl+C to stop.")
|
||||
|
||||
run_server(
|
||||
host=host,
|
||||
chamber_port=chamber_port,
|
||||
psu_port=psu_port,
|
||||
dmm_port=dmm_port,
|
||||
physics_rate=physics_rate,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
|
||||
@@ -4,6 +4,7 @@ This module provides an interactive dashboard for visualising the physics
|
||||
engine directly, demonstrating thermal-electrical coupling in real-time.
|
||||
"""
|
||||
|
||||
import time
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
@@ -44,12 +45,29 @@ def init_session_state() -> None:
|
||||
st.session_state.history = SimulationHistory()
|
||||
if "running" not in st.session_state:
|
||||
st.session_state.running = False
|
||||
if "last_update" not in st.session_state:
|
||||
st.session_state.last_update = time.time()
|
||||
# Note: time_multiplier, temp_setpoint, input_voltage, output_enabled,
|
||||
# load_current are managed by their respective widgets via keys
|
||||
|
||||
|
||||
def step_simulation(steps: int = 10) -> None:
|
||||
"""Advance the simulation by the given number of steps."""
|
||||
def step_simulation() -> None:
|
||||
"""Advance the simulation based on elapsed real time and multiplier."""
|
||||
engine: PhysicsEngine = st.session_state.engine
|
||||
history: SimulationHistory = st.session_state.history
|
||||
multiplier: float = st.session_state.get("time_multiplier", 10)
|
||||
|
||||
# Calculate how much simulation time to advance
|
||||
current_time = time.time()
|
||||
elapsed_real = current_time - st.session_state.last_update
|
||||
st.session_state.last_update = current_time
|
||||
|
||||
# Simulation time to advance (capped to prevent huge jumps)
|
||||
sim_time_to_advance = min(elapsed_real * multiplier, 2.0)
|
||||
|
||||
# Calculate number of steps needed
|
||||
steps = int(sim_time_to_advance / engine.dt)
|
||||
steps = max(1, min(steps, 1000)) # Clamp between 1 and 1000 steps
|
||||
|
||||
for _ in range(steps):
|
||||
engine.step()
|
||||
@@ -66,105 +84,127 @@ def step_simulation(steps: int = 10) -> None:
|
||||
history.power_dissipation.append(electrical.power_dissipation)
|
||||
|
||||
|
||||
def display_thermal_chart() -> None:
|
||||
"""Display temperature chart."""
|
||||
history: SimulationHistory = st.session_state.history
|
||||
def sync_engine_from_session_state() -> None:
|
||||
"""Sync engine parameters from session state (called by fragment)."""
|
||||
engine: PhysicsEngine = st.session_state.engine
|
||||
engine.set_chamber_setpoint(st.session_state.get("temp_setpoint", 25.0))
|
||||
engine.set_input_voltage(st.session_state.get("input_voltage", 5.0))
|
||||
engine.set_output_enabled(st.session_state.get("output_enabled", False))
|
||||
engine.set_load_current(st.session_state.get("load_current", 100.0) / 1000.0)
|
||||
|
||||
if len(history.time) < 2:
|
||||
st.info("Start the simulation to see temperature data")
|
||||
return
|
||||
|
||||
chart_data = {
|
||||
"Time (s)": list(history.time),
|
||||
"Chamber": list(history.chamber_temp),
|
||||
"Case": list(history.case_temp),
|
||||
"Junction": list(history.junction_temp),
|
||||
}
|
||||
def display_controls() -> None:
|
||||
"""Display simulation control panel in sidebar."""
|
||||
st.sidebar.header("Simulation Controls")
|
||||
|
||||
st.line_chart(
|
||||
chart_data,
|
||||
x="Time (s)",
|
||||
y=["Chamber", "Case", "Junction"],
|
||||
color=["#1f77b4", "#ff7f0e", "#d62728"],
|
||||
# Start/Stop button
|
||||
if st.session_state.running:
|
||||
if st.sidebar.button(
|
||||
"Stop Simulation", type="primary", use_container_width=True
|
||||
):
|
||||
st.session_state.running = False
|
||||
else:
|
||||
if st.sidebar.button(
|
||||
"Start Simulation", type="primary", use_container_width=True
|
||||
):
|
||||
st.session_state.running = True
|
||||
st.session_state.last_update = time.time()
|
||||
|
||||
# Reset button
|
||||
if st.sidebar.button("Reset", use_container_width=True):
|
||||
st.session_state.engine = PhysicsEngine(update_rate_hz=100.0)
|
||||
st.session_state.history = SimulationHistory()
|
||||
st.session_state.running = False
|
||||
st.session_state.last_update = time.time()
|
||||
|
||||
st.sidebar.divider()
|
||||
|
||||
# Time multiplier
|
||||
st.sidebar.subheader("Simulation Speed")
|
||||
st.sidebar.select_slider(
|
||||
"Time Multiplier",
|
||||
options=[1, 2, 5, 10, 20, 50, 100],
|
||||
value=10,
|
||||
format_func=lambda x: f"{x}x",
|
||||
key="time_multiplier",
|
||||
)
|
||||
st.sidebar.caption(
|
||||
f"1 real second = {st.session_state.get('time_multiplier', 10)} simulation seconds"
|
||||
)
|
||||
|
||||
st.sidebar.divider()
|
||||
|
||||
# Temperature setpoint
|
||||
st.sidebar.subheader("Thermal Chamber")
|
||||
st.sidebar.slider(
|
||||
"Temperature Setpoint (C)",
|
||||
min_value=-40.0,
|
||||
max_value=125.0,
|
||||
value=25.0,
|
||||
step=5.0,
|
||||
key="temp_setpoint",
|
||||
)
|
||||
|
||||
st.sidebar.divider()
|
||||
|
||||
# Power supply controls
|
||||
st.sidebar.subheader("Power Supply")
|
||||
st.sidebar.slider(
|
||||
"Input Voltage (V)",
|
||||
min_value=0.0,
|
||||
max_value=12.0,
|
||||
value=5.0,
|
||||
step=0.1,
|
||||
key="input_voltage",
|
||||
)
|
||||
|
||||
st.sidebar.toggle(
|
||||
"Output Enabled",
|
||||
value=False,
|
||||
key="output_enabled",
|
||||
)
|
||||
|
||||
st.sidebar.divider()
|
||||
|
||||
# Load controls
|
||||
st.sidebar.subheader("Electronic Load")
|
||||
st.sidebar.slider(
|
||||
"Load Current (mA)",
|
||||
min_value=0.0,
|
||||
max_value=500.0,
|
||||
value=100.0,
|
||||
step=10.0,
|
||||
key="load_current",
|
||||
)
|
||||
|
||||
|
||||
def display_self_heating_panel() -> None:
|
||||
"""Display self-heating demonstration panel."""
|
||||
@st.fragment(run_every=0.1)
|
||||
def simulation_display() -> None:
|
||||
"""Fragment that displays and updates simulation state."""
|
||||
engine: PhysicsEngine = st.session_state.engine
|
||||
history: SimulationHistory = st.session_state.history
|
||||
|
||||
# Sync engine parameters from UI controls
|
||||
sync_engine_from_session_state()
|
||||
|
||||
# Step simulation if running
|
||||
if st.session_state.running:
|
||||
step_simulation()
|
||||
|
||||
# Get current state
|
||||
thermal = engine.get_thermal_state()
|
||||
electrical = engine.get_electrical_state()
|
||||
|
||||
# Calculate temperature rises
|
||||
delta_t_jc = thermal.junction_temperature - thermal.case_temperature
|
||||
delta_t_ca = thermal.case_temperature - thermal.chamber_temperature
|
||||
|
||||
col1, col2 = st.columns(2)
|
||||
|
||||
with col1:
|
||||
st.markdown("#### Self-Heating Analysis")
|
||||
|
||||
# Display thermal resistance info
|
||||
st.markdown(
|
||||
f"""
|
||||
| Parameter | Value |
|
||||
|-----------|-------|
|
||||
| Junction-Case Rise (ΔT_jc) | **{delta_t_jc:.2f} °C** |
|
||||
| Case-Ambient Rise (ΔT_ca) | **{delta_t_ca:.2f} °C** |
|
||||
| Power Dissipation | {electrical.power_dissipation * 1000:.1f} mW |
|
||||
| θ_jc (junction-case) | 15 °C/W |
|
||||
| θ_ca (case-ambient) | 5 °C/W |
|
||||
"""
|
||||
)
|
||||
|
||||
st.markdown(
|
||||
"""
|
||||
**Thermal Coupling:** The junction temperature rises above the case
|
||||
temperature due to power dissipation. This is governed by:
|
||||
|
||||
`T_junction = T_case + P_diss × θ_jc`
|
||||
|
||||
Try increasing the load current or input voltage to see
|
||||
self-heating effects!
|
||||
"""
|
||||
)
|
||||
|
||||
with col2:
|
||||
st.markdown("#### Power Dissipation")
|
||||
|
||||
if len(history.time) < 2:
|
||||
st.info("Start the simulation to see power data")
|
||||
return
|
||||
|
||||
power_data = {
|
||||
"Time (s)": list(history.time),
|
||||
"Power (mW)": [p * 1000 for p in history.power_dissipation],
|
||||
}
|
||||
|
||||
st.line_chart(
|
||||
power_data,
|
||||
x="Time (s)",
|
||||
y="Power (mW)",
|
||||
color="#2ca02c",
|
||||
)
|
||||
|
||||
|
||||
def display_current_state() -> None:
|
||||
"""Display current simulation state metrics."""
|
||||
engine: PhysicsEngine = st.session_state.engine
|
||||
thermal = engine.get_thermal_state()
|
||||
electrical = engine.get_electrical_state()
|
||||
|
||||
# Current state metrics
|
||||
st.subheader("Current State")
|
||||
col1, col2, col3, col4 = st.columns(4)
|
||||
|
||||
with col1:
|
||||
st.metric("Chamber Temp", f"{thermal.chamber_temperature:.2f} °C")
|
||||
st.metric("Chamber Temp", f"{thermal.chamber_temperature:.2f} C")
|
||||
with col2:
|
||||
st.metric("Case Temp", f"{thermal.case_temperature:.2f} °C")
|
||||
st.metric("Case Temp", f"{thermal.case_temperature:.2f} C")
|
||||
with col3:
|
||||
st.metric("Junction Temp", f"{thermal.junction_temperature:.2f} °C")
|
||||
st.metric("Junction Temp", f"{thermal.junction_temperature:.2f} C")
|
||||
with col4:
|
||||
st.metric("Output Voltage", f"{electrical.output_voltage:.4f} V")
|
||||
|
||||
@@ -177,82 +217,79 @@ def display_current_state() -> None:
|
||||
with col7:
|
||||
st.metric("Power Diss.", f"{electrical.power_dissipation * 1000:.2f} mW")
|
||||
with col8:
|
||||
st.metric("Sim Time", f"{engine.simulation_time:.2f} s")
|
||||
status = "Running" if st.session_state.running else "Stopped"
|
||||
st.metric(
|
||||
"Sim Time",
|
||||
f"{engine.simulation_time:.1f} s",
|
||||
delta=f"{status} @ {st.session_state.get('time_multiplier', 10):.0f}x",
|
||||
)
|
||||
|
||||
|
||||
def display_controls() -> None:
|
||||
"""Display simulation control panel in sidebar."""
|
||||
engine: PhysicsEngine = st.session_state.engine
|
||||
|
||||
st.sidebar.header("Simulation Controls")
|
||||
|
||||
# Start/Stop button
|
||||
if st.session_state.running:
|
||||
if st.sidebar.button("Stop Simulation", type="primary", use_container_width=True):
|
||||
st.session_state.running = False
|
||||
st.rerun()
|
||||
# Temperature chart
|
||||
st.subheader("Temperature History")
|
||||
if len(history.time) < 2:
|
||||
st.info("Start the simulation to see temperature data")
|
||||
else:
|
||||
if st.sidebar.button(
|
||||
"Start Simulation", type="primary", use_container_width=True
|
||||
):
|
||||
st.session_state.running = True
|
||||
st.rerun()
|
||||
chart_data = {
|
||||
"Time (s)": list(history.time),
|
||||
"Chamber": list(history.chamber_temp),
|
||||
"Case": list(history.case_temp),
|
||||
"Junction": list(history.junction_temp),
|
||||
}
|
||||
st.line_chart(
|
||||
chart_data,
|
||||
x="Time (s)",
|
||||
y=["Chamber", "Case", "Junction"],
|
||||
color=["#1f77b4", "#ff7f0e", "#d62728"],
|
||||
)
|
||||
|
||||
# Reset button
|
||||
if st.sidebar.button("Reset", use_container_width=True):
|
||||
st.session_state.engine = PhysicsEngine(update_rate_hz=100.0)
|
||||
st.session_state.history = SimulationHistory()
|
||||
st.session_state.running = False
|
||||
st.rerun()
|
||||
# Self-heating demonstration
|
||||
st.subheader("Self-Heating Demonstration")
|
||||
|
||||
st.sidebar.divider()
|
||||
delta_t_jc = thermal.junction_temperature - thermal.case_temperature
|
||||
delta_t_ca = thermal.case_temperature - thermal.chamber_temperature
|
||||
|
||||
# Temperature setpoint
|
||||
st.sidebar.subheader("Thermal Chamber")
|
||||
temp_setpoint = st.sidebar.slider(
|
||||
"Temperature Setpoint (°C)",
|
||||
min_value=-40.0,
|
||||
max_value=125.0,
|
||||
value=25.0,
|
||||
step=5.0,
|
||||
key="temp_setpoint",
|
||||
)
|
||||
engine.set_chamber_setpoint(temp_setpoint)
|
||||
col1, col2 = st.columns(2)
|
||||
|
||||
st.sidebar.divider()
|
||||
with col1:
|
||||
st.markdown("#### Self-Heating Analysis")
|
||||
st.markdown(
|
||||
f"""
|
||||
| Parameter | Value |
|
||||
|-----------|-------|
|
||||
| Junction-Case Rise (dT_jc) | **{delta_t_jc:.2f} C** |
|
||||
| Case-Ambient Rise (dT_ca) | **{delta_t_ca:.2f} C** |
|
||||
| Power Dissipation | {electrical.power_dissipation * 1000:.1f} mW |
|
||||
| theta_jc (junction-case) | 15 C/W |
|
||||
| theta_ca (case-ambient) | 5 C/W |
|
||||
"""
|
||||
)
|
||||
st.markdown(
|
||||
"""
|
||||
**Thermal Coupling:** The junction temperature rises above the case
|
||||
temperature due to power dissipation. This is governed by:
|
||||
|
||||
# Power supply controls
|
||||
st.sidebar.subheader("Power Supply")
|
||||
input_voltage = st.sidebar.slider(
|
||||
"Input Voltage (V)",
|
||||
min_value=0.0,
|
||||
max_value=12.0,
|
||||
value=5.0,
|
||||
step=0.1,
|
||||
key="input_voltage",
|
||||
)
|
||||
engine.set_input_voltage(input_voltage)
|
||||
`T_junction = T_case + P_diss x theta_jc`
|
||||
|
||||
output_enabled = st.sidebar.toggle(
|
||||
"Output Enabled",
|
||||
value=engine.is_output_enabled,
|
||||
key="output_enabled",
|
||||
)
|
||||
engine.set_output_enabled(output_enabled)
|
||||
Try increasing the load current or input voltage to see
|
||||
self-heating effects!
|
||||
"""
|
||||
)
|
||||
|
||||
st.sidebar.divider()
|
||||
|
||||
# Load controls
|
||||
st.sidebar.subheader("Electronic Load")
|
||||
load_current_ma = st.sidebar.slider(
|
||||
"Load Current (mA)",
|
||||
min_value=0.0,
|
||||
max_value=500.0,
|
||||
value=100.0,
|
||||
step=10.0,
|
||||
key="load_current",
|
||||
)
|
||||
engine.set_load_current(load_current_ma / 1000.0)
|
||||
with col2:
|
||||
st.markdown("#### Power Dissipation")
|
||||
if len(history.time) < 2:
|
||||
st.info("Start the simulation to see power data")
|
||||
else:
|
||||
power_data = {
|
||||
"Time (s)": list(history.time),
|
||||
"Power (mW)": [p * 1000 for p in history.power_dissipation],
|
||||
}
|
||||
st.line_chart(
|
||||
power_data,
|
||||
x="Time (s)",
|
||||
y="Power (mW)",
|
||||
color="#2ca02c",
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
@@ -273,25 +310,11 @@ def main() -> None:
|
||||
|
||||
init_session_state()
|
||||
|
||||
# Sidebar controls
|
||||
# Sidebar controls (static - doesn't need fragment)
|
||||
display_controls()
|
||||
|
||||
# Current state display
|
||||
st.subheader("Current State")
|
||||
display_current_state()
|
||||
|
||||
# Temperature chart
|
||||
st.subheader("Temperature History")
|
||||
display_thermal_chart()
|
||||
|
||||
# Self-heating demonstration
|
||||
st.subheader("Self-Heating Demonstration")
|
||||
display_self_heating_panel()
|
||||
|
||||
# Auto-refresh when running
|
||||
if st.session_state.running:
|
||||
step_simulation(steps=10)
|
||||
st.rerun()
|
||||
# Dynamic simulation display (uses fragment for smooth updates)
|
||||
simulation_display()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
87
src/py_dvt_ate/instruments/scpi.py
Normal file
87
src/py_dvt_ate/instruments/scpi.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""SCPI command parsing.
|
||||
|
||||
This module provides SCPI (Standard Commands for Programmable Instruments)
|
||||
command parsing for instrument communication. It handles IEEE 488.2 common
|
||||
commands (*IDN?, *RST, etc.) and instrument-specific commands.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class SCPICommand:
|
||||
"""Parsed SCPI command.
|
||||
|
||||
Attributes:
|
||||
header: The command header (e.g., "TEMP:SETPOINT" or "*IDN").
|
||||
arguments: List of command arguments (e.g., ["85.0"]).
|
||||
is_query: True if the command ends with '?' (query command).
|
||||
"""
|
||||
|
||||
header: str
|
||||
arguments: list[str]
|
||||
is_query: bool
|
||||
|
||||
@property
|
||||
def keyword(self) -> str:
|
||||
"""Return the command keyword without '?'.
|
||||
|
||||
For query commands like "TEMP:SETPOINT?", returns "TEMP:SETPOINT".
|
||||
For regular commands like "VOLT", returns "VOLT".
|
||||
"""
|
||||
return self.header.rstrip("?")
|
||||
|
||||
|
||||
class SCPIParser:
|
||||
"""Parse SCPI command strings.
|
||||
|
||||
Handles both IEEE 488.2 common commands (e.g., *IDN?, *RST) and
|
||||
instrument-specific commands (e.g., VOLT 3.3, TEMP:SETPOINT?).
|
||||
|
||||
Examples:
|
||||
>>> parser = SCPIParser()
|
||||
>>> cmd = parser.parse("*IDN?")
|
||||
>>> cmd.header, cmd.is_query
|
||||
('*IDN?', True)
|
||||
>>> cmd = parser.parse("VOLT 3.3")
|
||||
>>> cmd.header, cmd.arguments
|
||||
('VOLT', ['3.3'])
|
||||
"""
|
||||
|
||||
def parse(self, command_string: str) -> SCPICommand:
|
||||
"""Parse a SCPI command string.
|
||||
|
||||
Args:
|
||||
command_string: The raw SCPI command string to parse.
|
||||
|
||||
Returns:
|
||||
SCPICommand with parsed header, arguments, and query flag.
|
||||
|
||||
Examples:
|
||||
"*IDN?" -> SCPICommand("*IDN?", [], True)
|
||||
"VOLT 3.3" -> SCPICommand("VOLT", ["3.3"], False)
|
||||
"TEMP:SETPOINT?" -> SCPICommand("TEMP:SETPOINT?", [], True)
|
||||
"CONF:VOLT:DC 10,0.001" -> SCPICommand("CONF:VOLT:DC", ["10", "0.001"], False)
|
||||
"""
|
||||
command_string = command_string.strip()
|
||||
if not command_string:
|
||||
return SCPICommand(header="", arguments=[], is_query=False)
|
||||
|
||||
# Split into header and arguments on first whitespace
|
||||
parts = command_string.split(None, 1)
|
||||
header = parts[0]
|
||||
arguments: list[str] = []
|
||||
|
||||
if len(parts) > 1:
|
||||
# Parse comma-separated arguments
|
||||
arg_string = parts[1]
|
||||
arguments = [arg.strip() for arg in arg_string.split(",")]
|
||||
|
||||
# Query is determined by whether the header ends with '?'
|
||||
is_query = header.endswith("?")
|
||||
|
||||
return SCPICommand(
|
||||
header=header,
|
||||
arguments=arguments,
|
||||
is_query=is_query,
|
||||
)
|
||||
@@ -3,3 +3,8 @@
|
||||
Provides virtual instruments backed by a coupled thermal-electrical
|
||||
physics engine. Used for development and testing without real hardware.
|
||||
"""
|
||||
|
||||
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
||||
from py_dvt_ate.simulation.tcp_server import InstrumentServer
|
||||
|
||||
__all__ = ["InstrumentServer", "ServerConfig", "SimulationServer"]
|
||||
|
||||
240
src/py_dvt_ate/simulation/server.py
Normal file
240
src/py_dvt_ate/simulation/server.py
Normal file
@@ -0,0 +1,240 @@
|
||||
"""Simulation server entry point.
|
||||
|
||||
This module provides the main entry point for running the simulation server
|
||||
with all virtual instruments wired to a shared physics engine.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import signal
|
||||
from dataclasses import dataclass
|
||||
|
||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||
from py_dvt_ate.simulation.tcp_server import InstrumentServer
|
||||
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
|
||||
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
|
||||
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ServerConfig:
|
||||
"""Configuration for the simulation server.
|
||||
|
||||
Attributes:
|
||||
host: Host address to bind to.
|
||||
chamber_port: Port for thermal chamber instrument.
|
||||
psu_port: Port for power supply instrument.
|
||||
dmm_port: Port for multimeter instrument.
|
||||
physics_rate_hz: Physics engine update rate in Hz.
|
||||
"""
|
||||
|
||||
host: str = "127.0.0.1"
|
||||
chamber_port: int = 5000
|
||||
psu_port: int = 5001
|
||||
dmm_port: int = 5002
|
||||
physics_rate_hz: float = 100.0
|
||||
|
||||
|
||||
class SimulationServer:
|
||||
"""Complete simulation server with physics engine and instruments.
|
||||
|
||||
Creates a physics engine and wires it to all virtual instruments,
|
||||
then exposes them over TCP for client access.
|
||||
"""
|
||||
|
||||
def __init__(self, config: ServerConfig | None = None) -> None:
|
||||
"""Initialise the simulation server.
|
||||
|
||||
Args:
|
||||
config: Server configuration. Uses defaults if not provided.
|
||||
"""
|
||||
self._config = config or ServerConfig()
|
||||
self._physics_engine: PhysicsEngine | None = None
|
||||
self._instrument_server: InstrumentServer | None = None
|
||||
self._physics_task: asyncio.Task[None] | None = None
|
||||
self._running = False
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
"""Check if server is currently running."""
|
||||
return self._running
|
||||
|
||||
@property
|
||||
def physics_engine(self) -> PhysicsEngine | None:
|
||||
"""Get the physics engine instance."""
|
||||
return self._physics_engine
|
||||
|
||||
def _setup(self) -> None:
|
||||
"""Create and wire up all components."""
|
||||
# Create physics engine
|
||||
self._physics_engine = PhysicsEngine(
|
||||
update_rate_hz=self._config.physics_rate_hz
|
||||
)
|
||||
|
||||
# Create instruments connected to physics engine
|
||||
chamber = ThermalChamberSim(self._physics_engine)
|
||||
psu = PowerSupplySim(self._physics_engine)
|
||||
dmm = MultimeterSim(self._physics_engine)
|
||||
|
||||
# Create TCP server and register instruments
|
||||
self._instrument_server = InstrumentServer(host=self._config.host)
|
||||
self._instrument_server.register_instrument(self._config.chamber_port, chamber)
|
||||
self._instrument_server.register_instrument(self._config.psu_port, psu)
|
||||
self._instrument_server.register_instrument(self._config.dmm_port, dmm)
|
||||
|
||||
logger.info(
|
||||
"Simulation server configured: chamber=%d, psu=%d, dmm=%d",
|
||||
self._config.chamber_port,
|
||||
self._config.psu_port,
|
||||
self._config.dmm_port,
|
||||
)
|
||||
|
||||
async def _run_physics(self) -> None:
|
||||
"""Run the physics engine simulation loop."""
|
||||
if self._physics_engine is None:
|
||||
return
|
||||
|
||||
dt = self._physics_engine.dt
|
||||
|
||||
while self._running:
|
||||
self._physics_engine.step()
|
||||
# Sleep for the physics timestep
|
||||
await asyncio.sleep(dt)
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the simulation server.
|
||||
|
||||
Sets up all components and starts the TCP server and physics engine.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If server is already running.
|
||||
"""
|
||||
if self._running:
|
||||
raise RuntimeError("Server is already running")
|
||||
|
||||
self._setup()
|
||||
self._running = True
|
||||
|
||||
# Start TCP server
|
||||
if self._instrument_server is not None:
|
||||
await self._instrument_server.start()
|
||||
|
||||
# Start physics engine loop
|
||||
self._physics_task = asyncio.create_task(self._run_physics())
|
||||
|
||||
logger.info("Simulation server started")
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop the simulation server."""
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
self._running = False
|
||||
|
||||
# Cancel physics loop
|
||||
if self._physics_task is not None:
|
||||
self._physics_task.cancel()
|
||||
try:
|
||||
await self._physics_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._physics_task = None
|
||||
|
||||
# Stop TCP server
|
||||
if self._instrument_server is not None:
|
||||
await self._instrument_server.stop()
|
||||
self._instrument_server = None
|
||||
|
||||
self._physics_engine = None
|
||||
logger.info("Simulation server stopped")
|
||||
|
||||
async def serve_forever(self) -> None:
|
||||
"""Start the server and run until cancelled."""
|
||||
await self.start()
|
||||
try:
|
||||
# Wait for the physics task (which runs until cancelled)
|
||||
if self._physics_task is not None:
|
||||
await self._physics_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
await self.stop()
|
||||
|
||||
|
||||
async def run_server(config: ServerConfig | None = None) -> None:
|
||||
"""Run the simulation server with signal handling.
|
||||
|
||||
This is the main entry point for running the server. It sets up
|
||||
signal handlers for graceful shutdown.
|
||||
|
||||
Args:
|
||||
config: Server configuration. Uses defaults if not provided.
|
||||
"""
|
||||
server = SimulationServer(config)
|
||||
|
||||
# Set up signal handlers for graceful shutdown
|
||||
loop = asyncio.get_running_loop()
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
def signal_handler() -> None:
|
||||
logger.info("Shutdown signal received")
|
||||
stop_event.set()
|
||||
|
||||
# Register signal handlers (Unix-style, may not work on all Windows)
|
||||
try:
|
||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||
loop.add_signal_handler(sig, signal_handler)
|
||||
except NotImplementedError:
|
||||
# Windows doesn't support add_signal_handler
|
||||
pass
|
||||
|
||||
try:
|
||||
await server.start()
|
||||
logger.info("Simulation server running. Press Ctrl+C to stop.")
|
||||
|
||||
# Wait for stop signal
|
||||
await stop_event.wait()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Keyboard interrupt received")
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
|
||||
def main(
|
||||
host: str = "127.0.0.1",
|
||||
chamber_port: int = 5000,
|
||||
psu_port: int = 5001,
|
||||
dmm_port: int = 5002,
|
||||
physics_rate: float = 100.0,
|
||||
) -> None:
|
||||
"""Run the simulation server from command line.
|
||||
|
||||
Args:
|
||||
host: Host address to bind to.
|
||||
chamber_port: Port for thermal chamber.
|
||||
psu_port: Port for power supply.
|
||||
dmm_port: Port for multimeter.
|
||||
physics_rate: Physics engine update rate in Hz.
|
||||
"""
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
)
|
||||
|
||||
config = ServerConfig(
|
||||
host=host,
|
||||
chamber_port=chamber_port,
|
||||
psu_port=psu_port,
|
||||
dmm_port=dmm_port,
|
||||
physics_rate_hz=physics_rate,
|
||||
)
|
||||
|
||||
asyncio.run(run_server(config))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
212
src/py_dvt_ate/simulation/tcp_server.py
Normal file
212
src/py_dvt_ate/simulation/tcp_server.py
Normal file
@@ -0,0 +1,212 @@
|
||||
"""Async TCP server for exposing virtual instruments over network.
|
||||
|
||||
This module provides the InstrumentServer class that hosts virtual SCPI
|
||||
instruments over TCP, allowing client applications to communicate using
|
||||
standard SCPI commands over a network connection.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from py_dvt_ate.simulation.virtual.base import BaseInstrument
|
||||
|
||||
# Re-export for type checking - actual import happens at runtime via registration
|
||||
__all__ = ["InstrumentServer"]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InstrumentServer:
|
||||
"""Async TCP server hosting virtual SCPI instruments.
|
||||
|
||||
Each instrument is assigned a port. Clients connect via TCP and send
|
||||
SCPI commands as newline-terminated strings. Responses are also
|
||||
newline-terminated.
|
||||
|
||||
Attributes:
|
||||
host: Host address to bind to.
|
||||
"""
|
||||
|
||||
def __init__(self, host: str = "127.0.0.1") -> None:
|
||||
"""Initialise the instrument server.
|
||||
|
||||
Args:
|
||||
host: Host address to bind to. Defaults to localhost.
|
||||
"""
|
||||
self._host = host
|
||||
self._instruments: dict[int, BaseInstrument] = {}
|
||||
self._servers: list[asyncio.Server] = []
|
||||
self._running = False
|
||||
|
||||
@property
|
||||
def host(self) -> str:
|
||||
"""Get the host address."""
|
||||
return self._host
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
"""Check if server is currently running."""
|
||||
return self._running
|
||||
|
||||
def register_instrument(self, port: int, instrument: BaseInstrument) -> None:
|
||||
"""Register an instrument to be served on a specific port.
|
||||
|
||||
Args:
|
||||
port: TCP port number to serve the instrument on.
|
||||
instrument: Virtual instrument to serve.
|
||||
|
||||
Raises:
|
||||
ValueError: If port is already registered.
|
||||
RuntimeError: If server is already running.
|
||||
"""
|
||||
if self._running:
|
||||
raise RuntimeError("Cannot register instruments while server is running")
|
||||
|
||||
if port in self._instruments:
|
||||
raise ValueError(f"Port {port} is already registered")
|
||||
|
||||
self._instruments[port] = instrument
|
||||
logger.info(
|
||||
"Registered %s on port %d",
|
||||
instrument.__class__.__name__,
|
||||
port,
|
||||
)
|
||||
|
||||
def get_instrument(self, port: int) -> BaseInstrument | None:
|
||||
"""Get the instrument registered on a port.
|
||||
|
||||
Args:
|
||||
port: Port number to look up.
|
||||
|
||||
Returns:
|
||||
Registered instrument, or None if port not registered.
|
||||
"""
|
||||
return self._instruments.get(port)
|
||||
|
||||
@property
|
||||
def registered_ports(self) -> list[int]:
|
||||
"""Get list of registered port numbers."""
|
||||
return list(self._instruments.keys())
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the server and begin listening on all registered ports.
|
||||
|
||||
Creates a TCP server for each registered instrument port.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If server is already running or no instruments registered.
|
||||
"""
|
||||
if self._running:
|
||||
raise RuntimeError("Server is already running")
|
||||
|
||||
if not self._instruments:
|
||||
raise RuntimeError("No instruments registered")
|
||||
|
||||
self._running = True
|
||||
|
||||
for port, instrument in self._instruments.items():
|
||||
server = await asyncio.start_server(
|
||||
lambda r, w, inst=instrument, p=port: self._handle_client(r, w, inst, p),
|
||||
self._host,
|
||||
port,
|
||||
)
|
||||
self._servers.append(server)
|
||||
logger.info(
|
||||
"Started server for %s on %s:%d",
|
||||
instrument.__class__.__name__,
|
||||
self._host,
|
||||
port,
|
||||
)
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop the server and close all connections."""
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
for server in self._servers:
|
||||
server.close()
|
||||
await server.wait_closed()
|
||||
|
||||
self._servers.clear()
|
||||
self._running = False
|
||||
logger.info("Server stopped")
|
||||
|
||||
async def serve_forever(self) -> None:
|
||||
"""Start the server and run until cancelled.
|
||||
|
||||
This is a convenience method that starts the server and blocks
|
||||
until the server is stopped or cancelled.
|
||||
"""
|
||||
await self.start()
|
||||
try:
|
||||
# Keep running until cancelled
|
||||
await asyncio.gather(
|
||||
*[server.serve_forever() for server in self._servers]
|
||||
)
|
||||
finally:
|
||||
await self.stop()
|
||||
|
||||
async def _handle_client(
|
||||
self,
|
||||
reader: asyncio.StreamReader,
|
||||
writer: asyncio.StreamWriter,
|
||||
instrument: BaseInstrument,
|
||||
port: int,
|
||||
) -> None:
|
||||
"""Handle a client connection.
|
||||
|
||||
Reads SCPI commands (newline-terminated), processes them through
|
||||
the instrument, and sends back responses (newline-terminated).
|
||||
|
||||
Args:
|
||||
reader: Stream reader for incoming data.
|
||||
writer: Stream writer for outgoing data.
|
||||
instrument: The instrument to process commands.
|
||||
port: Port number for logging.
|
||||
"""
|
||||
addr = writer.get_extra_info("peername")
|
||||
logger.info("Client connected to port %d from %s", port, addr)
|
||||
|
||||
try:
|
||||
while True:
|
||||
# Read until newline (SCPI line terminator)
|
||||
data = await reader.readline()
|
||||
|
||||
if not data:
|
||||
# Client disconnected
|
||||
break
|
||||
|
||||
# Decode and strip whitespace
|
||||
command = data.decode("utf-8").strip()
|
||||
|
||||
if not command:
|
||||
continue
|
||||
|
||||
logger.debug("Port %d received: %s", port, command)
|
||||
|
||||
# Process command through instrument
|
||||
response = instrument.process(command)
|
||||
|
||||
# Send response with newline terminator
|
||||
if response:
|
||||
writer.write(f"{response}\n".encode("utf-8"))
|
||||
await writer.drain()
|
||||
logger.debug("Port %d sent: %s", port, response)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Client handler cancelled for port %d", port)
|
||||
except ConnectionResetError:
|
||||
logger.debug("Client connection reset on port %d", port)
|
||||
except Exception as e:
|
||||
logger.error("Error handling client on port %d: %s", port, e)
|
||||
finally:
|
||||
writer.close()
|
||||
try:
|
||||
await writer.wait_closed()
|
||||
except Exception:
|
||||
pass
|
||||
logger.info("Client disconnected from port %d", port)
|
||||
156
src/py_dvt_ate/simulation/virtual/base.py
Normal file
156
src/py_dvt_ate/simulation/virtual/base.py
Normal file
@@ -0,0 +1,156 @@
|
||||
"""Base class for virtual instrument simulators.
|
||||
|
||||
This module provides the foundation for implementing SCPI-based virtual
|
||||
instruments that can be exposed over TCP for hardware abstraction testing.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Callable
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from py_dvt_ate.instruments.scpi import SCPICommand, SCPIParser
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||
|
||||
|
||||
# Type alias for command handlers
|
||||
CommandHandler = Callable[[SCPICommand], str]
|
||||
|
||||
|
||||
class BaseInstrument(ABC):
|
||||
"""Abstract base class for virtual SCPI instruments.
|
||||
|
||||
Provides common functionality for SCPI command parsing and dispatch.
|
||||
Subclasses should register command handlers using the register_command
|
||||
method or by overriding _setup_commands.
|
||||
|
||||
Attributes:
|
||||
manufacturer: Instrument manufacturer name for *IDN? response.
|
||||
model: Instrument model name for *IDN? response.
|
||||
serial_number: Instrument serial number for *IDN? response.
|
||||
firmware_version: Firmware version for *IDN? response.
|
||||
"""
|
||||
|
||||
manufacturer: str = "PyDVTATE"
|
||||
model: str = "Virtual Instrument"
|
||||
serial_number: str = "SIM001"
|
||||
firmware_version: str = "1.0.0"
|
||||
|
||||
def __init__(self, physics_engine: PhysicsEngine | None = None) -> None:
|
||||
"""Initialise the base instrument.
|
||||
|
||||
Args:
|
||||
physics_engine: Reference to physics engine for simulation state.
|
||||
May be None for standalone operation.
|
||||
"""
|
||||
self._physics_engine = physics_engine
|
||||
self._parser = SCPIParser()
|
||||
self._handlers: dict[str, CommandHandler] = {}
|
||||
self._setup_common_commands()
|
||||
self._setup_commands()
|
||||
|
||||
def _setup_common_commands(self) -> None:
|
||||
"""Register IEEE 488.2 common commands."""
|
||||
self.register_command("*IDN", self._handle_idn)
|
||||
self.register_command("*RST", self._handle_rst)
|
||||
self.register_command("*CLS", self._handle_cls)
|
||||
self.register_command("*OPC", self._handle_opc)
|
||||
|
||||
@abstractmethod
|
||||
def _setup_commands(self) -> None:
|
||||
"""Register instrument-specific command handlers.
|
||||
|
||||
Subclasses must implement this method to register their
|
||||
SCPI command handlers using register_command().
|
||||
"""
|
||||
|
||||
def register_command(self, keyword: str, handler: CommandHandler) -> None:
|
||||
"""Register a handler for a SCPI command keyword.
|
||||
|
||||
Args:
|
||||
keyword: The command keyword (e.g., "TEMP:SETPOINT").
|
||||
For commands that support both set and query forms,
|
||||
register the base keyword without '?'.
|
||||
handler: Callable that takes SCPICommand and returns response string.
|
||||
"""
|
||||
self._handlers[keyword.upper()] = handler
|
||||
|
||||
def process(self, command_string: str) -> str:
|
||||
"""Process a SCPI command string and return the response.
|
||||
|
||||
Args:
|
||||
command_string: Raw SCPI command string to process.
|
||||
|
||||
Returns:
|
||||
Response string. Empty string for commands with no response.
|
||||
Error string starting with "ERROR:" for invalid commands.
|
||||
"""
|
||||
command = self._parser.parse(command_string)
|
||||
|
||||
if not command.header:
|
||||
return ""
|
||||
|
||||
# Look up handler by keyword (without '?' suffix)
|
||||
keyword = command.keyword.upper()
|
||||
handler = self._handlers.get(keyword)
|
||||
|
||||
if handler is None:
|
||||
return f"ERROR: Unknown command '{keyword}'"
|
||||
|
||||
try:
|
||||
return handler(command)
|
||||
except ValueError as e:
|
||||
return f"ERROR: {e}"
|
||||
except Exception as e:
|
||||
return f"ERROR: Internal error - {e}"
|
||||
|
||||
def _handle_idn(self, command: SCPICommand) -> str:
|
||||
"""Handle *IDN? identification query.
|
||||
|
||||
Returns:
|
||||
Comma-separated identification string.
|
||||
"""
|
||||
if not command.is_query:
|
||||
return "ERROR: *IDN is query only"
|
||||
return f"{self.manufacturer},{self.model},{self.serial_number},{self.firmware_version}"
|
||||
|
||||
def _handle_rst(self, command: SCPICommand) -> str:
|
||||
"""Handle *RST reset command.
|
||||
|
||||
Returns:
|
||||
Empty string (no response for reset).
|
||||
"""
|
||||
if command.is_query:
|
||||
return "ERROR: *RST is command only"
|
||||
self.reset()
|
||||
return ""
|
||||
|
||||
def _handle_cls(self, command: SCPICommand) -> str:
|
||||
"""Handle *CLS clear status command.
|
||||
|
||||
Returns:
|
||||
Empty string (no response for clear).
|
||||
"""
|
||||
if command.is_query:
|
||||
return "ERROR: *CLS is command only"
|
||||
return ""
|
||||
|
||||
def _handle_opc(self, command: SCPICommand) -> str:
|
||||
"""Handle *OPC operation complete command/query.
|
||||
|
||||
Returns:
|
||||
"1" for query, empty string for command.
|
||||
"""
|
||||
if command.is_query:
|
||||
return "1"
|
||||
return ""
|
||||
|
||||
@abstractmethod
|
||||
def reset(self) -> None:
|
||||
"""Reset instrument to default state.
|
||||
|
||||
Subclasses must implement this to define reset behaviour.
|
||||
"""
|
||||
143
src/py_dvt_ate/simulation/virtual/chamber.py
Normal file
143
src/py_dvt_ate/simulation/virtual/chamber.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""Virtual thermal chamber simulator.
|
||||
|
||||
This module implements a SCPI-based virtual thermal chamber that interfaces
|
||||
with the physics engine to provide realistic temperature control simulation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from py_dvt_ate.instruments.scpi import SCPICommand
|
||||
from py_dvt_ate.simulation.virtual.base import BaseInstrument
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||
|
||||
|
||||
class ThermalChamberSim(BaseInstrument):
|
||||
"""Virtual thermal chamber simulator.
|
||||
|
||||
Simulates a thermal chamber with SCPI control interface. The chamber
|
||||
temperature behaviour is driven by the physics engine.
|
||||
|
||||
SCPI Commands:
|
||||
TEMP:SETPOINT <value> - Set target temperature in degrees C
|
||||
TEMP:SETPOINT? - Query current setpoint
|
||||
TEMP:ACTUAL? - Query actual chamber temperature
|
||||
TEMP:STAB? - Query temperature stability (1=stable, 0=settling)
|
||||
|
||||
Attributes:
|
||||
manufacturer: "PyDVTATE"
|
||||
model: "TC-SIM-001"
|
||||
"""
|
||||
|
||||
manufacturer = "PyDVTATE"
|
||||
model = "TC-SIM-001"
|
||||
serial_number = "TCSIM001"
|
||||
firmware_version = "1.0.0"
|
||||
|
||||
# Stability threshold in degrees C
|
||||
STABILITY_THRESHOLD = 0.5
|
||||
|
||||
def __init__(self, physics_engine: PhysicsEngine | None = None) -> None:
|
||||
"""Initialise the thermal chamber simulator.
|
||||
|
||||
Args:
|
||||
physics_engine: Reference to physics engine for temperature state.
|
||||
"""
|
||||
self._setpoint = 25.0 # Default setpoint
|
||||
super().__init__(physics_engine)
|
||||
|
||||
def _setup_commands(self) -> None:
|
||||
"""Register thermal chamber SCPI commands."""
|
||||
self.register_command("TEMP:SETPOINT", self._handle_temp_setpoint)
|
||||
self.register_command("TEMP:ACTUAL", self._handle_temp_actual)
|
||||
self.register_command("TEMP:STAB", self._handle_temp_stab)
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset chamber to default state."""
|
||||
self._setpoint = 25.0
|
||||
if self._physics_engine is not None:
|
||||
self._physics_engine.set_chamber_setpoint(self._setpoint)
|
||||
|
||||
def _handle_temp_setpoint(self, command: SCPICommand) -> str:
|
||||
"""Handle TEMP:SETPOINT command/query.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Setpoint value for query, empty string for set command.
|
||||
|
||||
Raises:
|
||||
ValueError: If setpoint argument is invalid.
|
||||
"""
|
||||
if command.is_query:
|
||||
return f"{self._setpoint:.2f}"
|
||||
|
||||
# Set command requires one argument
|
||||
if not command.arguments:
|
||||
raise ValueError("TEMP:SETPOINT requires a value")
|
||||
|
||||
try:
|
||||
setpoint = float(command.arguments[0])
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid temperature value: {command.arguments[0]}")
|
||||
|
||||
self._setpoint = setpoint
|
||||
if self._physics_engine is not None:
|
||||
self._physics_engine.set_chamber_setpoint(setpoint)
|
||||
|
||||
return ""
|
||||
|
||||
def _handle_temp_actual(self, command: SCPICommand) -> str:
|
||||
"""Handle TEMP:ACTUAL? query.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Actual chamber temperature.
|
||||
|
||||
Raises:
|
||||
ValueError: If used as command (not query).
|
||||
"""
|
||||
if not command.is_query:
|
||||
raise ValueError("TEMP:ACTUAL is query only")
|
||||
|
||||
if self._physics_engine is None:
|
||||
# Return setpoint if no physics engine connected
|
||||
return f"{self._setpoint:.2f}"
|
||||
|
||||
thermal_state = self._physics_engine.get_thermal_state()
|
||||
return f"{thermal_state.chamber_temperature:.2f}"
|
||||
|
||||
def _handle_temp_stab(self, command: SCPICommand) -> str:
|
||||
"""Handle TEMP:STAB? stability query.
|
||||
|
||||
Temperature is considered stable when the actual chamber temperature
|
||||
is within STABILITY_THRESHOLD of the setpoint.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
"1" if stable, "0" if settling.
|
||||
|
||||
Raises:
|
||||
ValueError: If used as command (not query).
|
||||
"""
|
||||
if not command.is_query:
|
||||
raise ValueError("TEMP:STAB is query only")
|
||||
|
||||
if self._physics_engine is None:
|
||||
# Assume stable if no physics engine connected
|
||||
return "1"
|
||||
|
||||
thermal_state = self._physics_engine.get_thermal_state()
|
||||
error = abs(thermal_state.chamber_temperature - self._setpoint)
|
||||
|
||||
if error <= self.STABILITY_THRESHOLD:
|
||||
return "1"
|
||||
return "0"
|
||||
213
src/py_dvt_ate/simulation/virtual/multimeter.py
Normal file
213
src/py_dvt_ate/simulation/virtual/multimeter.py
Normal file
@@ -0,0 +1,213 @@
|
||||
"""Virtual digital multimeter (DMM) simulator.
|
||||
|
||||
This module implements a SCPI-based virtual multimeter that interfaces
|
||||
with the physics engine to measure DUT electrical parameters.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from py_dvt_ate.instruments.scpi import SCPICommand
|
||||
from py_dvt_ate.simulation.virtual.base import BaseInstrument
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||
|
||||
|
||||
class MeasurementFunction(Enum):
|
||||
"""Available measurement functions."""
|
||||
|
||||
VOLTAGE_DC = "VOLT:DC"
|
||||
CURRENT_DC = "CURR:DC"
|
||||
|
||||
|
||||
class MultimeterSim(BaseInstrument):
|
||||
"""Virtual digital multimeter simulator.
|
||||
|
||||
Simulates a digital multimeter with SCPI control interface. The DMM
|
||||
measures DUT output voltage and load current via the physics engine.
|
||||
|
||||
SCPI Commands:
|
||||
MEAS:VOLT:DC? - Measure DC voltage (shortcut)
|
||||
MEAS:CURR:DC? - Measure DC current (shortcut)
|
||||
CONF:VOLT:DC - Configure for DC voltage measurement
|
||||
CONF:CURR:DC - Configure for DC current measurement
|
||||
CONF? - Query current configuration
|
||||
READ? - Take measurement with current configuration
|
||||
|
||||
Attributes:
|
||||
manufacturer: "PyDVTATE"
|
||||
model: "DMM-SIM-001"
|
||||
"""
|
||||
|
||||
manufacturer = "PyDVTATE"
|
||||
model = "DMM-SIM-001"
|
||||
serial_number = "DMMSIM001"
|
||||
firmware_version = "1.0.0"
|
||||
|
||||
def __init__(self, physics_engine: PhysicsEngine | None = None) -> None:
|
||||
"""Initialise the multimeter simulator.
|
||||
|
||||
Args:
|
||||
physics_engine: Reference to physics engine for measurement values.
|
||||
"""
|
||||
self._function = MeasurementFunction.VOLTAGE_DC
|
||||
super().__init__(physics_engine)
|
||||
|
||||
def _setup_commands(self) -> None:
|
||||
"""Register multimeter SCPI commands."""
|
||||
self.register_command("MEAS:VOLT:DC", self._handle_meas_volt_dc)
|
||||
self.register_command("MEAS:CURR:DC", self._handle_meas_curr_dc)
|
||||
self.register_command("CONF:VOLT:DC", self._handle_conf_volt_dc)
|
||||
self.register_command("CONF:CURR:DC", self._handle_conf_curr_dc)
|
||||
self.register_command("CONF", self._handle_conf)
|
||||
self.register_command("READ", self._handle_read)
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset multimeter to default state."""
|
||||
self._function = MeasurementFunction.VOLTAGE_DC
|
||||
|
||||
def _handle_meas_volt_dc(self, command: SCPICommand) -> str:
|
||||
"""Handle MEAS:VOLT:DC? query.
|
||||
|
||||
Configures for DC voltage and takes measurement in one command.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Measured DC voltage.
|
||||
|
||||
Raises:
|
||||
ValueError: If used as command (not query).
|
||||
"""
|
||||
if not command.is_query:
|
||||
raise ValueError("MEAS:VOLT:DC is query only")
|
||||
|
||||
self._function = MeasurementFunction.VOLTAGE_DC
|
||||
return self._measure_voltage_dc()
|
||||
|
||||
def _handle_meas_curr_dc(self, command: SCPICommand) -> str:
|
||||
"""Handle MEAS:CURR:DC? query.
|
||||
|
||||
Configures for DC current and takes measurement in one command.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Measured DC current.
|
||||
|
||||
Raises:
|
||||
ValueError: If used as command (not query).
|
||||
"""
|
||||
if not command.is_query:
|
||||
raise ValueError("MEAS:CURR:DC is query only")
|
||||
|
||||
self._function = MeasurementFunction.CURRENT_DC
|
||||
return self._measure_current_dc()
|
||||
|
||||
def _handle_conf_volt_dc(self, command: SCPICommand) -> str:
|
||||
"""Handle CONF:VOLT:DC command.
|
||||
|
||||
Configures multimeter for DC voltage measurement.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Empty string (no response for configuration).
|
||||
|
||||
Raises:
|
||||
ValueError: If used as query.
|
||||
"""
|
||||
if command.is_query:
|
||||
raise ValueError("CONF:VOLT:DC is command only")
|
||||
|
||||
self._function = MeasurementFunction.VOLTAGE_DC
|
||||
return ""
|
||||
|
||||
def _handle_conf_curr_dc(self, command: SCPICommand) -> str:
|
||||
"""Handle CONF:CURR:DC command.
|
||||
|
||||
Configures multimeter for DC current measurement.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Empty string (no response for configuration).
|
||||
|
||||
Raises:
|
||||
ValueError: If used as query.
|
||||
"""
|
||||
if command.is_query:
|
||||
raise ValueError("CONF:CURR:DC is command only")
|
||||
|
||||
self._function = MeasurementFunction.CURRENT_DC
|
||||
return ""
|
||||
|
||||
def _handle_conf(self, command: SCPICommand) -> str:
|
||||
"""Handle CONF? query.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Current measurement configuration.
|
||||
|
||||
Raises:
|
||||
ValueError: If used as command without subcommand.
|
||||
"""
|
||||
if not command.is_query:
|
||||
raise ValueError("CONF requires a function (e.g., CONF:VOLT:DC)")
|
||||
|
||||
return f'"{self._function.value}"'
|
||||
|
||||
def _handle_read(self, command: SCPICommand) -> str:
|
||||
"""Handle READ? query.
|
||||
|
||||
Takes measurement using current configuration.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Measured value.
|
||||
|
||||
Raises:
|
||||
ValueError: If used as command (not query).
|
||||
"""
|
||||
if not command.is_query:
|
||||
raise ValueError("READ is query only")
|
||||
|
||||
if self._function == MeasurementFunction.VOLTAGE_DC:
|
||||
return self._measure_voltage_dc()
|
||||
else:
|
||||
return self._measure_current_dc()
|
||||
|
||||
def _measure_voltage_dc(self) -> str:
|
||||
"""Measure DC voltage from physics engine.
|
||||
|
||||
Returns:
|
||||
Formatted voltage reading.
|
||||
"""
|
||||
if self._physics_engine is None:
|
||||
return "0.000000"
|
||||
|
||||
electrical_state = self._physics_engine.get_electrical_state()
|
||||
return f"{electrical_state.output_voltage:.6f}"
|
||||
|
||||
def _measure_current_dc(self) -> str:
|
||||
"""Measure DC current from physics engine.
|
||||
|
||||
Returns:
|
||||
Formatted current reading.
|
||||
"""
|
||||
if self._physics_engine is None:
|
||||
return "0.000000"
|
||||
|
||||
electrical_state = self._physics_engine.get_electrical_state()
|
||||
return f"{electrical_state.load_current:.6f}"
|
||||
222
src/py_dvt_ate/simulation/virtual/power_supply.py
Normal file
222
src/py_dvt_ate/simulation/virtual/power_supply.py
Normal file
@@ -0,0 +1,222 @@
|
||||
"""Virtual power supply simulator.
|
||||
|
||||
This module implements a SCPI-based virtual power supply that interfaces
|
||||
with the physics engine to provide realistic power supply simulation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from py_dvt_ate.instruments.scpi import SCPICommand
|
||||
from py_dvt_ate.simulation.virtual.base import BaseInstrument
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||
|
||||
|
||||
class PowerSupplySim(BaseInstrument):
|
||||
"""Virtual power supply simulator.
|
||||
|
||||
Simulates a programmable DC power supply with SCPI control interface.
|
||||
The power supply provides input voltage to the DUT via the physics engine.
|
||||
|
||||
SCPI Commands:
|
||||
VOLT <value> - Set output voltage in volts
|
||||
VOLT? - Query voltage setpoint
|
||||
CURR <value> - Set current limit in amps
|
||||
CURR? - Query current limit
|
||||
OUTP <ON|OFF|1|0> - Enable/disable output
|
||||
OUTP? - Query output state (1=on, 0=off)
|
||||
MEAS:VOLT? - Measure actual output voltage
|
||||
MEAS:CURR? - Measure actual output current
|
||||
|
||||
Attributes:
|
||||
manufacturer: "PyDVTATE"
|
||||
model: "PS-SIM-001"
|
||||
"""
|
||||
|
||||
manufacturer = "PyDVTATE"
|
||||
model = "PS-SIM-001"
|
||||
serial_number = "PSSIM001"
|
||||
firmware_version = "1.0.0"
|
||||
|
||||
# Default values
|
||||
DEFAULT_VOLTAGE = 0.0
|
||||
DEFAULT_CURRENT_LIMIT = 1.0
|
||||
|
||||
def __init__(self, physics_engine: PhysicsEngine | None = None) -> None:
|
||||
"""Initialise the power supply simulator.
|
||||
|
||||
Args:
|
||||
physics_engine: Reference to physics engine for electrical state.
|
||||
"""
|
||||
self._voltage_setpoint = self.DEFAULT_VOLTAGE
|
||||
self._current_limit = self.DEFAULT_CURRENT_LIMIT
|
||||
self._output_enabled = False
|
||||
super().__init__(physics_engine)
|
||||
|
||||
def _setup_commands(self) -> None:
|
||||
"""Register power supply SCPI commands."""
|
||||
self.register_command("VOLT", self._handle_volt)
|
||||
self.register_command("CURR", self._handle_curr)
|
||||
self.register_command("OUTP", self._handle_outp)
|
||||
self.register_command("MEAS:VOLT", self._handle_meas_volt)
|
||||
self.register_command("MEAS:CURR", self._handle_meas_curr)
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset power supply to default state."""
|
||||
self._voltage_setpoint = self.DEFAULT_VOLTAGE
|
||||
self._current_limit = self.DEFAULT_CURRENT_LIMIT
|
||||
self._output_enabled = False
|
||||
|
||||
if self._physics_engine is not None:
|
||||
self._physics_engine.set_input_voltage(0.0)
|
||||
self._physics_engine.set_output_enabled(False)
|
||||
|
||||
def _handle_volt(self, command: SCPICommand) -> str:
|
||||
"""Handle VOLT command/query.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Voltage setpoint for query, empty string for set command.
|
||||
|
||||
Raises:
|
||||
ValueError: If voltage argument is invalid.
|
||||
"""
|
||||
if command.is_query:
|
||||
return f"{self._voltage_setpoint:.3f}"
|
||||
|
||||
if not command.arguments:
|
||||
raise ValueError("VOLT requires a value")
|
||||
|
||||
try:
|
||||
voltage = float(command.arguments[0])
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid voltage value: {command.arguments[0]}")
|
||||
|
||||
if voltage < 0:
|
||||
raise ValueError("Voltage cannot be negative")
|
||||
|
||||
self._voltage_setpoint = voltage
|
||||
|
||||
if self._physics_engine is not None and self._output_enabled:
|
||||
self._physics_engine.set_input_voltage(voltage)
|
||||
|
||||
return ""
|
||||
|
||||
def _handle_curr(self, command: SCPICommand) -> str:
|
||||
"""Handle CURR command/query.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Current limit for query, empty string for set command.
|
||||
|
||||
Raises:
|
||||
ValueError: If current argument is invalid.
|
||||
"""
|
||||
if command.is_query:
|
||||
return f"{self._current_limit:.3f}"
|
||||
|
||||
if not command.arguments:
|
||||
raise ValueError("CURR requires a value")
|
||||
|
||||
try:
|
||||
current = float(command.arguments[0])
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid current value: {command.arguments[0]}")
|
||||
|
||||
if current < 0:
|
||||
raise ValueError("Current limit cannot be negative")
|
||||
|
||||
self._current_limit = current
|
||||
return ""
|
||||
|
||||
def _handle_outp(self, command: SCPICommand) -> str:
|
||||
"""Handle OUTP command/query.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
"1" or "0" for query, empty string for set command.
|
||||
|
||||
Raises:
|
||||
ValueError: If output argument is invalid.
|
||||
"""
|
||||
if command.is_query:
|
||||
return "1" if self._output_enabled else "0"
|
||||
|
||||
if not command.arguments:
|
||||
raise ValueError("OUTP requires a value (ON, OFF, 1, or 0)")
|
||||
|
||||
arg = command.arguments[0].upper()
|
||||
if arg in ("ON", "1"):
|
||||
self._output_enabled = True
|
||||
elif arg in ("OFF", "0"):
|
||||
self._output_enabled = False
|
||||
else:
|
||||
raise ValueError(f"Invalid output state: {command.arguments[0]}")
|
||||
|
||||
if self._physics_engine is not None:
|
||||
self._physics_engine.set_output_enabled(self._output_enabled)
|
||||
if self._output_enabled:
|
||||
self._physics_engine.set_input_voltage(self._voltage_setpoint)
|
||||
else:
|
||||
self._physics_engine.set_input_voltage(0.0)
|
||||
|
||||
return ""
|
||||
|
||||
def _handle_meas_volt(self, command: SCPICommand) -> str:
|
||||
"""Handle MEAS:VOLT? query.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Measured output voltage.
|
||||
|
||||
Raises:
|
||||
ValueError: If used as command (not query).
|
||||
"""
|
||||
if not command.is_query:
|
||||
raise ValueError("MEAS:VOLT is query only")
|
||||
|
||||
if not self._output_enabled:
|
||||
return "0.000"
|
||||
|
||||
if self._physics_engine is None:
|
||||
return f"{self._voltage_setpoint:.3f}"
|
||||
|
||||
electrical_state = self._physics_engine.get_electrical_state()
|
||||
return f"{electrical_state.input_voltage:.3f}"
|
||||
|
||||
def _handle_meas_curr(self, command: SCPICommand) -> str:
|
||||
"""Handle MEAS:CURR? query.
|
||||
|
||||
Args:
|
||||
command: Parsed SCPI command.
|
||||
|
||||
Returns:
|
||||
Measured output current.
|
||||
|
||||
Raises:
|
||||
ValueError: If used as command (not query).
|
||||
"""
|
||||
if not command.is_query:
|
||||
raise ValueError("MEAS:CURR is query only")
|
||||
|
||||
if not self._output_enabled:
|
||||
return "0.000"
|
||||
|
||||
if self._physics_engine is None:
|
||||
return "0.000"
|
||||
|
||||
electrical_state = self._physics_engine.get_electrical_state()
|
||||
# Total current is load current + quiescent current
|
||||
total_current = electrical_state.load_current + electrical_state.quiescent_current
|
||||
return f"{total_current:.3f}"
|
||||
@@ -1 +1,8 @@
|
||||
"""pytest fixtures for py_dvt_ate tests."""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def pytest_configure(config: pytest.Config) -> None:
|
||||
"""Configure pytest markers."""
|
||||
config.addinivalue_line("markers", "asyncio: mark test as async")
|
||||
|
||||
1
tests/integration/__init__.py
Normal file
1
tests/integration/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Integration tests for py_dvt_ate."""
|
||||
1
tests/integration/conftest.py
Normal file
1
tests/integration/conftest.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Configuration for integration tests."""
|
||||
274
tests/integration/test_tcp_server.py
Normal file
274
tests/integration/test_tcp_server.py
Normal file
@@ -0,0 +1,274 @@
|
||||
"""Integration tests for TCP server.
|
||||
|
||||
Tests the InstrumentServer and SimulationServer with actual TCP connections.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
||||
from py_dvt_ate.simulation.tcp_server import InstrumentServer
|
||||
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
|
||||
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
|
||||
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="function")
|
||||
class TestInstrumentServer:
|
||||
"""Tests for InstrumentServer TCP functionality."""
|
||||
|
||||
@pytest.fixture
|
||||
def physics_engine(self) -> PhysicsEngine:
|
||||
"""Create a physics engine for testing."""
|
||||
return PhysicsEngine(update_rate_hz=100.0)
|
||||
|
||||
@pytest.fixture
|
||||
def server(self, physics_engine: PhysicsEngine) -> InstrumentServer:
|
||||
"""Create an instrument server with a thermal chamber."""
|
||||
server = InstrumentServer(host="127.0.0.1")
|
||||
chamber = ThermalChamberSim(physics_engine)
|
||||
server.register_instrument(15000, chamber)
|
||||
return server
|
||||
|
||||
async def test_server_start_stop(self, server: InstrumentServer) -> None:
|
||||
"""Test server can start and stop."""
|
||||
assert not server.is_running
|
||||
|
||||
await server.start()
|
||||
assert server.is_running
|
||||
|
||||
await server.stop()
|
||||
assert not server.is_running
|
||||
|
||||
async def test_client_connection(self, server: InstrumentServer) -> None:
|
||||
"""Test client can connect and send command."""
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
|
||||
|
||||
# Send *IDN? query
|
||||
writer.write(b"*IDN?\n")
|
||||
await writer.drain()
|
||||
|
||||
# Read response
|
||||
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
|
||||
assert b"PyDVTATE" in response
|
||||
assert b"TC-SIM-001" in response
|
||||
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
async def test_multiple_commands(self, server: InstrumentServer) -> None:
|
||||
"""Test sending multiple commands in sequence."""
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
|
||||
|
||||
# Set temperature setpoint
|
||||
writer.write(b"TEMP:SETPOINT 85.0\n")
|
||||
await writer.drain()
|
||||
|
||||
# Query setpoint
|
||||
writer.write(b"TEMP:SETPOINT?\n")
|
||||
await writer.drain()
|
||||
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
|
||||
assert b"85.00" in response
|
||||
|
||||
# Query actual temperature
|
||||
writer.write(b"TEMP:ACTUAL?\n")
|
||||
await writer.drain()
|
||||
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
|
||||
# Should return a valid float
|
||||
temp = float(response.decode().strip())
|
||||
assert -50 <= temp <= 200
|
||||
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
async def test_concurrent_connections(
|
||||
self, physics_engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test multiple concurrent client connections."""
|
||||
server = InstrumentServer(host="127.0.0.1")
|
||||
chamber = ThermalChamberSim(physics_engine)
|
||||
server.register_instrument(15001, chamber)
|
||||
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
# Connect two clients simultaneously
|
||||
reader1, writer1 = await asyncio.open_connection("127.0.0.1", 15001)
|
||||
reader2, writer2 = await asyncio.open_connection("127.0.0.1", 15001)
|
||||
|
||||
# Send command from client 1
|
||||
writer1.write(b"*IDN?\n")
|
||||
await writer1.drain()
|
||||
response1 = await asyncio.wait_for(reader1.readline(), timeout=2.0)
|
||||
|
||||
# Send command from client 2
|
||||
writer2.write(b"*IDN?\n")
|
||||
await writer2.drain()
|
||||
response2 = await asyncio.wait_for(reader2.readline(), timeout=2.0)
|
||||
|
||||
# Both should get valid responses
|
||||
assert b"TC-SIM-001" in response1
|
||||
assert b"TC-SIM-001" in response2
|
||||
|
||||
writer1.close()
|
||||
writer2.close()
|
||||
await writer1.wait_closed()
|
||||
await writer2.wait_closed()
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="function")
|
||||
class TestSimulationServer:
|
||||
"""Tests for complete SimulationServer."""
|
||||
|
||||
async def test_simulation_server_start_stop(self) -> None:
|
||||
"""Test simulation server lifecycle."""
|
||||
config = ServerConfig(
|
||||
host="127.0.0.1",
|
||||
chamber_port=16000,
|
||||
psu_port=16001,
|
||||
dmm_port=16002,
|
||||
physics_rate_hz=100.0,
|
||||
)
|
||||
server = SimulationServer(config)
|
||||
|
||||
assert not server.is_running
|
||||
|
||||
await server.start()
|
||||
assert server.is_running
|
||||
assert server.physics_engine is not None
|
||||
|
||||
await server.stop()
|
||||
assert not server.is_running
|
||||
|
||||
async def test_all_instruments_accessible(self) -> None:
|
||||
"""Test all three instruments are accessible over TCP."""
|
||||
config = ServerConfig(
|
||||
host="127.0.0.1",
|
||||
chamber_port=16100,
|
||||
psu_port=16101,
|
||||
dmm_port=16102,
|
||||
)
|
||||
server = SimulationServer(config)
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
# Test thermal chamber
|
||||
r, w = await asyncio.open_connection("127.0.0.1", 16100)
|
||||
w.write(b"*IDN?\n")
|
||||
await w.drain()
|
||||
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
|
||||
assert b"TC-SIM-001" in resp
|
||||
w.close()
|
||||
await w.wait_closed()
|
||||
|
||||
# Test power supply
|
||||
r, w = await asyncio.open_connection("127.0.0.1", 16101)
|
||||
w.write(b"*IDN?\n")
|
||||
await w.drain()
|
||||
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
|
||||
assert b"PS-SIM-001" in resp
|
||||
w.close()
|
||||
await w.wait_closed()
|
||||
|
||||
# Test multimeter
|
||||
r, w = await asyncio.open_connection("127.0.0.1", 16102)
|
||||
w.write(b"*IDN?\n")
|
||||
await w.drain()
|
||||
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
|
||||
assert b"DMM-SIM-001" in resp
|
||||
w.close()
|
||||
await w.wait_closed()
|
||||
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
async def test_physics_engine_integration(self) -> None:
|
||||
"""Test instruments share physics engine state."""
|
||||
config = ServerConfig(
|
||||
host="127.0.0.1",
|
||||
chamber_port=16200,
|
||||
psu_port=16201,
|
||||
dmm_port=16202,
|
||||
)
|
||||
server = SimulationServer(config)
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
# Connect to power supply and enable output
|
||||
psu_r, psu_w = await asyncio.open_connection("127.0.0.1", 16201)
|
||||
psu_w.write(b"VOLT 5.0\n")
|
||||
await psu_w.drain()
|
||||
psu_w.write(b"OUTP ON\n")
|
||||
await psu_w.drain()
|
||||
|
||||
# Run a few physics steps
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Query voltage from power supply
|
||||
psu_w.write(b"MEAS:VOLT?\n")
|
||||
await psu_w.drain()
|
||||
psu_resp = await asyncio.wait_for(psu_r.readline(), timeout=2.0)
|
||||
psu_voltage = float(psu_resp.decode().strip())
|
||||
|
||||
# Connect to DMM and measure DUT output
|
||||
dmm_r, dmm_w = await asyncio.open_connection("127.0.0.1", 16202)
|
||||
dmm_w.write(b"MEAS:VOLT:DC?\n")
|
||||
await dmm_w.drain()
|
||||
dmm_resp = await asyncio.wait_for(dmm_r.readline(), timeout=2.0)
|
||||
dmm_voltage = float(dmm_resp.decode().strip())
|
||||
|
||||
# PSU should show input voltage (5V)
|
||||
assert 4.9 <= psu_voltage <= 5.1
|
||||
|
||||
# DMM should show DUT output voltage (LDO regulated ~3.3V)
|
||||
assert 3.0 <= dmm_voltage <= 3.5
|
||||
|
||||
psu_w.close()
|
||||
dmm_w.close()
|
||||
await psu_w.wait_closed()
|
||||
await dmm_w.wait_closed()
|
||||
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
async def test_error_handling(self) -> None:
|
||||
"""Test invalid commands return errors."""
|
||||
config = ServerConfig(
|
||||
host="127.0.0.1",
|
||||
chamber_port=16300,
|
||||
psu_port=16301,
|
||||
dmm_port=16302,
|
||||
)
|
||||
server = SimulationServer(config)
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
r, w = await asyncio.open_connection("127.0.0.1", 16300)
|
||||
|
||||
# Send invalid command
|
||||
w.write(b"INVALID:COMMAND\n")
|
||||
await w.drain()
|
||||
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
|
||||
assert b"ERROR" in resp
|
||||
|
||||
w.close()
|
||||
await w.wait_closed()
|
||||
|
||||
finally:
|
||||
await server.stop()
|
||||
306
tests/unit/test_multimeter.py
Normal file
306
tests/unit/test_multimeter.py
Normal file
@@ -0,0 +1,306 @@
|
||||
"""Unit tests for multimeter simulator."""
|
||||
|
||||
import pytest
|
||||
|
||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
|
||||
|
||||
|
||||
class TestMultimeterSimBasic:
|
||||
"""Tests for MultimeterSim without physics engine."""
|
||||
|
||||
@pytest.fixture
|
||||
def dmm(self) -> MultimeterSim:
|
||||
"""Create multimeter instance without physics engine."""
|
||||
return MultimeterSim()
|
||||
|
||||
def test_creation(self, dmm: MultimeterSim) -> None:
|
||||
"""Test multimeter can be created."""
|
||||
assert dmm is not None
|
||||
assert dmm.model == "DMM-SIM-001"
|
||||
assert dmm.manufacturer == "PyDVTATE"
|
||||
|
||||
def test_idn_query(self, dmm: MultimeterSim) -> None:
|
||||
"""Test *IDN? returns identification string."""
|
||||
response = dmm.process("*IDN?")
|
||||
|
||||
assert "PyDVTATE" in response
|
||||
assert "DMM-SIM-001" in response
|
||||
|
||||
def test_rst_command(self, dmm: MultimeterSim) -> None:
|
||||
"""Test *RST resets to defaults."""
|
||||
# Set non-default config
|
||||
dmm.process("CONF:CURR:DC")
|
||||
assert dmm.process("CONF?") == '"CURR:DC"'
|
||||
|
||||
# Reset
|
||||
response = dmm.process("*RST")
|
||||
assert response == ""
|
||||
|
||||
# Check defaults restored
|
||||
assert dmm.process("CONF?") == '"VOLT:DC"'
|
||||
|
||||
def test_opc_query(self, dmm: MultimeterSim) -> None:
|
||||
"""Test *OPC? returns 1."""
|
||||
response = dmm.process("*OPC?")
|
||||
assert response == "1"
|
||||
|
||||
def test_unknown_command(self, dmm: MultimeterSim) -> None:
|
||||
"""Test unknown command returns error."""
|
||||
response = dmm.process("INVALID:CMD")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "Unknown command" in response
|
||||
|
||||
|
||||
class TestMultimeterMeasVoltDC:
|
||||
"""Tests for MEAS:VOLT:DC command."""
|
||||
|
||||
@pytest.fixture
|
||||
def dmm(self) -> MultimeterSim:
|
||||
"""Create multimeter instance without physics engine."""
|
||||
return MultimeterSim()
|
||||
|
||||
def test_meas_volt_dc_query_no_engine(self, dmm: MultimeterSim) -> None:
|
||||
"""Test MEAS:VOLT:DC? returns 0 without physics engine."""
|
||||
response = dmm.process("MEAS:VOLT:DC?")
|
||||
|
||||
assert response == "0.000000"
|
||||
|
||||
def test_meas_volt_dc_sets_config(self, dmm: MultimeterSim) -> None:
|
||||
"""Test MEAS:VOLT:DC? sets configuration to VOLT:DC."""
|
||||
dmm.process("CONF:CURR:DC")
|
||||
dmm.process("MEAS:VOLT:DC?")
|
||||
|
||||
assert dmm.process("CONF?") == '"VOLT:DC"'
|
||||
|
||||
def test_meas_volt_dc_as_command_fails(self, dmm: MultimeterSim) -> None:
|
||||
"""Test MEAS:VOLT:DC (without ?) returns error."""
|
||||
response = dmm.process("MEAS:VOLT:DC 1.0")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "query only" in response
|
||||
|
||||
|
||||
class TestMultimeterMeasCurrDC:
|
||||
"""Tests for MEAS:CURR:DC command."""
|
||||
|
||||
@pytest.fixture
|
||||
def dmm(self) -> MultimeterSim:
|
||||
"""Create multimeter instance without physics engine."""
|
||||
return MultimeterSim()
|
||||
|
||||
def test_meas_curr_dc_query_no_engine(self, dmm: MultimeterSim) -> None:
|
||||
"""Test MEAS:CURR:DC? returns 0 without physics engine."""
|
||||
response = dmm.process("MEAS:CURR:DC?")
|
||||
|
||||
assert response == "0.000000"
|
||||
|
||||
def test_meas_curr_dc_sets_config(self, dmm: MultimeterSim) -> None:
|
||||
"""Test MEAS:CURR:DC? sets configuration to CURR:DC."""
|
||||
dmm.process("MEAS:CURR:DC?")
|
||||
|
||||
assert dmm.process("CONF?") == '"CURR:DC"'
|
||||
|
||||
def test_meas_curr_dc_as_command_fails(self, dmm: MultimeterSim) -> None:
|
||||
"""Test MEAS:CURR:DC (without ?) returns error."""
|
||||
response = dmm.process("MEAS:CURR:DC 0.1")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "query only" in response
|
||||
|
||||
|
||||
class TestMultimeterConf:
|
||||
"""Tests for CONF commands."""
|
||||
|
||||
@pytest.fixture
|
||||
def dmm(self) -> MultimeterSim:
|
||||
"""Create multimeter instance without physics engine."""
|
||||
return MultimeterSim()
|
||||
|
||||
def test_conf_query_default(self, dmm: MultimeterSim) -> None:
|
||||
"""Test CONF? returns default configuration."""
|
||||
response = dmm.process("CONF?")
|
||||
|
||||
assert response == '"VOLT:DC"'
|
||||
|
||||
def test_conf_volt_dc(self, dmm: MultimeterSim) -> None:
|
||||
"""Test CONF:VOLT:DC sets voltage measurement mode."""
|
||||
dmm.process("CONF:CURR:DC")
|
||||
response = dmm.process("CONF:VOLT:DC")
|
||||
|
||||
assert response == ""
|
||||
assert dmm.process("CONF?") == '"VOLT:DC"'
|
||||
|
||||
def test_conf_volt_dc_as_query_fails(self, dmm: MultimeterSim) -> None:
|
||||
"""Test CONF:VOLT:DC? returns error."""
|
||||
response = dmm.process("CONF:VOLT:DC?")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "command only" in response
|
||||
|
||||
def test_conf_curr_dc(self, dmm: MultimeterSim) -> None:
|
||||
"""Test CONF:CURR:DC sets current measurement mode."""
|
||||
response = dmm.process("CONF:CURR:DC")
|
||||
|
||||
assert response == ""
|
||||
assert dmm.process("CONF?") == '"CURR:DC"'
|
||||
|
||||
def test_conf_curr_dc_as_query_fails(self, dmm: MultimeterSim) -> None:
|
||||
"""Test CONF:CURR:DC? returns error."""
|
||||
response = dmm.process("CONF:CURR:DC?")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "command only" in response
|
||||
|
||||
def test_conf_as_command_fails(self, dmm: MultimeterSim) -> None:
|
||||
"""Test CONF without subcommand returns error."""
|
||||
response = dmm.process("CONF")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
|
||||
|
||||
class TestMultimeterRead:
|
||||
"""Tests for READ command."""
|
||||
|
||||
@pytest.fixture
|
||||
def dmm(self) -> MultimeterSim:
|
||||
"""Create multimeter instance without physics engine."""
|
||||
return MultimeterSim()
|
||||
|
||||
def test_read_query_volt_mode(self, dmm: MultimeterSim) -> None:
|
||||
"""Test READ? returns voltage when configured for voltage."""
|
||||
dmm.process("CONF:VOLT:DC")
|
||||
response = dmm.process("READ?")
|
||||
|
||||
assert response == "0.000000"
|
||||
|
||||
def test_read_query_curr_mode(self, dmm: MultimeterSim) -> None:
|
||||
"""Test READ? returns current when configured for current."""
|
||||
dmm.process("CONF:CURR:DC")
|
||||
response = dmm.process("READ?")
|
||||
|
||||
assert response == "0.000000"
|
||||
|
||||
def test_read_as_command_fails(self, dmm: MultimeterSim) -> None:
|
||||
"""Test READ (without ?) returns error."""
|
||||
response = dmm.process("READ")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "query only" in response
|
||||
|
||||
|
||||
class TestMultimeterWithPhysicsEngine:
|
||||
"""Tests for MultimeterSim with physics engine integration."""
|
||||
|
||||
@pytest.fixture
|
||||
def engine(self) -> PhysicsEngine:
|
||||
"""Create physics engine instance."""
|
||||
return PhysicsEngine(update_rate_hz=100.0)
|
||||
|
||||
@pytest.fixture
|
||||
def dmm(self, engine: PhysicsEngine) -> MultimeterSim:
|
||||
"""Create multimeter instance with physics engine."""
|
||||
return MultimeterSim(physics_engine=engine)
|
||||
|
||||
def test_meas_volt_dc_returns_engine_voltage(
|
||||
self, dmm: MultimeterSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test MEAS:VOLT:DC? returns physics engine output voltage."""
|
||||
engine.set_input_voltage(5.0)
|
||||
engine.set_output_enabled(True)
|
||||
engine.step()
|
||||
|
||||
response = dmm.process("MEAS:VOLT:DC?")
|
||||
|
||||
# LDO model outputs ~3.3V nominal
|
||||
voltage = float(response)
|
||||
assert voltage > 3.0
|
||||
assert voltage < 4.0
|
||||
|
||||
def test_meas_volt_dc_returns_zero_when_disabled(
|
||||
self, dmm: MultimeterSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test MEAS:VOLT:DC? returns 0 when DUT output disabled."""
|
||||
engine.set_input_voltage(5.0)
|
||||
engine.set_output_enabled(False)
|
||||
engine.step()
|
||||
|
||||
response = dmm.process("MEAS:VOLT:DC?")
|
||||
|
||||
assert response == "0.000000"
|
||||
|
||||
def test_meas_curr_dc_returns_engine_current(
|
||||
self, dmm: MultimeterSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test MEAS:CURR:DC? returns physics engine load current."""
|
||||
engine.set_input_voltage(5.0)
|
||||
engine.set_load_current(0.1)
|
||||
engine.set_output_enabled(True)
|
||||
engine.step()
|
||||
|
||||
response = dmm.process("MEAS:CURR:DC?")
|
||||
|
||||
assert float(response) == pytest.approx(0.1, abs=0.001)
|
||||
|
||||
def test_meas_curr_dc_returns_zero_when_disabled(
|
||||
self, dmm: MultimeterSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test MEAS:CURR:DC? returns 0 when DUT output disabled."""
|
||||
engine.set_input_voltage(5.0)
|
||||
engine.set_load_current(0.1)
|
||||
engine.set_output_enabled(False)
|
||||
engine.step()
|
||||
|
||||
response = dmm.process("MEAS:CURR:DC?")
|
||||
|
||||
assert response == "0.000000"
|
||||
|
||||
def test_read_uses_configured_function(
|
||||
self, dmm: MultimeterSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test READ? respects configured measurement function."""
|
||||
engine.set_input_voltage(5.0)
|
||||
engine.set_load_current(0.1)
|
||||
engine.set_output_enabled(True)
|
||||
engine.step()
|
||||
|
||||
# Configure for current
|
||||
dmm.process("CONF:CURR:DC")
|
||||
response = dmm.process("READ?")
|
||||
|
||||
# Should return current, not voltage
|
||||
assert float(response) == pytest.approx(0.1, abs=0.001)
|
||||
|
||||
def test_reset_restores_voltage_mode(
|
||||
self, dmm: MultimeterSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test *RST restores default voltage measurement mode."""
|
||||
dmm.process("CONF:CURR:DC")
|
||||
dmm.process("*RST")
|
||||
|
||||
assert dmm.process("CONF?") == '"VOLT:DC"'
|
||||
|
||||
def test_voltage_changes_with_temperature(
|
||||
self, dmm: MultimeterSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test measured voltage changes with DUT temperature."""
|
||||
engine.set_input_voltage(5.0)
|
||||
engine.set_output_enabled(True)
|
||||
engine.step()
|
||||
|
||||
# Measure at initial temperature
|
||||
response1 = dmm.process("MEAS:VOLT:DC?")
|
||||
v1 = float(response1)
|
||||
|
||||
# Change chamber temperature and let settle
|
||||
engine.set_chamber_setpoint(85.0)
|
||||
for _ in range(5000): # Let temperature settle somewhat
|
||||
engine.step()
|
||||
|
||||
# Measure at elevated temperature
|
||||
response2 = dmm.process("MEAS:VOLT:DC?")
|
||||
v2 = float(response2)
|
||||
|
||||
# Output voltage should have changed (LDO has tempco)
|
||||
assert v1 != v2
|
||||
352
tests/unit/test_power_supply.py
Normal file
352
tests/unit/test_power_supply.py
Normal file
@@ -0,0 +1,352 @@
|
||||
"""Unit tests for power supply simulator."""
|
||||
|
||||
import pytest
|
||||
|
||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
|
||||
|
||||
|
||||
class TestPowerSupplySimBasic:
|
||||
"""Tests for PowerSupplySim without physics engine."""
|
||||
|
||||
@pytest.fixture
|
||||
def psu(self) -> PowerSupplySim:
|
||||
"""Create power supply instance without physics engine."""
|
||||
return PowerSupplySim()
|
||||
|
||||
def test_creation(self, psu: PowerSupplySim) -> None:
|
||||
"""Test power supply can be created."""
|
||||
assert psu is not None
|
||||
assert psu.model == "PS-SIM-001"
|
||||
assert psu.manufacturer == "PyDVTATE"
|
||||
|
||||
def test_idn_query(self, psu: PowerSupplySim) -> None:
|
||||
"""Test *IDN? returns identification string."""
|
||||
response = psu.process("*IDN?")
|
||||
|
||||
assert "PyDVTATE" in response
|
||||
assert "PS-SIM-001" in response
|
||||
|
||||
def test_rst_command(self, psu: PowerSupplySim) -> None:
|
||||
"""Test *RST resets to defaults."""
|
||||
# Set non-default values
|
||||
psu.process("VOLT 12.0")
|
||||
psu.process("CURR 2.0")
|
||||
psu.process("OUTP ON")
|
||||
|
||||
# Reset
|
||||
response = psu.process("*RST")
|
||||
assert response == ""
|
||||
|
||||
# Check defaults restored
|
||||
assert psu.process("VOLT?") == "0.000"
|
||||
assert psu.process("CURR?") == "1.000"
|
||||
assert psu.process("OUTP?") == "0"
|
||||
|
||||
def test_opc_query(self, psu: PowerSupplySim) -> None:
|
||||
"""Test *OPC? returns 1."""
|
||||
response = psu.process("*OPC?")
|
||||
assert response == "1"
|
||||
|
||||
def test_unknown_command(self, psu: PowerSupplySim) -> None:
|
||||
"""Test unknown command returns error."""
|
||||
response = psu.process("INVALID:CMD")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "Unknown command" in response
|
||||
|
||||
|
||||
class TestPowerSupplyVoltage:
|
||||
"""Tests for VOLT command."""
|
||||
|
||||
@pytest.fixture
|
||||
def psu(self) -> PowerSupplySim:
|
||||
"""Create power supply instance without physics engine."""
|
||||
return PowerSupplySim()
|
||||
|
||||
def test_volt_query_default(self, psu: PowerSupplySim) -> None:
|
||||
"""Test VOLT? returns default value."""
|
||||
response = psu.process("VOLT?")
|
||||
|
||||
assert response == "0.000"
|
||||
|
||||
def test_volt_set(self, psu: PowerSupplySim) -> None:
|
||||
"""Test VOLT sets value."""
|
||||
response = psu.process("VOLT 12.5")
|
||||
|
||||
assert response == ""
|
||||
assert psu.process("VOLT?") == "12.500"
|
||||
|
||||
def test_volt_set_decimal(self, psu: PowerSupplySim) -> None:
|
||||
"""Test VOLT accepts decimal values."""
|
||||
psu.process("VOLT 3.3")
|
||||
|
||||
assert psu.process("VOLT?") == "3.300"
|
||||
|
||||
def test_volt_set_negative_fails(self, psu: PowerSupplySim) -> None:
|
||||
"""Test VOLT rejects negative values."""
|
||||
response = psu.process("VOLT -5.0")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "negative" in response
|
||||
|
||||
def test_volt_set_invalid_value(self, psu: PowerSupplySim) -> None:
|
||||
"""Test VOLT with invalid value returns error."""
|
||||
response = psu.process("VOLT abc")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "Invalid voltage" in response
|
||||
|
||||
def test_volt_set_no_argument(self, psu: PowerSupplySim) -> None:
|
||||
"""Test VOLT without argument returns error."""
|
||||
response = psu.process("VOLT")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "requires a value" in response
|
||||
|
||||
|
||||
class TestPowerSupplyCurrent:
|
||||
"""Tests for CURR command."""
|
||||
|
||||
@pytest.fixture
|
||||
def psu(self) -> PowerSupplySim:
|
||||
"""Create power supply instance without physics engine."""
|
||||
return PowerSupplySim()
|
||||
|
||||
def test_curr_query_default(self, psu: PowerSupplySim) -> None:
|
||||
"""Test CURR? returns default value."""
|
||||
response = psu.process("CURR?")
|
||||
|
||||
assert response == "1.000"
|
||||
|
||||
def test_curr_set(self, psu: PowerSupplySim) -> None:
|
||||
"""Test CURR sets value."""
|
||||
response = psu.process("CURR 0.5")
|
||||
|
||||
assert response == ""
|
||||
assert psu.process("CURR?") == "0.500"
|
||||
|
||||
def test_curr_set_negative_fails(self, psu: PowerSupplySim) -> None:
|
||||
"""Test CURR rejects negative values."""
|
||||
response = psu.process("CURR -1.0")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "negative" in response
|
||||
|
||||
def test_curr_set_invalid_value(self, psu: PowerSupplySim) -> None:
|
||||
"""Test CURR with invalid value returns error."""
|
||||
response = psu.process("CURR xyz")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "Invalid current" in response
|
||||
|
||||
def test_curr_set_no_argument(self, psu: PowerSupplySim) -> None:
|
||||
"""Test CURR without argument returns error."""
|
||||
response = psu.process("CURR")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "requires a value" in response
|
||||
|
||||
|
||||
class TestPowerSupplyOutput:
|
||||
"""Tests for OUTP command."""
|
||||
|
||||
@pytest.fixture
|
||||
def psu(self) -> PowerSupplySim:
|
||||
"""Create power supply instance without physics engine."""
|
||||
return PowerSupplySim()
|
||||
|
||||
def test_outp_query_default(self, psu: PowerSupplySim) -> None:
|
||||
"""Test OUTP? returns default value (off)."""
|
||||
response = psu.process("OUTP?")
|
||||
|
||||
assert response == "0"
|
||||
|
||||
def test_outp_set_on(self, psu: PowerSupplySim) -> None:
|
||||
"""Test OUTP ON enables output."""
|
||||
response = psu.process("OUTP ON")
|
||||
|
||||
assert response == ""
|
||||
assert psu.process("OUTP?") == "1"
|
||||
|
||||
def test_outp_set_1(self, psu: PowerSupplySim) -> None:
|
||||
"""Test OUTP 1 enables output."""
|
||||
psu.process("OUTP 1")
|
||||
|
||||
assert psu.process("OUTP?") == "1"
|
||||
|
||||
def test_outp_set_off(self, psu: PowerSupplySim) -> None:
|
||||
"""Test OUTP OFF disables output."""
|
||||
psu.process("OUTP ON")
|
||||
psu.process("OUTP OFF")
|
||||
|
||||
assert psu.process("OUTP?") == "0"
|
||||
|
||||
def test_outp_set_0(self, psu: PowerSupplySim) -> None:
|
||||
"""Test OUTP 0 disables output."""
|
||||
psu.process("OUTP ON")
|
||||
psu.process("OUTP 0")
|
||||
|
||||
assert psu.process("OUTP?") == "0"
|
||||
|
||||
def test_outp_set_invalid(self, psu: PowerSupplySim) -> None:
|
||||
"""Test OUTP with invalid value returns error."""
|
||||
response = psu.process("OUTP MAYBE")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "Invalid output state" in response
|
||||
|
||||
def test_outp_set_no_argument(self, psu: PowerSupplySim) -> None:
|
||||
"""Test OUTP without argument returns error."""
|
||||
response = psu.process("OUTP")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "requires a value" in response
|
||||
|
||||
|
||||
class TestPowerSupplyMeasurement:
|
||||
"""Tests for MEAS commands."""
|
||||
|
||||
@pytest.fixture
|
||||
def psu(self) -> PowerSupplySim:
|
||||
"""Create power supply instance without physics engine."""
|
||||
return PowerSupplySim()
|
||||
|
||||
def test_meas_volt_when_off(self, psu: PowerSupplySim) -> None:
|
||||
"""Test MEAS:VOLT? returns 0 when output is off."""
|
||||
psu.process("VOLT 12.0")
|
||||
response = psu.process("MEAS:VOLT?")
|
||||
|
||||
assert response == "0.000"
|
||||
|
||||
def test_meas_volt_when_on_no_engine(self, psu: PowerSupplySim) -> None:
|
||||
"""Test MEAS:VOLT? returns setpoint when on without engine."""
|
||||
psu.process("VOLT 12.0")
|
||||
psu.process("OUTP ON")
|
||||
response = psu.process("MEAS:VOLT?")
|
||||
|
||||
assert response == "12.000"
|
||||
|
||||
def test_meas_volt_as_command_fails(self, psu: PowerSupplySim) -> None:
|
||||
"""Test MEAS:VOLT (without ?) returns error."""
|
||||
response = psu.process("MEAS:VOLT 5.0")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "query only" in response
|
||||
|
||||
def test_meas_curr_when_off(self, psu: PowerSupplySim) -> None:
|
||||
"""Test MEAS:CURR? returns 0 when output is off."""
|
||||
response = psu.process("MEAS:CURR?")
|
||||
|
||||
assert response == "0.000"
|
||||
|
||||
def test_meas_curr_when_on_no_engine(self, psu: PowerSupplySim) -> None:
|
||||
"""Test MEAS:CURR? returns 0 when on without engine."""
|
||||
psu.process("OUTP ON")
|
||||
response = psu.process("MEAS:CURR?")
|
||||
|
||||
assert response == "0.000"
|
||||
|
||||
def test_meas_curr_as_command_fails(self, psu: PowerSupplySim) -> None:
|
||||
"""Test MEAS:CURR (without ?) returns error."""
|
||||
response = psu.process("MEAS:CURR 0.1")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "query only" in response
|
||||
|
||||
|
||||
class TestPowerSupplyWithPhysicsEngine:
|
||||
"""Tests for PowerSupplySim with physics engine integration."""
|
||||
|
||||
@pytest.fixture
|
||||
def engine(self) -> PhysicsEngine:
|
||||
"""Create physics engine instance."""
|
||||
return PhysicsEngine(update_rate_hz=100.0)
|
||||
|
||||
@pytest.fixture
|
||||
def psu(self, engine: PhysicsEngine) -> PowerSupplySim:
|
||||
"""Create power supply instance with physics engine."""
|
||||
return PowerSupplySim(physics_engine=engine)
|
||||
|
||||
def test_outp_on_enables_engine_output(
|
||||
self, psu: PowerSupplySim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test OUTP ON enables physics engine output."""
|
||||
psu.process("VOLT 5.0")
|
||||
psu.process("OUTP ON")
|
||||
|
||||
assert engine.is_output_enabled is True
|
||||
|
||||
def test_outp_off_disables_engine_output(
|
||||
self, psu: PowerSupplySim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test OUTP OFF disables physics engine output."""
|
||||
psu.process("OUTP ON")
|
||||
psu.process("OUTP OFF")
|
||||
|
||||
assert engine.is_output_enabled is False
|
||||
|
||||
def test_volt_updates_engine_when_on(
|
||||
self, psu: PowerSupplySim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test VOLT updates engine input voltage when output is on."""
|
||||
psu.process("OUTP ON")
|
||||
psu.process("VOLT 5.0")
|
||||
|
||||
electrical = engine.get_electrical_state()
|
||||
assert electrical.input_voltage == pytest.approx(5.0)
|
||||
|
||||
def test_volt_does_not_update_engine_when_off(
|
||||
self, psu: PowerSupplySim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test VOLT does not update engine when output is off."""
|
||||
psu.process("VOLT 5.0")
|
||||
|
||||
electrical = engine.get_electrical_state()
|
||||
assert electrical.input_voltage == pytest.approx(0.0)
|
||||
|
||||
def test_meas_volt_returns_engine_voltage(
|
||||
self, psu: PowerSupplySim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test MEAS:VOLT? returns physics engine voltage."""
|
||||
psu.process("VOLT 5.0")
|
||||
psu.process("OUTP ON")
|
||||
|
||||
response = psu.process("MEAS:VOLT?")
|
||||
assert response == "5.000"
|
||||
|
||||
def test_meas_curr_returns_engine_current(
|
||||
self, psu: PowerSupplySim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test MEAS:CURR? returns total current from engine."""
|
||||
psu.process("VOLT 5.0")
|
||||
psu.process("OUTP ON")
|
||||
engine.set_load_current(0.1)
|
||||
|
||||
# Step engine to allow calculations
|
||||
engine.step()
|
||||
|
||||
response = psu.process("MEAS:CURR?")
|
||||
# Should include load current + quiescent current
|
||||
assert float(response) > 0.0
|
||||
|
||||
def test_reset_disables_engine_output(
|
||||
self, psu: PowerSupplySim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test *RST disables physics engine output."""
|
||||
psu.process("VOLT 5.0")
|
||||
psu.process("OUTP ON")
|
||||
psu.process("*RST")
|
||||
|
||||
assert engine.is_output_enabled is False
|
||||
|
||||
def test_reset_sets_engine_voltage_zero(
|
||||
self, psu: PowerSupplySim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test *RST sets physics engine voltage to zero."""
|
||||
psu.process("VOLT 5.0")
|
||||
psu.process("OUTP ON")
|
||||
psu.process("*RST")
|
||||
|
||||
electrical = engine.get_electrical_state()
|
||||
assert electrical.input_voltage == pytest.approx(0.0)
|
||||
203
tests/unit/test_scpi_parser.py
Normal file
203
tests/unit/test_scpi_parser.py
Normal file
@@ -0,0 +1,203 @@
|
||||
"""Unit tests for SCPI command parsing."""
|
||||
|
||||
import pytest
|
||||
|
||||
from py_dvt_ate.instruments.scpi import SCPICommand, SCPIParser
|
||||
|
||||
|
||||
class TestSCPICommand:
|
||||
"""Tests for the SCPICommand dataclass."""
|
||||
|
||||
def test_creation(self) -> None:
|
||||
"""Test SCPICommand can be created with valid values."""
|
||||
cmd = SCPICommand(
|
||||
header="VOLT",
|
||||
arguments=["3.3"],
|
||||
is_query=False,
|
||||
)
|
||||
|
||||
assert cmd.header == "VOLT"
|
||||
assert cmd.arguments == ["3.3"]
|
||||
assert cmd.is_query is False
|
||||
|
||||
def test_keyword_for_command(self) -> None:
|
||||
"""Test keyword property for regular command."""
|
||||
cmd = SCPICommand(header="VOLT", arguments=["3.3"], is_query=False)
|
||||
|
||||
assert cmd.keyword == "VOLT"
|
||||
|
||||
def test_keyword_for_query(self) -> None:
|
||||
"""Test keyword property strips '?' from query."""
|
||||
cmd = SCPICommand(header="VOLT?", arguments=[], is_query=True)
|
||||
|
||||
assert cmd.keyword == "VOLT"
|
||||
|
||||
def test_keyword_for_nested_command(self) -> None:
|
||||
"""Test keyword property for nested SCPI command."""
|
||||
cmd = SCPICommand(header="TEMP:SETPOINT?", arguments=[], is_query=True)
|
||||
|
||||
assert cmd.keyword == "TEMP:SETPOINT"
|
||||
|
||||
|
||||
class TestSCPIParser:
|
||||
"""Tests for the SCPIParser class."""
|
||||
|
||||
@pytest.fixture
|
||||
def parser(self) -> SCPIParser:
|
||||
"""Create parser instance for tests."""
|
||||
return SCPIParser()
|
||||
|
||||
def test_parse_simple_query(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing simple query command."""
|
||||
cmd = parser.parse("*IDN?")
|
||||
|
||||
assert cmd.header == "*IDN?"
|
||||
assert cmd.arguments == []
|
||||
assert cmd.is_query is True
|
||||
assert cmd.keyword == "*IDN"
|
||||
|
||||
def test_parse_simple_command(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing simple command without arguments."""
|
||||
cmd = parser.parse("*RST")
|
||||
|
||||
assert cmd.header == "*RST"
|
||||
assert cmd.arguments == []
|
||||
assert cmd.is_query is False
|
||||
assert cmd.keyword == "*RST"
|
||||
|
||||
def test_parse_command_with_single_argument(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing command with single numeric argument."""
|
||||
cmd = parser.parse("VOLT 3.3")
|
||||
|
||||
assert cmd.header == "VOLT"
|
||||
assert cmd.arguments == ["3.3"]
|
||||
assert cmd.is_query is False
|
||||
|
||||
def test_parse_command_with_multiple_arguments(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing command with comma-separated arguments."""
|
||||
cmd = parser.parse("CONF:VOLT:DC 10,0.001")
|
||||
|
||||
assert cmd.header == "CONF:VOLT:DC"
|
||||
assert cmd.arguments == ["10", "0.001"]
|
||||
assert cmd.is_query is False
|
||||
|
||||
def test_parse_nested_scpi_command(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing nested SCPI command hierarchy."""
|
||||
cmd = parser.parse("TEMP:SETPOINT 85.0")
|
||||
|
||||
assert cmd.header == "TEMP:SETPOINT"
|
||||
assert cmd.arguments == ["85.0"]
|
||||
assert cmd.is_query is False
|
||||
assert cmd.keyword == "TEMP:SETPOINT"
|
||||
|
||||
def test_parse_nested_scpi_query(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing nested SCPI query."""
|
||||
cmd = parser.parse("TEMP:SETPOINT?")
|
||||
|
||||
assert cmd.header == "TEMP:SETPOINT?"
|
||||
assert cmd.arguments == []
|
||||
assert cmd.is_query is True
|
||||
|
||||
def test_parse_ieee_common_commands(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing IEEE 488.2 common commands."""
|
||||
# Identity query
|
||||
cmd = parser.parse("*IDN?")
|
||||
assert cmd.is_query is True
|
||||
assert cmd.keyword == "*IDN"
|
||||
|
||||
# Reset
|
||||
cmd = parser.parse("*RST")
|
||||
assert cmd.is_query is False
|
||||
assert cmd.keyword == "*RST"
|
||||
|
||||
# Clear status
|
||||
cmd = parser.parse("*CLS")
|
||||
assert cmd.is_query is False
|
||||
assert cmd.keyword == "*CLS"
|
||||
|
||||
# Operation complete query
|
||||
cmd = parser.parse("*OPC?")
|
||||
assert cmd.is_query is True
|
||||
assert cmd.keyword == "*OPC"
|
||||
|
||||
def test_parse_strips_whitespace(self, parser: SCPIParser) -> None:
|
||||
"""Test parser strips leading and trailing whitespace."""
|
||||
cmd = parser.parse(" VOLT 3.3 ")
|
||||
|
||||
assert cmd.header == "VOLT"
|
||||
assert cmd.arguments == ["3.3"]
|
||||
|
||||
def test_parse_strips_argument_whitespace(self, parser: SCPIParser) -> None:
|
||||
"""Test parser strips whitespace from arguments."""
|
||||
cmd = parser.parse("CONF:VOLT:DC 10 , 0.001 ")
|
||||
|
||||
assert cmd.arguments == ["10", "0.001"]
|
||||
|
||||
def test_parse_empty_string(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing empty string returns empty command."""
|
||||
cmd = parser.parse("")
|
||||
|
||||
assert cmd.header == ""
|
||||
assert cmd.arguments == []
|
||||
assert cmd.is_query is False
|
||||
|
||||
def test_parse_whitespace_only(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing whitespace-only string returns empty command."""
|
||||
cmd = parser.parse(" ")
|
||||
|
||||
assert cmd.header == ""
|
||||
assert cmd.arguments == []
|
||||
assert cmd.is_query is False
|
||||
|
||||
def test_parse_output_on_off(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing output enable/disable commands."""
|
||||
cmd_on = parser.parse("OUTP ON")
|
||||
assert cmd_on.arguments == ["ON"]
|
||||
|
||||
cmd_off = parser.parse("OUTP OFF")
|
||||
assert cmd_off.arguments == ["OFF"]
|
||||
|
||||
cmd_1 = parser.parse("OUTP 1")
|
||||
assert cmd_1.arguments == ["1"]
|
||||
|
||||
cmd_0 = parser.parse("OUTP 0")
|
||||
assert cmd_0.arguments == ["0"]
|
||||
|
||||
def test_parse_channel_select(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing channel selection commands."""
|
||||
cmd = parser.parse("INST:SEL CH1")
|
||||
|
||||
assert cmd.header == "INST:SEL"
|
||||
assert cmd.arguments == ["CH1"]
|
||||
|
||||
def test_parse_measurement_query(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing measurement query commands."""
|
||||
cmd = parser.parse("MEAS:VOLT:DC?")
|
||||
|
||||
assert cmd.header == "MEAS:VOLT:DC?"
|
||||
assert cmd.is_query is True
|
||||
assert cmd.keyword == "MEAS:VOLT:DC"
|
||||
|
||||
def test_parse_measurement_with_range(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing measurement query with range argument."""
|
||||
cmd = parser.parse("MEAS:VOLT:DC? AUTO")
|
||||
|
||||
assert cmd.header == "MEAS:VOLT:DC?"
|
||||
assert cmd.arguments == ["AUTO"]
|
||||
assert cmd.is_query is True
|
||||
|
||||
def test_parse_system_error_query(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing system error query."""
|
||||
cmd = parser.parse("SYST:ERR?")
|
||||
|
||||
assert cmd.header == "SYST:ERR?"
|
||||
assert cmd.is_query is True
|
||||
assert cmd.keyword == "SYST:ERR"
|
||||
|
||||
def test_parse_nplc_setting(self, parser: SCPIParser) -> None:
|
||||
"""Test parsing NPLC (integration time) command."""
|
||||
cmd = parser.parse("SENS:VOLT:DC:NPLC 10")
|
||||
|
||||
assert cmd.header == "SENS:VOLT:DC:NPLC"
|
||||
assert cmd.arguments == ["10"]
|
||||
assert cmd.is_query is False
|
||||
215
tests/unit/test_thermal_chamber.py
Normal file
215
tests/unit/test_thermal_chamber.py
Normal file
@@ -0,0 +1,215 @@
|
||||
"""Unit tests for thermal chamber simulator."""
|
||||
|
||||
import pytest
|
||||
|
||||
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
|
||||
|
||||
|
||||
class TestThermalChamberSimBasic:
|
||||
"""Tests for ThermalChamberSim without physics engine."""
|
||||
|
||||
@pytest.fixture
|
||||
def chamber(self) -> ThermalChamberSim:
|
||||
"""Create chamber instance without physics engine."""
|
||||
return ThermalChamberSim()
|
||||
|
||||
def test_creation(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test chamber can be created."""
|
||||
assert chamber is not None
|
||||
assert chamber.model == "TC-SIM-001"
|
||||
assert chamber.manufacturer == "PyDVTATE"
|
||||
|
||||
def test_idn_query(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test *IDN? returns identification string."""
|
||||
response = chamber.process("*IDN?")
|
||||
|
||||
assert "PyDVTATE" in response
|
||||
assert "TC-SIM-001" in response
|
||||
|
||||
def test_rst_command(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test *RST resets to defaults."""
|
||||
# Set non-default value
|
||||
chamber.process("TEMP:SETPOINT 85.0")
|
||||
assert chamber.process("TEMP:SETPOINT?") == "85.00"
|
||||
|
||||
# Reset
|
||||
response = chamber.process("*RST")
|
||||
assert response == ""
|
||||
assert chamber.process("TEMP:SETPOINT?") == "25.00"
|
||||
|
||||
def test_opc_query(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test *OPC? returns 1."""
|
||||
response = chamber.process("*OPC?")
|
||||
assert response == "1"
|
||||
|
||||
def test_unknown_command(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test unknown command returns error."""
|
||||
response = chamber.process("INVALID:CMD")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "Unknown command" in response
|
||||
|
||||
|
||||
class TestThermalChamberSetpoint:
|
||||
"""Tests for TEMP:SETPOINT command."""
|
||||
|
||||
@pytest.fixture
|
||||
def chamber(self) -> ThermalChamberSim:
|
||||
"""Create chamber instance without physics engine."""
|
||||
return ThermalChamberSim()
|
||||
|
||||
def test_setpoint_query_default(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test TEMP:SETPOINT? returns default value."""
|
||||
response = chamber.process("TEMP:SETPOINT?")
|
||||
|
||||
assert response == "25.00"
|
||||
|
||||
def test_setpoint_set(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test TEMP:SETPOINT sets value."""
|
||||
response = chamber.process("TEMP:SETPOINT 85.0")
|
||||
|
||||
assert response == ""
|
||||
assert chamber.process("TEMP:SETPOINT?") == "85.00"
|
||||
|
||||
def test_setpoint_set_negative(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test TEMP:SETPOINT accepts negative values."""
|
||||
chamber.process("TEMP:SETPOINT -40.0")
|
||||
|
||||
assert chamber.process("TEMP:SETPOINT?") == "-40.00"
|
||||
|
||||
def test_setpoint_set_invalid_value(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test TEMP:SETPOINT with invalid value returns error."""
|
||||
response = chamber.process("TEMP:SETPOINT abc")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "Invalid temperature" in response
|
||||
|
||||
def test_setpoint_set_no_argument(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test TEMP:SETPOINT without argument returns error."""
|
||||
response = chamber.process("TEMP:SETPOINT")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "requires a value" in response
|
||||
|
||||
|
||||
class TestThermalChamberActual:
|
||||
"""Tests for TEMP:ACTUAL? query."""
|
||||
|
||||
@pytest.fixture
|
||||
def chamber(self) -> ThermalChamberSim:
|
||||
"""Create chamber instance without physics engine."""
|
||||
return ThermalChamberSim()
|
||||
|
||||
def test_actual_query_without_engine(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test TEMP:ACTUAL? returns setpoint when no physics engine."""
|
||||
chamber.process("TEMP:SETPOINT 50.0")
|
||||
response = chamber.process("TEMP:ACTUAL?")
|
||||
|
||||
# Without physics engine, returns setpoint
|
||||
assert response == "50.00"
|
||||
|
||||
def test_actual_as_command_fails(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test TEMP:ACTUAL (without ?) returns error."""
|
||||
response = chamber.process("TEMP:ACTUAL 25.0")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "query only" in response
|
||||
|
||||
|
||||
class TestThermalChamberStability:
|
||||
"""Tests for TEMP:STAB? query."""
|
||||
|
||||
@pytest.fixture
|
||||
def chamber(self) -> ThermalChamberSim:
|
||||
"""Create chamber instance without physics engine."""
|
||||
return ThermalChamberSim()
|
||||
|
||||
def test_stab_query_without_engine(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test TEMP:STAB? returns 1 when no physics engine."""
|
||||
response = chamber.process("TEMP:STAB?")
|
||||
|
||||
# Without physics engine, assume stable
|
||||
assert response == "1"
|
||||
|
||||
def test_stab_as_command_fails(self, chamber: ThermalChamberSim) -> None:
|
||||
"""Test TEMP:STAB (without ?) returns error."""
|
||||
response = chamber.process("TEMP:STAB 1")
|
||||
|
||||
assert response.startswith("ERROR:")
|
||||
assert "query only" in response
|
||||
|
||||
|
||||
class TestThermalChamberWithPhysicsEngine:
|
||||
"""Tests for ThermalChamberSim with physics engine integration."""
|
||||
|
||||
@pytest.fixture
|
||||
def engine(self) -> PhysicsEngine:
|
||||
"""Create physics engine instance."""
|
||||
return PhysicsEngine(update_rate_hz=100.0)
|
||||
|
||||
@pytest.fixture
|
||||
def chamber(self, engine: PhysicsEngine) -> ThermalChamberSim:
|
||||
"""Create chamber instance with physics engine."""
|
||||
return ThermalChamberSim(physics_engine=engine)
|
||||
|
||||
def test_setpoint_updates_engine(
|
||||
self, chamber: ThermalChamberSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test TEMP:SETPOINT updates physics engine."""
|
||||
chamber.process("TEMP:SETPOINT 85.0")
|
||||
|
||||
# Step the engine and check thermal state
|
||||
thermal = engine.get_thermal_state()
|
||||
# Initial chamber temp is 25, will start moving towards 85
|
||||
assert thermal.chamber_temperature == pytest.approx(25.0, abs=0.1)
|
||||
|
||||
# After many steps, should approach setpoint
|
||||
for _ in range(10000): # 100 seconds at 100Hz
|
||||
engine.step()
|
||||
|
||||
thermal = engine.get_thermal_state()
|
||||
# Should be closer to setpoint (but not quite there due to time constant)
|
||||
assert thermal.chamber_temperature > 80.0
|
||||
|
||||
def test_actual_query_returns_engine_temperature(
|
||||
self, chamber: ThermalChamberSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test TEMP:ACTUAL? returns physics engine temperature."""
|
||||
response = chamber.process("TEMP:ACTUAL?")
|
||||
|
||||
# Should match initial chamber temperature
|
||||
assert response == "25.00"
|
||||
|
||||
def test_stability_when_at_setpoint(
|
||||
self, chamber: ThermalChamberSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test TEMP:STAB? returns 1 when at setpoint."""
|
||||
# Default setpoint is 25, engine starts at 25
|
||||
response = chamber.process("TEMP:STAB?")
|
||||
|
||||
assert response == "1"
|
||||
|
||||
def test_stability_when_settling(
|
||||
self, chamber: ThermalChamberSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test TEMP:STAB? returns 0 when settling."""
|
||||
# Set new setpoint far from current temperature
|
||||
chamber.process("TEMP:SETPOINT 85.0")
|
||||
|
||||
# Step once to ensure engine updates
|
||||
engine.step()
|
||||
|
||||
# Should not be stable yet
|
||||
response = chamber.process("TEMP:STAB?")
|
||||
assert response == "0"
|
||||
|
||||
def test_reset_updates_engine(
|
||||
self, chamber: ThermalChamberSim, engine: PhysicsEngine
|
||||
) -> None:
|
||||
"""Test *RST resets both chamber and engine setpoint."""
|
||||
chamber.process("TEMP:SETPOINT 85.0")
|
||||
chamber.process("*RST")
|
||||
|
||||
# Check setpoint is back to default
|
||||
assert chamber.process("TEMP:SETPOINT?") == "25.00"
|
||||
Reference in New Issue
Block a user