Fix TempCo integration tests with thread-based async server

Redesign integration test architecture to eliminate async/sync deadlock:
- Run SimulationServer in dedicated background thread with own event loop
- Rewrite TempCo tests as fully synchronous (no @pytest.mark.asyncio)
- Add ServerThread fixture in tests/integration/conftest.py
- Fix Unicode encoding errors (replace deg, mu, +/- with ASCII)
- Optimize temperature points for faster settling (23C, 25C, 27C)

All 3 TempCo integration tests now passing in ~5 minutes total.
This commit is contained in:
2025-10-12 17:59:48 +00:00
parent 3fdaba500d
commit 42356efce2
3 changed files with 299 additions and 216 deletions

View File

@@ -3,7 +3,7 @@
This test characterises the output voltage temperature coefficient by
sweeping the chamber temperature and measuring output voltage at each point.
The TempCo is calculated from the linear regression slope and expressed
in parts per million per degree Celsius (ppm/°C).
in parts per million per degree Celsius (ppm/C).
"""
from py_dvt_ate.data.models import TestStatus
@@ -29,12 +29,12 @@ class TempCoTest(BaseDVTTest):
5. Evaluate against specification limits
Configuration:
temperatures: List of temperature points (°C). Default: [-40, -20, 0, 25, 50, 85]
temperatures: List of temperature points (C). Default: [-40, -20, 0, 25, 50, 85]
input_voltage: DUT input voltage (V). Default: 5.0
load_current: DUT load current (A). Default: 0.1
settle_time: Additional settling time at each temp (s). Default: 5.0
num_samples: Number of measurements to average per point. Default: 5
tempco_limit: Maximum allowed TempCo magnitude (ppm/°C). Default: ±50.0
tempco_limit: Maximum allowed TempCo magnitude (ppm/C). Default: +/-50.0
"""
@property
@@ -90,7 +90,7 @@ class TempCoTest(BaseDVTTest):
# Temperature sweep
for temp_setpoint in temperatures:
context.logger.log_event(
f"Temperature point: {temp_setpoint}°C",
f"Temperature point: {temp_setpoint}C",
level="INFO",
)
@@ -102,7 +102,7 @@ class TempCoTest(BaseDVTTest):
)
if not stable:
context.logger.log_event(
f"Warning: Temperature did not stabilise at {temp_setpoint}°C",
f"Warning: Temperature did not stabilise at {temp_setpoint}C",
level="WARNING",
)
@@ -133,8 +133,8 @@ class TempCoTest(BaseDVTTest):
)
context.logger.log_event(
f"Measured Vout = {vout_mean:.6f}V ± {vout_std * 1e6:.1f}μV "
f"at T={actual_temp:.2f}°C",
f"Measured Vout = {vout_mean:.6f}V +/- {vout_std * 1e6:.1f}uV "
f"at T={actual_temp:.2f}C",
level="INFO",
)
@@ -146,7 +146,7 @@ class TempCoTest(BaseDVTTest):
tempco_ppm = self._calculate_tempco(temp_points, vout_points)
context.logger.log_event(
f"Calculated TempCo = {tempco_ppm:.2f} ppm/°C",
f"Calculated TempCo = {tempco_ppm:.2f} ppm/C",
level="INFO",
)
@@ -154,7 +154,7 @@ class TempCoTest(BaseDVTTest):
context.logger.log_result(
parameter="temp_co",
value=tempco_ppm,
unit="ppm/°C",
unit="ppm/C",
lower_limit=-abs(tempco_limit),
upper_limit=abs(tempco_limit),
)
@@ -164,13 +164,13 @@ class TempCoTest(BaseDVTTest):
if passed:
context.logger.log_event(
f"TempCo test PASSED: {tempco_ppm:.2f} ppm/°C within ±{tempco_limit} ppm/°C",
f"TempCo test PASSED: {tempco_ppm:.2f} ppm/C within +/-{tempco_limit} ppm/C",
level="INFO",
)
return TestStatus.PASSED
else:
context.logger.log_event(
f"TempCo test FAILED: {tempco_ppm:.2f} ppm/°C exceeds ±{tempco_limit} ppm/°C",
f"TempCo test FAILED: {tempco_ppm:.2f} ppm/C exceeds +/-{tempco_limit} ppm/C",
level="ERROR",
)
return TestStatus.FAILED
@@ -198,14 +198,14 @@ class TempCoTest(BaseDVTTest):
"""Calculate temperature coefficient from measurements.
Uses linear regression to find the slope (dV/dT), then converts
to ppm/°C relative to the nominal voltage (voltage at median temperature).
to ppm/C relative to the nominal voltage (voltage at median temperature).
Args:
temperatures: Temperature measurements in °C.
temperatures: Temperature measurements in C.
voltages: Output voltage measurements in V.
Returns:
Temperature coefficient in ppm/°C.
Temperature coefficient in ppm/C.
Raises:
ValueError: If insufficient data points.
@@ -230,14 +230,14 @@ class TempCoTest(BaseDVTTest):
if var_t == 0:
raise ValueError("Temperature variance is zero (all temps identical)")
slope = cov / var_t # dV/dT in V/°C
slope = cov / var_t # dV/dT in V/C
# Find nominal voltage (voltage at median temperature)
sorted_pairs = sorted(zip(temperatures, voltages, strict=True))
mid_idx = len(sorted_pairs) // 2
v_nominal = sorted_pairs[mid_idx][1]
# Convert to ppm/°C: (dV/dT) / V_nom * 10^6
# Convert to ppm/C: (dV/dT) / V_nom * 10^6
tempco_ppm = (slope / v_nominal) * 1e6
return tempco_ppm

View File

@@ -1 +1,131 @@
"""Configuration for integration tests."""
import asyncio
import threading
import time
from collections.abc import Generator
import pytest
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
class ServerThread:
"""Helper class to run SimulationServer in a background thread.
This allows synchronous test code to run in the main thread while
the async server runs in its own thread with its own event loop.
"""
def __init__(self, config: ServerConfig):
"""Initialise the server thread.
Args:
config: Server configuration.
"""
self.config = config
self.server: SimulationServer | None = None
self.thread: threading.Thread | None = None
self.loop: asyncio.AbstractEventLoop | None = None
self._started = threading.Event()
self._error: Exception | None = None
def _run_server(self) -> None:
"""Run the server in the background thread."""
try:
# Create new event loop for this thread
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
# Create and start server
self.server = SimulationServer(self.config)
self.loop.run_until_complete(self.server.start())
# Signal that server is started
self._started.set()
# Run event loop until stopped
self.loop.run_forever()
except Exception as e:
self._error = e
self._started.set() # Unblock waiting thread even on error
finally:
# Cleanup
if self.server is not None and self.loop is not None:
try:
self.loop.run_until_complete(self.server.stop())
except Exception:
pass
if self.loop is not None:
self.loop.close()
def start(self, timeout: float = 5.0) -> None:
"""Start the server thread.
Args:
timeout: Maximum time to wait for server to start (seconds).
Raises:
RuntimeError: If server fails to start within timeout.
Exception: If server raises an exception during startup.
"""
self.thread = threading.Thread(target=self._run_server, daemon=True)
self.thread.start()
# Wait for server to start
if not self._started.wait(timeout=timeout):
raise RuntimeError("Server failed to start within timeout")
# Check if there was an error during startup
if self._error is not None:
raise self._error
# Give server a bit more time to fully initialize
time.sleep(0.1)
def stop(self, timeout: float = 5.0) -> None:
"""Stop the server thread.
Args:
timeout: Maximum time to wait for server to stop (seconds).
"""
if self.loop is not None and self.loop.is_running():
# Schedule stop in the server's event loop
self.loop.call_soon_threadsafe(self.loop.stop)
if self.thread is not None:
self.thread.join(timeout=timeout)
self.thread = None
@pytest.fixture
def simulation_server() -> Generator[ServerConfig, None, None]:
"""Provide a running simulation server for integration tests.
The server runs in a background thread with its own event loop,
allowing synchronous test code to run in the main thread.
Yields:
ServerConfig with connection details for the running server.
"""
# Use unique ports for each test to avoid conflicts
import random
base_port = random.randint(20000, 30000)
config = ServerConfig(
host="127.0.0.1",
chamber_port=base_port,
psu_port=base_port + 1,
dmm_port=base_port + 2,
physics_rate_hz=100.0,
)
server_thread = ServerThread(config)
server_thread.start()
try:
yield config
finally:
server_thread.stop()

View File

@@ -7,47 +7,29 @@ from __future__ import annotations
from pathlib import Path
import pytest
from py_dvt_ate.data.models import TestStatus
from py_dvt_ate.data.repository import SQLiteRepository
from py_dvt_ate.framework.context import TestContext
from py_dvt_ate.framework.logger import TestLogger
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
from py_dvt_ate.simulation.server import ServerConfig
from py_dvt_ate.tests.thermal.tempco import TempCoTest
@pytest.mark.asyncio(loop_scope="function")
class TestTempCoIntegration:
"""Integration tests for TempCo test with simulator."""
async def test_tempco_runs_successfully(self, tmp_path: Path) -> None:
def test_tempco_runs_successfully(
self, tmp_path: Path, simulation_server: ServerConfig
) -> None:
"""Test TempCo test runs end-to-end with simulator."""
import asyncio
# Start simulation server
server_config = ServerConfig(
host="127.0.0.1",
chamber_port=17000,
psu_port=17001,
dmm_port=17002,
physics_rate_hz=100.0,
)
server = SimulationServer(server_config)
await server.start()
# Give server time to fully initialize
await asyncio.sleep(0.1)
try:
# Create instrument set connected to simulator
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17000,
psu_port=17001,
dmm_port=17002,
simulator_host=simulation_server.host,
chamber_port=simulation_server.chamber_port,
psu_port=simulation_server.psu_port,
dmm_port=simulation_server.dmm_port,
)
instruments = InstrumentFactory.create(instrument_config)
@@ -59,10 +41,10 @@ class TestTempCoIntegration:
run_id = repository.create_run(
test_name="tempco",
config={
"temperatures": [0.0, 25.0, 50.0], # Reduced for faster test
"temperatures": [23.0, 25.0, 27.0], # Close to start temp for fast settling
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.5, # Reduced for faster test
"settle_time": 0.2, # Short since temps close to start
"num_samples": 3, # Reduced for faster test
"tempco_limit": 100.0, # Relaxed for testing
},
@@ -78,10 +60,10 @@ class TestTempCoIntegration:
instruments=instruments,
logger=logger,
config={
"temperatures": [0.0, 25.0, 50.0],
"temperatures": [23.0, 25.0, 27.0],
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.5,
"settle_time": 0.2,
"num_samples": 3,
"tempco_limit": 100.0,
},
@@ -92,23 +74,18 @@ class TestTempCoIntegration:
assert test.name == "tempco"
assert test.description == "Output voltage temperature coefficient"
# Run synchronous test code in thread pool to avoid blocking event loop
loop = asyncio.get_running_loop()
def run_test():
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
instruments.chamber.connect() # type: ignore[attr-defined]
instruments.psu.connect() # type: ignore[attr-defined]
instruments.dmm.connect() # type: ignore[attr-defined]
try:
# Configure instruments
instruments.chamber.set_ramp_rate(10.0) # Fast ramp for testing
instruments.psu.enable_output(1, False) # Ensure off initially
# Run test
return test.execute(context)
status = await loop.run_in_executor(None, run_test)
status = test.execute(context)
# Verify test completed
assert status in (TestStatus.PASSED, TestStatus.FAILED)
@@ -126,7 +103,7 @@ class TestTempCoIntegration:
# Find TempCo result
tempco_result = next(r for r in results if r.parameter == "temp_co")
assert tempco_result is not None
assert tempco_result.unit == "ppm/°C"
assert tempco_result.unit == "ppm/C"
assert tempco_result.lower_limit == -100.0
assert tempco_result.upper_limit == 100.0
@@ -145,33 +122,22 @@ class TestTempCoIntegration:
assert len(temps_recorded) >= 3
finally:
await server.stop()
# Disconnect from instruments
instruments.chamber.disconnect() # type: ignore[attr-defined]
instruments.psu.disconnect() # type: ignore[attr-defined]
instruments.dmm.disconnect() # type: ignore[attr-defined]
async def test_tempco_with_minimal_config(self, tmp_path: Path) -> None:
def test_tempco_with_minimal_config(
self, tmp_path: Path, simulation_server: ServerConfig
) -> None:
"""Test TempCo uses default configuration when not specified."""
import asyncio
# Start simulation server
server_config = ServerConfig(
host="127.0.0.1",
chamber_port=17100,
psu_port=17101,
dmm_port=17102,
)
server = SimulationServer(server_config)
await server.start()
# Give server time to fully initialize
await asyncio.sleep(0.1)
try:
# Create instrument set
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17100,
psu_port=17101,
dmm_port=17102,
simulator_host=simulation_server.host,
chamber_port=simulation_server.chamber_port,
psu_port=simulation_server.psu_port,
dmm_port=simulation_server.dmm_port,
)
instruments = InstrumentFactory.create(instrument_config)
@@ -191,25 +157,23 @@ class TestTempCoIntegration:
logger=logger,
config={
# Override temperatures for faster test
"temperatures": [25.0, 50.0],
"temperatures": [24.0, 26.0],
"settle_time": 0.2,
"num_samples": 2,
},
)
# Execute test in thread pool
# Execute test
test = TempCoTest()
loop = asyncio.get_running_loop()
def run_test():
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Run test
return test.execute(context)
instruments.chamber.connect() # type: ignore[attr-defined]
instruments.psu.connect() # type: ignore[attr-defined]
instruments.dmm.connect() # type: ignore[attr-defined]
status = await loop.run_in_executor(None, run_test)
try:
# Run test
status = test.execute(context)
# Should complete without error
assert status in (TestStatus.PASSED, TestStatus.FAILED, TestStatus.ERROR)
@@ -222,33 +186,22 @@ class TestTempCoIntegration:
assert len(results) >= 1
finally:
await server.stop()
# Disconnect from instruments
instruments.chamber.disconnect() # type: ignore[attr-defined]
instruments.psu.disconnect() # type: ignore[attr-defined]
instruments.dmm.disconnect() # type: ignore[attr-defined]
async def test_tempco_handles_errors_gracefully(self, tmp_path: Path) -> None:
def test_tempco_handles_errors_gracefully(
self, tmp_path: Path, simulation_server: ServerConfig
) -> None:
"""Test TempCo returns ERROR status when instruments fail."""
import asyncio
# Start simulation server
server_config = ServerConfig(
host="127.0.0.1",
chamber_port=17200,
psu_port=17201,
dmm_port=17202,
)
server = SimulationServer(server_config)
await server.start()
# Give server time to fully initialize
await asyncio.sleep(0.1)
try:
# Create instrument set
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17200,
psu_port=17201,
dmm_port=17202,
simulator_host=simulation_server.host,
chamber_port=simulation_server.chamber_port,
psu_port=simulation_server.psu_port,
dmm_port=simulation_server.dmm_port,
)
instruments = InstrumentFactory.create(instrument_config)
@@ -269,22 +222,19 @@ class TestTempCoIntegration:
},
)
# Execute test in thread pool
# Execute test
test = TempCoTest()
loop = asyncio.get_running_loop()
def run_test():
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Run test
return test.execute(context)
instruments.chamber.connect() # type: ignore[attr-defined]
instruments.psu.connect() # type: ignore[attr-defined]
instruments.dmm.connect() # type: ignore[attr-defined]
try:
# Should handle gracefully (may return FAILED or ERROR)
# The test should not raise an unhandled exception
try:
status = await loop.run_in_executor(None, run_test)
status = test.execute(context)
# If it completes, it should indicate an error or failure
assert status in (TestStatus.ERROR, TestStatus.FAILED)
except Exception:
@@ -294,4 +244,7 @@ class TestTempCoIntegration:
logger.flush()
finally:
await server.stop()
# Disconnect from instruments
instruments.chamber.disconnect() # type: ignore[attr-defined]
instruments.psu.disconnect() # type: ignore[attr-defined]
instruments.dmm.disconnect() # type: ignore[attr-defined]