Redesign integration test architecture to eliminate async/sync deadlock: - Run SimulationServer in dedicated background thread with own event loop - Rewrite TempCo tests as fully synchronous (no @pytest.mark.asyncio) - Add ServerThread fixture in tests/integration/conftest.py - Fix Unicode encoding errors (replace deg, mu, +/- with ASCII) - Optimize temperature points for faster settling (23C, 25C, 27C) All 3 TempCo integration tests now passing in ~5 minutes total.
132 lines
3.9 KiB
Python
132 lines
3.9 KiB
Python
"""Configuration for integration tests."""
|
|
|
|
import asyncio
|
|
import threading
|
|
import time
|
|
from collections.abc import Generator
|
|
|
|
import pytest
|
|
|
|
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
|
|
|
|
|
class ServerThread:
|
|
"""Helper class to run SimulationServer in a background thread.
|
|
|
|
This allows synchronous test code to run in the main thread while
|
|
the async server runs in its own thread with its own event loop.
|
|
"""
|
|
|
|
def __init__(self, config: ServerConfig):
|
|
"""Initialise the server thread.
|
|
|
|
Args:
|
|
config: Server configuration.
|
|
"""
|
|
self.config = config
|
|
self.server: SimulationServer | None = None
|
|
self.thread: threading.Thread | None = None
|
|
self.loop: asyncio.AbstractEventLoop | None = None
|
|
self._started = threading.Event()
|
|
self._error: Exception | None = None
|
|
|
|
def _run_server(self) -> None:
|
|
"""Run the server in the background thread."""
|
|
try:
|
|
# Create new event loop for this thread
|
|
self.loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(self.loop)
|
|
|
|
# Create and start server
|
|
self.server = SimulationServer(self.config)
|
|
self.loop.run_until_complete(self.server.start())
|
|
|
|
# Signal that server is started
|
|
self._started.set()
|
|
|
|
# Run event loop until stopped
|
|
self.loop.run_forever()
|
|
|
|
except Exception as e:
|
|
self._error = e
|
|
self._started.set() # Unblock waiting thread even on error
|
|
finally:
|
|
# Cleanup
|
|
if self.server is not None and self.loop is not None:
|
|
try:
|
|
self.loop.run_until_complete(self.server.stop())
|
|
except Exception:
|
|
pass
|
|
if self.loop is not None:
|
|
self.loop.close()
|
|
|
|
def start(self, timeout: float = 5.0) -> None:
|
|
"""Start the server thread.
|
|
|
|
Args:
|
|
timeout: Maximum time to wait for server to start (seconds).
|
|
|
|
Raises:
|
|
RuntimeError: If server fails to start within timeout.
|
|
Exception: If server raises an exception during startup.
|
|
"""
|
|
self.thread = threading.Thread(target=self._run_server, daemon=True)
|
|
self.thread.start()
|
|
|
|
# Wait for server to start
|
|
if not self._started.wait(timeout=timeout):
|
|
raise RuntimeError("Server failed to start within timeout")
|
|
|
|
# Check if there was an error during startup
|
|
if self._error is not None:
|
|
raise self._error
|
|
|
|
# Give server a bit more time to fully initialize
|
|
time.sleep(0.1)
|
|
|
|
def stop(self, timeout: float = 5.0) -> None:
|
|
"""Stop the server thread.
|
|
|
|
Args:
|
|
timeout: Maximum time to wait for server to stop (seconds).
|
|
"""
|
|
if self.loop is not None and self.loop.is_running():
|
|
# Schedule stop in the server's event loop
|
|
self.loop.call_soon_threadsafe(self.loop.stop)
|
|
|
|
if self.thread is not None:
|
|
self.thread.join(timeout=timeout)
|
|
self.thread = None
|
|
|
|
|
|
@pytest.fixture
|
|
def simulation_server() -> Generator[ServerConfig, None, None]:
|
|
"""Provide a running simulation server for integration tests.
|
|
|
|
The server runs in a background thread with its own event loop,
|
|
allowing synchronous test code to run in the main thread.
|
|
|
|
Yields:
|
|
ServerConfig with connection details for the running server.
|
|
"""
|
|
# Use unique ports for each test to avoid conflicts
|
|
import random
|
|
|
|
base_port = random.randint(20000, 30000)
|
|
|
|
config = ServerConfig(
|
|
host="127.0.0.1",
|
|
chamber_port=base_port,
|
|
psu_port=base_port + 1,
|
|
dmm_port=base_port + 2,
|
|
physics_rate_hz=100.0,
|
|
)
|
|
|
|
server_thread = ServerThread(config)
|
|
server_thread.start()
|
|
|
|
try:
|
|
yield config
|
|
finally:
|
|
server_thread.stop()
|