"""Configuration for integration tests.""" import asyncio import threading import time from collections.abc import Generator import pytest from py_dvt_ate.simulation.server import ServerConfig, SimulationServer class ServerThread: """Helper class to run SimulationServer in a background thread. This allows synchronous test code to run in the main thread while the async server runs in its own thread with its own event loop. """ def __init__(self, config: ServerConfig): """Initialise the server thread. Args: config: Server configuration. """ self.config = config self.server: SimulationServer | None = None self.thread: threading.Thread | None = None self.loop: asyncio.AbstractEventLoop | None = None self._started = threading.Event() self._error: Exception | None = None def _run_server(self) -> None: """Run the server in the background thread.""" try: # Create new event loop for this thread self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) # Create and start server self.server = SimulationServer(self.config) self.loop.run_until_complete(self.server.start()) # Signal that server is started self._started.set() # Run event loop until stopped self.loop.run_forever() except Exception as e: self._error = e self._started.set() # Unblock waiting thread even on error finally: # Cleanup if self.server is not None and self.loop is not None: try: self.loop.run_until_complete(self.server.stop()) except Exception: pass if self.loop is not None: self.loop.close() def start(self, timeout: float = 5.0) -> None: """Start the server thread. Args: timeout: Maximum time to wait for server to start (seconds). Raises: RuntimeError: If server fails to start within timeout. Exception: If server raises an exception during startup. """ self.thread = threading.Thread(target=self._run_server, daemon=True) self.thread.start() # Wait for server to start if not self._started.wait(timeout=timeout): raise RuntimeError("Server failed to start within timeout") # Check if there was an error during startup if self._error is not None: raise self._error # Give server a bit more time to fully initialize time.sleep(0.1) def stop(self, timeout: float = 5.0) -> None: """Stop the server thread. Args: timeout: Maximum time to wait for server to stop (seconds). """ if self.loop is not None and self.loop.is_running(): # Schedule stop in the server's event loop self.loop.call_soon_threadsafe(self.loop.stop) if self.thread is not None: self.thread.join(timeout=timeout) self.thread = None @pytest.fixture def simulation_server() -> Generator[ServerConfig, None, None]: """Provide a running simulation server for integration tests. The server runs in a background thread with its own event loop, allowing synchronous test code to run in the main thread. Yields: ServerConfig with connection details for the running server. """ # Use unique ports for each test to avoid conflicts import random base_port = random.randint(20000, 30000) config = ServerConfig( host="127.0.0.1", chamber_port=base_port, psu_port=base_port + 1, dmm_port=base_port + 2, physics_rate_hz=100.0, ) server_thread = ServerThread(config) server_thread.start() try: yield config finally: server_thread.stop()