Add TCP server integration tests

Test connection handling, multiple clients, instrument access across
all three virtual instruments, physics engine integration, and error
handling. Update pytest-asyncio config for v1.x compatibility.
This commit is contained in:
2025-05-30 22:59:33 +00:00
parent 2d358062f4
commit a742d57a6f
4 changed files with 290 additions and 1 deletions

View File

@@ -86,5 +86,8 @@ ignore_missing_imports = true
[tool.pytest.ini_options] [tool.pytest.ini_options]
testpaths = ["tests"] testpaths = ["tests"]
asyncio_mode = "auto"
addopts = "-v --tb=short" addopts = "-v --tb=short"
[tool.pytest-asyncio]
mode = "auto"
default_fixture_loop_scope = "function"

View File

@@ -0,0 +1 @@
"""Integration tests for py_dvt_ate."""

View File

@@ -0,0 +1,11 @@
"""Configuration for integration tests."""
import pytest
# Configure pytest-asyncio mode
pytest_plugins = ("pytest_asyncio",)
def pytest_configure(config: pytest.Config) -> None:
"""Configure pytest for async tests."""
config.addinivalue_line("markers", "asyncio: mark test as async")

View File

@@ -0,0 +1,274 @@
"""Integration tests for TCP server.
Tests the InstrumentServer and SimulationServer with actual TCP connections.
"""
from __future__ import annotations
import asyncio
import pytest
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
from py_dvt_ate.simulation.tcp_server import InstrumentServer
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
@pytest.mark.asyncio(loop_scope="function")
class TestInstrumentServer:
"""Tests for InstrumentServer TCP functionality."""
@pytest.fixture
def physics_engine(self) -> PhysicsEngine:
"""Create a physics engine for testing."""
return PhysicsEngine(update_rate_hz=100.0)
@pytest.fixture
def server(self, physics_engine: PhysicsEngine) -> InstrumentServer:
"""Create an instrument server with a thermal chamber."""
server = InstrumentServer(host="127.0.0.1")
chamber = ThermalChamberSim(physics_engine)
server.register_instrument(15000, chamber)
return server
async def test_server_start_stop(self, server: InstrumentServer) -> None:
"""Test server can start and stop."""
assert not server.is_running
await server.start()
assert server.is_running
await server.stop()
assert not server.is_running
async def test_client_connection(self, server: InstrumentServer) -> None:
"""Test client can connect and send command."""
await server.start()
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
# Send *IDN? query
writer.write(b"*IDN?\n")
await writer.drain()
# Read response
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
assert b"PyDVTATE" in response
assert b"TC-SIM-001" in response
writer.close()
await writer.wait_closed()
finally:
await server.stop()
async def test_multiple_commands(self, server: InstrumentServer) -> None:
"""Test sending multiple commands in sequence."""
await server.start()
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
# Set temperature setpoint
writer.write(b"TEMP:SETPOINT 85.0\n")
await writer.drain()
# Query setpoint
writer.write(b"TEMP:SETPOINT?\n")
await writer.drain()
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
assert b"85.00" in response
# Query actual temperature
writer.write(b"TEMP:ACTUAL?\n")
await writer.drain()
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
# Should return a valid float
temp = float(response.decode().strip())
assert -50 <= temp <= 200
writer.close()
await writer.wait_closed()
finally:
await server.stop()
async def test_concurrent_connections(
self, physics_engine: PhysicsEngine
) -> None:
"""Test multiple concurrent client connections."""
server = InstrumentServer(host="127.0.0.1")
chamber = ThermalChamberSim(physics_engine)
server.register_instrument(15001, chamber)
await server.start()
try:
# Connect two clients simultaneously
reader1, writer1 = await asyncio.open_connection("127.0.0.1", 15001)
reader2, writer2 = await asyncio.open_connection("127.0.0.1", 15001)
# Send command from client 1
writer1.write(b"*IDN?\n")
await writer1.drain()
response1 = await asyncio.wait_for(reader1.readline(), timeout=2.0)
# Send command from client 2
writer2.write(b"*IDN?\n")
await writer2.drain()
response2 = await asyncio.wait_for(reader2.readline(), timeout=2.0)
# Both should get valid responses
assert b"TC-SIM-001" in response1
assert b"TC-SIM-001" in response2
writer1.close()
writer2.close()
await writer1.wait_closed()
await writer2.wait_closed()
finally:
await server.stop()
@pytest.mark.asyncio(loop_scope="function")
class TestSimulationServer:
"""Tests for complete SimulationServer."""
async def test_simulation_server_start_stop(self) -> None:
"""Test simulation server lifecycle."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16000,
psu_port=16001,
dmm_port=16002,
physics_rate_hz=100.0,
)
server = SimulationServer(config)
assert not server.is_running
await server.start()
assert server.is_running
assert server.physics_engine is not None
await server.stop()
assert not server.is_running
async def test_all_instruments_accessible(self) -> None:
"""Test all three instruments are accessible over TCP."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16100,
psu_port=16101,
dmm_port=16102,
)
server = SimulationServer(config)
await server.start()
try:
# Test thermal chamber
r, w = await asyncio.open_connection("127.0.0.1", 16100)
w.write(b"*IDN?\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"TC-SIM-001" in resp
w.close()
await w.wait_closed()
# Test power supply
r, w = await asyncio.open_connection("127.0.0.1", 16101)
w.write(b"*IDN?\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"PS-SIM-001" in resp
w.close()
await w.wait_closed()
# Test multimeter
r, w = await asyncio.open_connection("127.0.0.1", 16102)
w.write(b"*IDN?\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"DMM-SIM-001" in resp
w.close()
await w.wait_closed()
finally:
await server.stop()
async def test_physics_engine_integration(self) -> None:
"""Test instruments share physics engine state."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16200,
psu_port=16201,
dmm_port=16202,
)
server = SimulationServer(config)
await server.start()
try:
# Connect to power supply and enable output
psu_r, psu_w = await asyncio.open_connection("127.0.0.1", 16201)
psu_w.write(b"VOLT 5.0\n")
await psu_w.drain()
psu_w.write(b"OUTP ON\n")
await psu_w.drain()
# Run a few physics steps
await asyncio.sleep(0.1)
# Query voltage from power supply
psu_w.write(b"MEAS:VOLT?\n")
await psu_w.drain()
psu_resp = await asyncio.wait_for(psu_r.readline(), timeout=2.0)
psu_voltage = float(psu_resp.decode().strip())
# Connect to DMM and measure DUT output
dmm_r, dmm_w = await asyncio.open_connection("127.0.0.1", 16202)
dmm_w.write(b"MEAS:VOLT:DC?\n")
await dmm_w.drain()
dmm_resp = await asyncio.wait_for(dmm_r.readline(), timeout=2.0)
dmm_voltage = float(dmm_resp.decode().strip())
# PSU should show input voltage (5V)
assert 4.9 <= psu_voltage <= 5.1
# DMM should show DUT output voltage (LDO regulated ~3.3V)
assert 3.0 <= dmm_voltage <= 3.5
psu_w.close()
dmm_w.close()
await psu_w.wait_closed()
await dmm_w.wait_closed()
finally:
await server.stop()
async def test_error_handling(self) -> None:
"""Test invalid commands return errors."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16300,
psu_port=16301,
dmm_port=16302,
)
server = SimulationServer(config)
await server.start()
try:
r, w = await asyncio.open_connection("127.0.0.1", 16300)
# Send invalid command
w.write(b"INVALID:COMMAND\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"ERROR" in resp
w.close()
await w.wait_closed()
finally:
await server.stop()