Files
py-dvt-ate/tests/integration/test_tcp_server.py
Kai Chappell ba2ab9d5d8
Some checks failed
CI / Release (push) Has been cancelled
CI / Lint (push) Successful in 4s
CI / Type Check (push) Successful in 19s
CI / Test (push) Successful in 37s
Release v0.1.0
2025-12-04 13:18:17 +00:00

276 lines
8.9 KiB
Python

"""Integration tests for TCP server.
Tests the InstrumentServer and SimulationServer with actual TCP connections.
"""
from __future__ import annotations
import asyncio
import pytest
from py_dvt_ate.instruments.transport import InstrumentServer
from py_dvt_ate.simulation.physics.engine import PhysicsEngine
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
from py_dvt_ate.simulation.virtual.chamber import ThermalChamberSim
@pytest.mark.asyncio(loop_scope="function")
class TestInstrumentServer:
"""Tests for InstrumentServer TCP functionality."""
@pytest.fixture
def physics_engine(self) -> PhysicsEngine:
"""Create a physics engine for testing."""
return PhysicsEngine(update_rate_hz=100.0)
@pytest.fixture
def server(self, physics_engine: PhysicsEngine) -> InstrumentServer:
"""Create an instrument server with a thermal chamber."""
server = InstrumentServer(host="127.0.0.1")
chamber = ThermalChamberSim(physics_engine)
server.register_instrument(15000, chamber)
return server
async def test_server_start_stop(self, server: InstrumentServer) -> None:
"""Test server can start and stop."""
assert not server.is_running
await server.start()
assert server.is_running
await server.stop()
assert not server.is_running
async def test_client_connection(self, server: InstrumentServer) -> None:
"""Test client can connect and send command."""
await server.start()
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
# Send *IDN? query
writer.write(b"*IDN?\n")
await writer.drain()
# Read response
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
assert b"PyDVTATE" in response
assert b"TC-SIM-001" in response
writer.close()
await writer.wait_closed()
finally:
await server.stop()
async def test_multiple_commands(self, server: InstrumentServer) -> None:
"""Test sending multiple commands in sequence."""
await server.start()
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 15000)
# Set temperature setpoint
writer.write(b"TEMP:SETPOINT 85.0\n")
await writer.drain()
# Small delay to ensure server processes command before next one
await asyncio.sleep(0.01)
# Query setpoint
writer.write(b"TEMP:SETPOINT?\n")
await writer.drain()
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
assert b"85.00" in response
# Query actual temperature
writer.write(b"TEMP:ACTUAL?\n")
await writer.drain()
response = await asyncio.wait_for(reader.readline(), timeout=2.0)
# Should return a valid float
temp = float(response.decode().strip())
assert -50 <= temp <= 200
writer.close()
await writer.wait_closed()
finally:
await server.stop()
async def test_concurrent_connections(
self, physics_engine: PhysicsEngine
) -> None:
"""Test multiple concurrent client connections."""
server = InstrumentServer(host="127.0.0.1")
chamber = ThermalChamberSim(physics_engine)
server.register_instrument(15001, chamber)
await server.start()
try:
# Connect two clients simultaneously
reader1, writer1 = await asyncio.open_connection("127.0.0.1", 15001)
reader2, writer2 = await asyncio.open_connection("127.0.0.1", 15001)
# Send command from client 1
writer1.write(b"*IDN?\n")
await writer1.drain()
response1 = await asyncio.wait_for(reader1.readline(), timeout=2.0)
# Send command from client 2
writer2.write(b"*IDN?\n")
await writer2.drain()
response2 = await asyncio.wait_for(reader2.readline(), timeout=2.0)
# Both should get valid responses
assert b"TC-SIM-001" in response1
assert b"TC-SIM-001" in response2
writer1.close()
writer2.close()
await writer1.wait_closed()
await writer2.wait_closed()
finally:
await server.stop()
@pytest.mark.asyncio(loop_scope="function")
class TestSimulationServer:
"""Tests for complete SimulationServer."""
async def test_simulation_server_start_stop(self) -> None:
"""Test simulation server lifecycle."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16000,
psu_port=16001,
dmm_port=16002,
physics_rate_hz=100.0,
)
server = SimulationServer(config)
assert not server.is_running
await server.start()
assert server.is_running
assert server.physics_engine is not None
await server.stop()
assert not server.is_running
async def test_all_instruments_accessible(self) -> None:
"""Test all three instruments are accessible over TCP."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16100,
psu_port=16101,
dmm_port=16102,
)
server = SimulationServer(config)
await server.start()
try:
# Test thermal chamber
r, w = await asyncio.open_connection("127.0.0.1", 16100)
w.write(b"*IDN?\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"TC-SIM-001" in resp
w.close()
await w.wait_closed()
# Test power supply
r, w = await asyncio.open_connection("127.0.0.1", 16101)
w.write(b"*IDN?\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"PS-SIM-001" in resp
w.close()
await w.wait_closed()
# Test multimeter
r, w = await asyncio.open_connection("127.0.0.1", 16102)
w.write(b"*IDN?\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"DMM-SIM-001" in resp
w.close()
await w.wait_closed()
finally:
await server.stop()
async def test_physics_engine_integration(self) -> None:
"""Test instruments share physics engine state."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16200,
psu_port=16201,
dmm_port=16202,
)
server = SimulationServer(config)
await server.start()
try:
# Connect to power supply and enable output
psu_r, psu_w = await asyncio.open_connection("127.0.0.1", 16201)
psu_w.write(b"VOLT 5.0\n")
await psu_w.drain()
await asyncio.sleep(0.01) # Allow server to process
psu_w.write(b"OUTP ON\n")
await psu_w.drain()
# Run a few physics steps
await asyncio.sleep(0.1)
# Query voltage from power supply
psu_w.write(b"MEAS:VOLT?\n")
await psu_w.drain()
psu_resp = await asyncio.wait_for(psu_r.readline(), timeout=2.0)
psu_voltage = float(psu_resp.decode().strip())
# Connect to DMM and measure DUT output
dmm_r, dmm_w = await asyncio.open_connection("127.0.0.1", 16202)
dmm_w.write(b"MEAS:VOLT:DC?\n")
await dmm_w.drain()
dmm_resp = await asyncio.wait_for(dmm_r.readline(), timeout=2.0)
dmm_voltage = float(dmm_resp.decode().strip())
# PSU should show input voltage (5V)
assert 4.9 <= psu_voltage <= 5.1
# DMM should show DUT output voltage (LDO regulated ~3.3V)
assert 3.0 <= dmm_voltage <= 3.5
psu_w.close()
dmm_w.close()
await psu_w.wait_closed()
await dmm_w.wait_closed()
finally:
await server.stop()
async def test_error_handling(self) -> None:
"""Test invalid commands return errors."""
config = ServerConfig(
host="127.0.0.1",
chamber_port=16300,
psu_port=16301,
dmm_port=16302,
)
server = SimulationServer(config)
await server.start()
try:
r, w = await asyncio.open_connection("127.0.0.1", 16300)
# Send invalid command
w.write(b"INVALID:COMMAND\n")
await w.drain()
resp = await asyncio.wait_for(r.readline(), timeout=2.0)
assert b"ERROR" in resp
w.close()
await w.wait_closed()
finally:
await server.stop()