- Speed up simulation 10x for integration tests (reduces test time from 10+ minutes to ~2.5 minutes) - Add small delays between TCP commands in tests for reliability - Fix Gitea CI release job (use gitea.ref instead of github.ref) - Revert write() error-check that was consuming query responses 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
276 lines
8.9 KiB
Python
276 lines
8.9 KiB
Python
"""Integration tests for TCP server.
|
|
|
|
Tests the InstrumentServer and SimulationServer with actual TCP connections.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
|
|
from py_dvt_ate.instruments.transport import InstrumentServer
|
|
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
|
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
|
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
|
|
|
|
|
|
@pytest.mark.asyncio(loop_scope="function")
class TestInstrumentServer:
    """Tests for InstrumentServer TCP functionality.

    Every test talks to the server over a real loopback TCP connection.
    Client sockets are closed and the server is stopped in ``finally``
    blocks so that a failing assertion or a read timeout does not leave
    sockets or a running server behind for the following tests.
    """

    @staticmethod
    async def _query(
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        command: bytes,
    ) -> bytes:
        """Send ``command`` and return the next line read from the server.

        Args:
            reader: Stream to read the response line from.
            writer: Stream to write the command to.
            command: Raw command bytes, including the trailing newline.

        Returns:
            The raw response line as returned by ``reader.readline()``.

        Raises:
            asyncio.TimeoutError: If no line arrives within 2 seconds.
        """
        writer.write(command)
        await writer.drain()
        return await asyncio.wait_for(reader.readline(), timeout=2.0)

    @staticmethod
    async def _close(*writers: asyncio.StreamWriter) -> None:
        """Close all ``writers``, then wait for each one to finish closing."""
        for writer in writers:
            writer.close()
        for writer in writers:
            await writer.wait_closed()

    @pytest.fixture
    def physics_engine(self) -> PhysicsEngine:
        """Create a physics engine for testing."""
        return PhysicsEngine(update_rate_hz=100.0)

    @pytest.fixture
    def server(self, physics_engine: PhysicsEngine) -> InstrumentServer:
        """Create an instrument server with a thermal chamber on port 15000."""
        server = InstrumentServer(host="127.0.0.1")
        chamber = ThermalChamberSim(physics_engine)
        server.register_instrument(15000, chamber)
        return server

    async def test_server_start_stop(self, server: InstrumentServer) -> None:
        """Test server can start and stop."""
        assert not server.is_running

        await server.start()
        try:
            assert server.is_running
        finally:
            # Stop even when the check above fails, so a broken start() does
            # not leave the server running.
            await server.stop()

        assert not server.is_running

    async def test_client_connection(self, server: InstrumentServer) -> None:
        """Test client can connect and send command."""
        await server.start()
        try:
            reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
            try:
                # Send *IDN? query and read the response
                response = await self._query(reader, writer, b"*IDN?\n")
                assert b"PyDVTATE" in response
                assert b"TC-SIM-001" in response
            finally:
                await self._close(writer)
        finally:
            await server.stop()

    async def test_multiple_commands(self, server: InstrumentServer) -> None:
        """Test sending multiple commands in sequence."""
        await server.start()
        try:
            reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
            try:
                # Set temperature setpoint
                writer.write(b"TEMP:SETPOINT 85.0\n")
                await writer.drain()
                # Small delay to ensure server processes command before next one
                await asyncio.sleep(0.01)

                # Query setpoint
                response = await self._query(reader, writer, b"TEMP:SETPOINT?\n")
                assert b"85.00" in response

                # Query actual temperature
                response = await self._query(reader, writer, b"TEMP:ACTUAL?\n")
                # Should return a valid float
                temp = float(response.decode().strip())
                assert -50 <= temp <= 200
            finally:
                await self._close(writer)
        finally:
            await server.stop()

    async def test_concurrent_connections(
        self, physics_engine: PhysicsEngine
    ) -> None:
        """Test multiple concurrent client connections."""
        server = InstrumentServer(host="127.0.0.1")
        chamber = ThermalChamberSim(physics_engine)
        server.register_instrument(15001, chamber)

        await server.start()
        try:
            # Track writers as they are opened so that a failure on the second
            # connection still closes the first one.
            writers: list[asyncio.StreamWriter] = []
            try:
                # Connect two clients simultaneously
                reader1, writer1 = await asyncio.open_connection(
                    "127.0.0.1", 15001
                )
                writers.append(writer1)
                reader2, writer2 = await asyncio.open_connection(
                    "127.0.0.1", 15001
                )
                writers.append(writer2)

                # Send command from client 1
                response1 = await self._query(reader1, writer1, b"*IDN?\n")

                # Send command from client 2
                response2 = await self._query(reader2, writer2, b"*IDN?\n")

                # Both should get valid responses
                assert b"TC-SIM-001" in response1
                assert b"TC-SIM-001" in response2
            finally:
                await self._close(*writers)
        finally:
            await server.stop()
|
|
|
|
|
|
@pytest.mark.asyncio(loop_scope="function")
class TestSimulationServer:
    """Tests for complete SimulationServer.

    Each test builds its own ``ServerConfig`` with a distinct set of loopback
    ports. Client sockets are closed and the server is stopped in ``finally``
    blocks so that a failing assertion or a read timeout does not leave
    sockets or a running server behind for the following tests.
    """

    @staticmethod
    async def _query(
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        command: bytes,
    ) -> bytes:
        """Send ``command`` and return the next line read from the server.

        Args:
            reader: Stream to read the response line from.
            writer: Stream to write the command to.
            command: Raw command bytes, including the trailing newline.

        Returns:
            The raw response line as returned by ``reader.readline()``.

        Raises:
            asyncio.TimeoutError: If no line arrives within 2 seconds.
        """
        writer.write(command)
        await writer.drain()
        return await asyncio.wait_for(reader.readline(), timeout=2.0)

    @staticmethod
    async def _close(*writers: asyncio.StreamWriter) -> None:
        """Close all ``writers``, then wait for each one to finish closing."""
        for writer in writers:
            writer.close()
        for writer in writers:
            await writer.wait_closed()

    async def test_simulation_server_start_stop(self) -> None:
        """Test simulation server lifecycle."""
        config = ServerConfig(
            host="127.0.0.1",
            chamber_port=16000,
            psu_port=16001,
            dmm_port=16002,
            physics_rate_hz=100.0,
        )
        server = SimulationServer(config)

        assert not server.is_running

        await server.start()
        try:
            assert server.is_running
            assert server.physics_engine is not None
        finally:
            # Stop even when a check above fails, so a broken start() does
            # not leave the server running.
            await server.stop()

        assert not server.is_running

    async def test_all_instruments_accessible(self) -> None:
        """Test all three instruments are accessible over TCP."""
        config = ServerConfig(
            host="127.0.0.1",
            chamber_port=16100,
            psu_port=16101,
            dmm_port=16102,
        )
        server = SimulationServer(config)
        await server.start()

        # (port, model string expected in the *IDN? reply), checked in order:
        # thermal chamber, power supply, multimeter.
        expected_models = (
            (16100, b"TC-SIM-001"),
            (16101, b"PS-SIM-001"),
            (16102, b"DMM-SIM-001"),
        )

        try:
            for port, model in expected_models:
                r, w = await asyncio.open_connection("127.0.0.1", port)
                try:
                    resp = await self._query(r, w, b"*IDN?\n")
                    assert model in resp
                finally:
                    await self._close(w)
        finally:
            await server.stop()

    async def test_physics_engine_integration(self) -> None:
        """Test instruments share physics engine state."""
        config = ServerConfig(
            host="127.0.0.1",
            chamber_port=16200,
            psu_port=16201,
            dmm_port=16202,
        )
        server = SimulationServer(config)
        await server.start()

        try:
            # Track writers as they are opened so that a failure before or on
            # the DMM connection still closes the PSU connection.
            writers: list[asyncio.StreamWriter] = []
            try:
                # Connect to power supply and enable output
                psu_r, psu_w = await asyncio.open_connection("127.0.0.1", 16201)
                writers.append(psu_w)
                psu_w.write(b"VOLT 5.0\n")
                await psu_w.drain()
                await asyncio.sleep(0.01)  # Allow server to process
                psu_w.write(b"OUTP ON\n")
                await psu_w.drain()

                # Run a few physics steps
                await asyncio.sleep(0.1)

                # Query voltage from power supply
                psu_resp = await self._query(psu_r, psu_w, b"MEAS:VOLT?\n")
                psu_voltage = float(psu_resp.decode().strip())

                # Connect to DMM and measure DUT output
                dmm_r, dmm_w = await asyncio.open_connection("127.0.0.1", 16202)
                writers.append(dmm_w)
                dmm_resp = await self._query(dmm_r, dmm_w, b"MEAS:VOLT:DC?\n")
                dmm_voltage = float(dmm_resp.decode().strip())

                # PSU should show input voltage (5V)
                assert 4.9 <= psu_voltage <= 5.1

                # DMM should show DUT output voltage (LDO regulated ~3.3V)
                assert 3.0 <= dmm_voltage <= 3.5
            finally:
                await self._close(*writers)
        finally:
            await server.stop()

    async def test_error_handling(self) -> None:
        """Test invalid commands return errors."""
        config = ServerConfig(
            host="127.0.0.1",
            chamber_port=16300,
            psu_port=16301,
            dmm_port=16302,
        )
        server = SimulationServer(config)
        await server.start()

        try:
            r, w = await asyncio.open_connection("127.0.0.1", 16300)
            try:
                # Send invalid command
                resp = await self._query(r, w, b"INVALID:COMMAND\n")
                assert b"ERROR" in resp
            finally:
                await self._close(w)
        finally:
            await server.stop()
|