Add TCP server integration tests
Test connection handling, multiple clients, instrument access across all three virtual instruments, physics engine integration, and error handling. Update pytest-asyncio config for v1.x compatibility.
This commit is contained in:
@@ -86,5 +86,8 @@ ignore_missing_imports = true
|
|||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
testpaths = ["tests"]
|
testpaths = ["tests"]
|
||||||
asyncio_mode = "auto"
|
|
||||||
addopts = "-v --tb=short"
|
addopts = "-v --tb=short"
|
||||||
|
|
||||||
|
[tool.pytest-asyncio]
|
||||||
|
mode = "auto"
|
||||||
|
default_fixture_loop_scope = "function"
|
||||||
|
|||||||
1
tests/integration/__init__.py
Normal file
1
tests/integration/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
"""Integration tests for py_dvt_ate."""
|
||||||
11
tests/integration/conftest.py
Normal file
11
tests/integration/conftest.py
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
"""Configuration for integration tests."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Configure pytest-asyncio mode
|
||||||
|
pytest_plugins = ("pytest_asyncio",)
|
||||||
|
|
||||||
|
|
||||||
|
def pytest_configure(config: pytest.Config) -> None:
|
||||||
|
"""Configure pytest for async tests."""
|
||||||
|
config.addinivalue_line("markers", "asyncio: mark test as async")
|
||||||
274
tests/integration/test_tcp_server.py
Normal file
274
tests/integration/test_tcp_server.py
Normal file
@@ -0,0 +1,274 @@
|
|||||||
|
"""Integration tests for TCP server.
|
||||||
|
|
||||||
|
Tests the InstrumentServer and SimulationServer with actual TCP connections.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
|
||||||
|
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
||||||
|
from py_dvt_ate.simulation.tcp_server import InstrumentServer
|
||||||
|
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
|
||||||
|
from py_dvt_ate.simulation.virtual.multimeter import MultimeterSim
|
||||||
|
from py_dvt_ate.simulation.virtual.power_supply import PowerSupplySim
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio(loop_scope="function")
|
||||||
|
class TestInstrumentServer:
|
||||||
|
"""Tests for InstrumentServer TCP functionality."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def physics_engine(self) -> PhysicsEngine:
|
||||||
|
"""Create a physics engine for testing."""
|
||||||
|
return PhysicsEngine(update_rate_hz=100.0)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def server(self, physics_engine: PhysicsEngine) -> InstrumentServer:
|
||||||
|
"""Create an instrument server with a thermal chamber."""
|
||||||
|
server = InstrumentServer(host="127.0.0.1")
|
||||||
|
chamber = ThermalChamberSim(physics_engine)
|
||||||
|
server.register_instrument(15000, chamber)
|
||||||
|
return server
|
||||||
|
|
||||||
|
async def test_server_start_stop(self, server: InstrumentServer) -> None:
|
||||||
|
"""Test server can start and stop."""
|
||||||
|
assert not server.is_running
|
||||||
|
|
||||||
|
await server.start()
|
||||||
|
assert server.is_running
|
||||||
|
|
||||||
|
await server.stop()
|
||||||
|
assert not server.is_running
|
||||||
|
|
||||||
|
async def test_client_connection(self, server: InstrumentServer) -> None:
|
||||||
|
"""Test client can connect and send command."""
|
||||||
|
await server.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
|
||||||
|
|
||||||
|
# Send *IDN? query
|
||||||
|
writer.write(b"*IDN?\n")
|
||||||
|
await writer.drain()
|
||||||
|
|
||||||
|
# Read response
|
||||||
|
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
|
||||||
|
assert b"PyDVTATE" in response
|
||||||
|
assert b"TC-SIM-001" in response
|
||||||
|
|
||||||
|
writer.close()
|
||||||
|
await writer.wait_closed()
|
||||||
|
finally:
|
||||||
|
await server.stop()
|
||||||
|
|
||||||
|
async def test_multiple_commands(self, server: InstrumentServer) -> None:
|
||||||
|
"""Test sending multiple commands in sequence."""
|
||||||
|
await server.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
|
||||||
|
|
||||||
|
# Set temperature setpoint
|
||||||
|
writer.write(b"TEMP:SETPOINT 85.0\n")
|
||||||
|
await writer.drain()
|
||||||
|
|
||||||
|
# Query setpoint
|
||||||
|
writer.write(b"TEMP:SETPOINT?\n")
|
||||||
|
await writer.drain()
|
||||||
|
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
|
||||||
|
assert b"85.00" in response
|
||||||
|
|
||||||
|
# Query actual temperature
|
||||||
|
writer.write(b"TEMP:ACTUAL?\n")
|
||||||
|
await writer.drain()
|
||||||
|
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
|
||||||
|
# Should return a valid float
|
||||||
|
temp = float(response.decode().strip())
|
||||||
|
assert -50 <= temp <= 200
|
||||||
|
|
||||||
|
writer.close()
|
||||||
|
await writer.wait_closed()
|
||||||
|
finally:
|
||||||
|
await server.stop()
|
||||||
|
|
||||||
|
async def test_concurrent_connections(
|
||||||
|
self, physics_engine: PhysicsEngine
|
||||||
|
) -> None:
|
||||||
|
"""Test multiple concurrent client connections."""
|
||||||
|
server = InstrumentServer(host="127.0.0.1")
|
||||||
|
chamber = ThermalChamberSim(physics_engine)
|
||||||
|
server.register_instrument(15001, chamber)
|
||||||
|
|
||||||
|
await server.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Connect two clients simultaneously
|
||||||
|
reader1, writer1 = await asyncio.open_connection("127.0.0.1", 15001)
|
||||||
|
reader2, writer2 = await asyncio.open_connection("127.0.0.1", 15001)
|
||||||
|
|
||||||
|
# Send command from client 1
|
||||||
|
writer1.write(b"*IDN?\n")
|
||||||
|
await writer1.drain()
|
||||||
|
response1 = await asyncio.wait_for(reader1.readline(), timeout=2.0)
|
||||||
|
|
||||||
|
# Send command from client 2
|
||||||
|
writer2.write(b"*IDN?\n")
|
||||||
|
await writer2.drain()
|
||||||
|
response2 = await asyncio.wait_for(reader2.readline(), timeout=2.0)
|
||||||
|
|
||||||
|
# Both should get valid responses
|
||||||
|
assert b"TC-SIM-001" in response1
|
||||||
|
assert b"TC-SIM-001" in response2
|
||||||
|
|
||||||
|
writer1.close()
|
||||||
|
writer2.close()
|
||||||
|
await writer1.wait_closed()
|
||||||
|
await writer2.wait_closed()
|
||||||
|
finally:
|
||||||
|
await server.stop()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio(loop_scope="function")
|
||||||
|
class TestSimulationServer:
|
||||||
|
"""Tests for complete SimulationServer."""
|
||||||
|
|
||||||
|
async def test_simulation_server_start_stop(self) -> None:
|
||||||
|
"""Test simulation server lifecycle."""
|
||||||
|
config = ServerConfig(
|
||||||
|
host="127.0.0.1",
|
||||||
|
chamber_port=16000,
|
||||||
|
psu_port=16001,
|
||||||
|
dmm_port=16002,
|
||||||
|
physics_rate_hz=100.0,
|
||||||
|
)
|
||||||
|
server = SimulationServer(config)
|
||||||
|
|
||||||
|
assert not server.is_running
|
||||||
|
|
||||||
|
await server.start()
|
||||||
|
assert server.is_running
|
||||||
|
assert server.physics_engine is not None
|
||||||
|
|
||||||
|
await server.stop()
|
||||||
|
assert not server.is_running
|
||||||
|
|
||||||
|
async def test_all_instruments_accessible(self) -> None:
|
||||||
|
"""Test all three instruments are accessible over TCP."""
|
||||||
|
config = ServerConfig(
|
||||||
|
host="127.0.0.1",
|
||||||
|
chamber_port=16100,
|
||||||
|
psu_port=16101,
|
||||||
|
dmm_port=16102,
|
||||||
|
)
|
||||||
|
server = SimulationServer(config)
|
||||||
|
await server.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Test thermal chamber
|
||||||
|
r, w = await asyncio.open_connection("127.0.0.1", 16100)
|
||||||
|
w.write(b"*IDN?\n")
|
||||||
|
await w.drain()
|
||||||
|
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
|
||||||
|
assert b"TC-SIM-001" in resp
|
||||||
|
w.close()
|
||||||
|
await w.wait_closed()
|
||||||
|
|
||||||
|
# Test power supply
|
||||||
|
r, w = await asyncio.open_connection("127.0.0.1", 16101)
|
||||||
|
w.write(b"*IDN?\n")
|
||||||
|
await w.drain()
|
||||||
|
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
|
||||||
|
assert b"PS-SIM-001" in resp
|
||||||
|
w.close()
|
||||||
|
await w.wait_closed()
|
||||||
|
|
||||||
|
# Test multimeter
|
||||||
|
r, w = await asyncio.open_connection("127.0.0.1", 16102)
|
||||||
|
w.write(b"*IDN?\n")
|
||||||
|
await w.drain()
|
||||||
|
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
|
||||||
|
assert b"DMM-SIM-001" in resp
|
||||||
|
w.close()
|
||||||
|
await w.wait_closed()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
await server.stop()
|
||||||
|
|
||||||
|
async def test_physics_engine_integration(self) -> None:
|
||||||
|
"""Test instruments share physics engine state."""
|
||||||
|
config = ServerConfig(
|
||||||
|
host="127.0.0.1",
|
||||||
|
chamber_port=16200,
|
||||||
|
psu_port=16201,
|
||||||
|
dmm_port=16202,
|
||||||
|
)
|
||||||
|
server = SimulationServer(config)
|
||||||
|
await server.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Connect to power supply and enable output
|
||||||
|
psu_r, psu_w = await asyncio.open_connection("127.0.0.1", 16201)
|
||||||
|
psu_w.write(b"VOLT 5.0\n")
|
||||||
|
await psu_w.drain()
|
||||||
|
psu_w.write(b"OUTP ON\n")
|
||||||
|
await psu_w.drain()
|
||||||
|
|
||||||
|
# Run a few physics steps
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
# Query voltage from power supply
|
||||||
|
psu_w.write(b"MEAS:VOLT?\n")
|
||||||
|
await psu_w.drain()
|
||||||
|
psu_resp = await asyncio.wait_for(psu_r.readline(), timeout=2.0)
|
||||||
|
psu_voltage = float(psu_resp.decode().strip())
|
||||||
|
|
||||||
|
# Connect to DMM and measure DUT output
|
||||||
|
dmm_r, dmm_w = await asyncio.open_connection("127.0.0.1", 16202)
|
||||||
|
dmm_w.write(b"MEAS:VOLT:DC?\n")
|
||||||
|
await dmm_w.drain()
|
||||||
|
dmm_resp = await asyncio.wait_for(dmm_r.readline(), timeout=2.0)
|
||||||
|
dmm_voltage = float(dmm_resp.decode().strip())
|
||||||
|
|
||||||
|
# PSU should show input voltage (5V)
|
||||||
|
assert 4.9 <= psu_voltage <= 5.1
|
||||||
|
|
||||||
|
# DMM should show DUT output voltage (LDO regulated ~3.3V)
|
||||||
|
assert 3.0 <= dmm_voltage <= 3.5
|
||||||
|
|
||||||
|
psu_w.close()
|
||||||
|
dmm_w.close()
|
||||||
|
await psu_w.wait_closed()
|
||||||
|
await dmm_w.wait_closed()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
await server.stop()
|
||||||
|
|
||||||
|
async def test_error_handling(self) -> None:
|
||||||
|
"""Test invalid commands return errors."""
|
||||||
|
config = ServerConfig(
|
||||||
|
host="127.0.0.1",
|
||||||
|
chamber_port=16300,
|
||||||
|
psu_port=16301,
|
||||||
|
dmm_port=16302,
|
||||||
|
)
|
||||||
|
server = SimulationServer(config)
|
||||||
|
await server.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
r, w = await asyncio.open_connection("127.0.0.1", 16300)
|
||||||
|
|
||||||
|
# Send invalid command
|
||||||
|
w.write(b"INVALID:COMMAND\n")
|
||||||
|
await w.drain()
|
||||||
|
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
|
||||||
|
assert b"ERROR" in resp
|
||||||
|
|
||||||
|
w.close()
|
||||||
|
await w.wait_closed()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
await server.stop()
|
||||||
Reference in New Issue
Block a user