Fix TempCo integration tests with thread-based async server

Redesign integration test architecture to eliminate async/sync deadlock:
- Run SimulationServer in dedicated background thread with own event loop
- Rewrite TempCo tests as fully synchronous (no @pytest.mark.asyncio)
- Add ServerThread fixture in tests/integration/conftest.py
- Fix Unicode encoding errors (replace deg, mu, +/- with ASCII)
- Optimize temperature points for faster settling (23C, 25C, 27C)

All 3 TempCo integration tests now passing in ~5 minutes total.
This commit is contained in:
2025-10-12 17:59:48 +00:00
parent 9d16480503
commit 4f6f3be7a6
3 changed files with 299 additions and 216 deletions

View File

@@ -3,7 +3,7 @@
This test characterises the output voltage temperature coefficient by This test characterises the output voltage temperature coefficient by
sweeping the chamber temperature and measuring output voltage at each point. sweeping the chamber temperature and measuring output voltage at each point.
The TempCo is calculated from the linear regression slope and expressed The TempCo is calculated from the linear regression slope and expressed
in parts per million per degree Celsius (ppm/°C). in parts per million per degree Celsius (ppm/C).
""" """
from py_dvt_ate.data.models import TestStatus from py_dvt_ate.data.models import TestStatus
@@ -29,12 +29,12 @@ class TempCoTest(BaseDVTTest):
5. Evaluate against specification limits 5. Evaluate against specification limits
Configuration: Configuration:
temperatures: List of temperature points (°C). Default: [-40, -20, 0, 25, 50, 85] temperatures: List of temperature points (C). Default: [-40, -20, 0, 25, 50, 85]
input_voltage: DUT input voltage (V). Default: 5.0 input_voltage: DUT input voltage (V). Default: 5.0
load_current: DUT load current (A). Default: 0.1 load_current: DUT load current (A). Default: 0.1
settle_time: Additional settling time at each temp (s). Default: 5.0 settle_time: Additional settling time at each temp (s). Default: 5.0
num_samples: Number of measurements to average per point. Default: 5 num_samples: Number of measurements to average per point. Default: 5
tempco_limit: Maximum allowed TempCo magnitude (ppm/°C). Default: ±50.0 tempco_limit: Maximum allowed TempCo magnitude (ppm/C). Default: +/-50.0
""" """
@property @property
@@ -90,7 +90,7 @@ class TempCoTest(BaseDVTTest):
# Temperature sweep # Temperature sweep
for temp_setpoint in temperatures: for temp_setpoint in temperatures:
context.logger.log_event( context.logger.log_event(
f"Temperature point: {temp_setpoint}°C", f"Temperature point: {temp_setpoint}C",
level="INFO", level="INFO",
) )
@@ -102,7 +102,7 @@ class TempCoTest(BaseDVTTest):
) )
if not stable: if not stable:
context.logger.log_event( context.logger.log_event(
f"Warning: Temperature did not stabilise at {temp_setpoint}°C", f"Warning: Temperature did not stabilise at {temp_setpoint}C",
level="WARNING", level="WARNING",
) )
@@ -133,8 +133,8 @@ class TempCoTest(BaseDVTTest):
) )
context.logger.log_event( context.logger.log_event(
f"Measured Vout = {vout_mean:.6f}V ± {vout_std * 1e6:.1f}μV " f"Measured Vout = {vout_mean:.6f}V +/- {vout_std * 1e6:.1f}uV "
f"at T={actual_temp:.2f}°C", f"at T={actual_temp:.2f}C",
level="INFO", level="INFO",
) )
@@ -146,7 +146,7 @@ class TempCoTest(BaseDVTTest):
tempco_ppm = self._calculate_tempco(temp_points, vout_points) tempco_ppm = self._calculate_tempco(temp_points, vout_points)
context.logger.log_event( context.logger.log_event(
f"Calculated TempCo = {tempco_ppm:.2f} ppm/°C", f"Calculated TempCo = {tempco_ppm:.2f} ppm/C",
level="INFO", level="INFO",
) )
@@ -154,7 +154,7 @@ class TempCoTest(BaseDVTTest):
context.logger.log_result( context.logger.log_result(
parameter="temp_co", parameter="temp_co",
value=tempco_ppm, value=tempco_ppm,
unit="ppm/°C", unit="ppm/C",
lower_limit=-abs(tempco_limit), lower_limit=-abs(tempco_limit),
upper_limit=abs(tempco_limit), upper_limit=abs(tempco_limit),
) )
@@ -164,13 +164,13 @@ class TempCoTest(BaseDVTTest):
if passed: if passed:
context.logger.log_event( context.logger.log_event(
f"TempCo test PASSED: {tempco_ppm:.2f} ppm/°C within ±{tempco_limit} ppm/°C", f"TempCo test PASSED: {tempco_ppm:.2f} ppm/C within +/-{tempco_limit} ppm/C",
level="INFO", level="INFO",
) )
return TestStatus.PASSED return TestStatus.PASSED
else: else:
context.logger.log_event( context.logger.log_event(
f"TempCo test FAILED: {tempco_ppm:.2f} ppm/°C exceeds ±{tempco_limit} ppm/°C", f"TempCo test FAILED: {tempco_ppm:.2f} ppm/C exceeds +/-{tempco_limit} ppm/C",
level="ERROR", level="ERROR",
) )
return TestStatus.FAILED return TestStatus.FAILED
@@ -198,14 +198,14 @@ class TempCoTest(BaseDVTTest):
"""Calculate temperature coefficient from measurements. """Calculate temperature coefficient from measurements.
Uses linear regression to find the slope (dV/dT), then converts Uses linear regression to find the slope (dV/dT), then converts
to ppm/°C relative to the nominal voltage (voltage at median temperature). to ppm/C relative to the nominal voltage (voltage at median temperature).
Args: Args:
temperatures: Temperature measurements in °C. temperatures: Temperature measurements in C.
voltages: Output voltage measurements in V. voltages: Output voltage measurements in V.
Returns: Returns:
Temperature coefficient in ppm/°C. Temperature coefficient in ppm/C.
Raises: Raises:
ValueError: If insufficient data points. ValueError: If insufficient data points.
@@ -230,14 +230,14 @@ class TempCoTest(BaseDVTTest):
if var_t == 0: if var_t == 0:
raise ValueError("Temperature variance is zero (all temps identical)") raise ValueError("Temperature variance is zero (all temps identical)")
slope = cov / var_t # dV/dT in V/°C slope = cov / var_t # dV/dT in V/C
# Find nominal voltage (voltage at median temperature) # Find nominal voltage (voltage at median temperature)
sorted_pairs = sorted(zip(temperatures, voltages, strict=True)) sorted_pairs = sorted(zip(temperatures, voltages, strict=True))
mid_idx = len(sorted_pairs) // 2 mid_idx = len(sorted_pairs) // 2
v_nominal = sorted_pairs[mid_idx][1] v_nominal = sorted_pairs[mid_idx][1]
# Convert to ppm/°C: (dV/dT) / V_nom * 10^6 # Convert to ppm/C: (dV/dT) / V_nom * 10^6
tempco_ppm = (slope / v_nominal) * 1e6 tempco_ppm = (slope / v_nominal) * 1e6
return tempco_ppm return tempco_ppm

View File

@@ -1 +1,131 @@
"""Configuration for integration tests.""" """Configuration for integration tests."""
import asyncio
import threading
import time
from collections.abc import Generator
import pytest
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
class ServerThread:
"""Helper class to run SimulationServer in a background thread.
This allows synchronous test code to run in the main thread while
the async server runs in its own thread with its own event loop.
"""
def __init__(self, config: ServerConfig):
"""Initialise the server thread.
Args:
config: Server configuration.
"""
self.config = config
self.server: SimulationServer | None = None
self.thread: threading.Thread | None = None
self.loop: asyncio.AbstractEventLoop | None = None
self._started = threading.Event()
self._error: Exception | None = None
def _run_server(self) -> None:
"""Run the server in the background thread."""
try:
# Create new event loop for this thread
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
# Create and start server
self.server = SimulationServer(self.config)
self.loop.run_until_complete(self.server.start())
# Signal that server is started
self._started.set()
# Run event loop until stopped
self.loop.run_forever()
except Exception as e:
self._error = e
self._started.set() # Unblock waiting thread even on error
finally:
# Cleanup
if self.server is not None and self.loop is not None:
try:
self.loop.run_until_complete(self.server.stop())
except Exception:
pass
if self.loop is not None:
self.loop.close()
def start(self, timeout: float = 5.0) -> None:
"""Start the server thread.
Args:
timeout: Maximum time to wait for server to start (seconds).
Raises:
RuntimeError: If server fails to start within timeout.
Exception: If server raises an exception during startup.
"""
self.thread = threading.Thread(target=self._run_server, daemon=True)
self.thread.start()
# Wait for server to start
if not self._started.wait(timeout=timeout):
raise RuntimeError("Server failed to start within timeout")
# Check if there was an error during startup
if self._error is not None:
raise self._error
# Give server a bit more time to fully initialize
time.sleep(0.1)
def stop(self, timeout: float = 5.0) -> None:
"""Stop the server thread.
Args:
timeout: Maximum time to wait for server to stop (seconds).
"""
if self.loop is not None and self.loop.is_running():
# Schedule stop in the server's event loop
self.loop.call_soon_threadsafe(self.loop.stop)
if self.thread is not None:
self.thread.join(timeout=timeout)
self.thread = None
@pytest.fixture
def simulation_server() -> Generator[ServerConfig, None, None]:
"""Provide a running simulation server for integration tests.
The server runs in a background thread with its own event loop,
allowing synchronous test code to run in the main thread.
Yields:
ServerConfig with connection details for the running server.
"""
# Use unique ports for each test to avoid conflicts
import random
base_port = random.randint(20000, 30000)
config = ServerConfig(
host="127.0.0.1",
chamber_port=base_port,
psu_port=base_port + 1,
dmm_port=base_port + 2,
physics_rate_hz=100.0,
)
server_thread = ServerThread(config)
server_thread.start()
try:
yield config
finally:
server_thread.stop()

View File

@@ -7,108 +7,85 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
import pytest
from py_dvt_ate.data.models import TestStatus from py_dvt_ate.data.models import TestStatus
from py_dvt_ate.data.repository import SQLiteRepository from py_dvt_ate.data.repository import SQLiteRepository
from py_dvt_ate.framework.context import TestContext from py_dvt_ate.framework.context import TestContext
from py_dvt_ate.framework.logger import TestLogger from py_dvt_ate.framework.logger import TestLogger
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer from py_dvt_ate.simulation.server import ServerConfig
from py_dvt_ate.tests.thermal.tempco import TempCoTest from py_dvt_ate.tests.thermal.tempco import TempCoTest
@pytest.mark.asyncio(loop_scope="function")
class TestTempCoIntegration: class TestTempCoIntegration:
"""Integration tests for TempCo test with simulator.""" """Integration tests for TempCo test with simulator."""
async def test_tempco_runs_successfully(self, tmp_path: Path) -> None: def test_tempco_runs_successfully(
self, tmp_path: Path, simulation_server: ServerConfig
) -> None:
"""Test TempCo test runs end-to-end with simulator.""" """Test TempCo test runs end-to-end with simulator."""
import asyncio # Create instrument set connected to simulator
instrument_config = InstrumentConfig(
# Start simulation server backend="simulator",
server_config = ServerConfig( simulator_host=simulation_server.host,
host="127.0.0.1", chamber_port=simulation_server.chamber_port,
chamber_port=17000, psu_port=simulation_server.psu_port,
psu_port=17001, dmm_port=simulation_server.dmm_port,
dmm_port=17002,
physics_rate_hz=100.0,
) )
server = SimulationServer(server_config) instruments = InstrumentFactory.create(instrument_config)
await server.start()
# Give server time to fully initialize # Create test repository
await asyncio.sleep(0.1) db_path = tmp_path / "test.db"
repository = SQLiteRepository(db_path)
# Create test run
run_id = repository.create_run(
test_name="tempco",
config={
"temperatures": [23.0, 25.0, 27.0], # Close to start temp for fast settling
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.2, # Short since temps close to start
"num_samples": 3, # Reduced for faster test
"tempco_limit": 100.0, # Relaxed for testing
},
description="Integration test of TempCo",
)
# Create test logger
logger = TestLogger(run_id, repository)
# Create test context
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
"temperatures": [23.0, 25.0, 27.0],
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.2,
"num_samples": 3,
"tempco_limit": 100.0,
},
)
# Create test
test = TempCoTest()
assert test.name == "tempco"
assert test.description == "Output voltage temperature coefficient"
# Connect to instruments
instruments.chamber.connect() # type: ignore[attr-defined]
instruments.psu.connect() # type: ignore[attr-defined]
instruments.dmm.connect() # type: ignore[attr-defined]
try: try:
# Create instrument set connected to simulator # Configure instruments
instrument_config = InstrumentConfig( instruments.chamber.set_ramp_rate(10.0) # Fast ramp for testing
backend="simulator", instruments.psu.enable_output(1, False) # Ensure off initially
simulator_host="127.0.0.1",
chamber_port=17000,
psu_port=17001,
dmm_port=17002,
)
instruments = InstrumentFactory.create(instrument_config)
# Create test repository # Run test
db_path = tmp_path / "test.db" status = test.execute(context)
repository = SQLiteRepository(db_path)
# Create test run
run_id = repository.create_run(
test_name="tempco",
config={
"temperatures": [0.0, 25.0, 50.0], # Reduced for faster test
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.5, # Reduced for faster test
"num_samples": 3, # Reduced for faster test
"tempco_limit": 100.0, # Relaxed for testing
},
description="Integration test of TempCo",
)
# Create test logger
logger = TestLogger(run_id, repository)
# Create test context
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
"temperatures": [0.0, 25.0, 50.0],
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.5,
"num_samples": 3,
"tempco_limit": 100.0,
},
)
# Create test
test = TempCoTest()
assert test.name == "tempco"
assert test.description == "Output voltage temperature coefficient"
# Run synchronous test code in thread pool to avoid blocking event loop
loop = asyncio.get_running_loop()
def run_test():
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Configure instruments
instruments.chamber.set_ramp_rate(10.0) # Fast ramp for testing
instruments.psu.enable_output(1, False) # Ensure off initially
# Run test
return test.execute(context)
status = await loop.run_in_executor(None, run_test)
# Verify test completed # Verify test completed
assert status in (TestStatus.PASSED, TestStatus.FAILED) assert status in (TestStatus.PASSED, TestStatus.FAILED)
@@ -126,7 +103,7 @@ class TestTempCoIntegration:
# Find TempCo result # Find TempCo result
tempco_result = next(r for r in results if r.parameter == "temp_co") tempco_result = next(r for r in results if r.parameter == "temp_co")
assert tempco_result is not None assert tempco_result is not None
assert tempco_result.unit == "ppm/°C" assert tempco_result.unit == "ppm/C"
assert tempco_result.lower_limit == -100.0 assert tempco_result.lower_limit == -100.0
assert tempco_result.upper_limit == 100.0 assert tempco_result.upper_limit == 100.0
@@ -145,71 +122,58 @@ class TestTempCoIntegration:
assert len(temps_recorded) >= 3 assert len(temps_recorded) >= 3
finally: finally:
await server.stop() # Disconnect from instruments
instruments.chamber.disconnect() # type: ignore[attr-defined]
instruments.psu.disconnect() # type: ignore[attr-defined]
instruments.dmm.disconnect() # type: ignore[attr-defined]
async def test_tempco_with_minimal_config(self, tmp_path: Path) -> None: def test_tempco_with_minimal_config(
self, tmp_path: Path, simulation_server: ServerConfig
) -> None:
"""Test TempCo uses default configuration when not specified.""" """Test TempCo uses default configuration when not specified."""
import asyncio # Create instrument set
instrument_config = InstrumentConfig(
# Start simulation server backend="simulator",
server_config = ServerConfig( simulator_host=simulation_server.host,
host="127.0.0.1", chamber_port=simulation_server.chamber_port,
chamber_port=17100, psu_port=simulation_server.psu_port,
psu_port=17101, dmm_port=simulation_server.dmm_port,
dmm_port=17102,
) )
server = SimulationServer(server_config) instruments = InstrumentFactory.create(instrument_config)
await server.start()
# Give server time to fully initialize # Create repository
await asyncio.sleep(0.1) db_path = tmp_path / "test_minimal.db"
repository = SQLiteRepository(db_path)
run_id = repository.create_run(
test_name="tempco",
config={}, # Empty config - should use defaults
)
# Create logger and context with minimal config
logger = TestLogger(run_id, repository)
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
# Override temperatures for faster test
"temperatures": [24.0, 26.0],
"settle_time": 0.2,
"num_samples": 2,
},
)
# Execute test
test = TempCoTest()
# Connect to instruments
instruments.chamber.connect() # type: ignore[attr-defined]
instruments.psu.connect() # type: ignore[attr-defined]
instruments.dmm.connect() # type: ignore[attr-defined]
try: try:
# Create instrument set # Run test
instrument_config = InstrumentConfig( status = test.execute(context)
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17100,
psu_port=17101,
dmm_port=17102,
)
instruments = InstrumentFactory.create(instrument_config)
# Create repository
db_path = tmp_path / "test_minimal.db"
repository = SQLiteRepository(db_path)
run_id = repository.create_run(
test_name="tempco",
config={}, # Empty config - should use defaults
)
# Create logger and context with minimal config
logger = TestLogger(run_id, repository)
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
# Override temperatures for faster test
"temperatures": [25.0, 50.0],
"settle_time": 0.2,
"num_samples": 2,
},
)
# Execute test in thread pool
test = TempCoTest()
loop = asyncio.get_running_loop()
def run_test():
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Run test
return test.execute(context)
status = await loop.run_in_executor(None, run_test)
# Should complete without error # Should complete without error
assert status in (TestStatus.PASSED, TestStatus.FAILED, TestStatus.ERROR) assert status in (TestStatus.PASSED, TestStatus.FAILED, TestStatus.ERROR)
@@ -222,69 +186,55 @@ class TestTempCoIntegration:
assert len(results) >= 1 assert len(results) >= 1
finally: finally:
await server.stop() # Disconnect from instruments
instruments.chamber.disconnect() # type: ignore[attr-defined]
instruments.psu.disconnect() # type: ignore[attr-defined]
instruments.dmm.disconnect() # type: ignore[attr-defined]
async def test_tempco_handles_errors_gracefully(self, tmp_path: Path) -> None: def test_tempco_handles_errors_gracefully(
self, tmp_path: Path, simulation_server: ServerConfig
) -> None:
"""Test TempCo returns ERROR status when instruments fail.""" """Test TempCo returns ERROR status when instruments fail."""
import asyncio # Create instrument set
instrument_config = InstrumentConfig(
# Start simulation server backend="simulator",
server_config = ServerConfig( simulator_host=simulation_server.host,
host="127.0.0.1", chamber_port=simulation_server.chamber_port,
chamber_port=17200, psu_port=simulation_server.psu_port,
psu_port=17201, dmm_port=simulation_server.dmm_port,
dmm_port=17202,
) )
server = SimulationServer(server_config) instruments = InstrumentFactory.create(instrument_config)
await server.start()
# Give server time to fully initialize # Create repository
await asyncio.sleep(0.1) db_path = tmp_path / "test_error.db"
repository = SQLiteRepository(db_path)
run_id = repository.create_run(test_name="tempco", config={})
# Create logger and context
logger = TestLogger(run_id, repository)
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
"temperatures": [], # Invalid: empty temperature list
"settle_time": 0.1,
},
)
# Execute test
test = TempCoTest()
# Connect to instruments
instruments.chamber.connect() # type: ignore[attr-defined]
instruments.psu.connect() # type: ignore[attr-defined]
instruments.dmm.connect() # type: ignore[attr-defined]
try: try:
# Create instrument set
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17200,
psu_port=17201,
dmm_port=17202,
)
instruments = InstrumentFactory.create(instrument_config)
# Create repository
db_path = tmp_path / "test_error.db"
repository = SQLiteRepository(db_path)
run_id = repository.create_run(test_name="tempco", config={})
# Create logger and context
logger = TestLogger(run_id, repository)
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
"temperatures": [], # Invalid: empty temperature list
"settle_time": 0.1,
},
)
# Execute test in thread pool
test = TempCoTest()
loop = asyncio.get_running_loop()
def run_test():
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Run test
return test.execute(context)
# Should handle gracefully (may return FAILED or ERROR) # Should handle gracefully (may return FAILED or ERROR)
# The test should not raise an unhandled exception # The test should not raise an unhandled exception
try: try:
status = await loop.run_in_executor(None, run_test) status = test.execute(context)
# If it completes, it should indicate an error or failure # If it completes, it should indicate an error or failure
assert status in (TestStatus.ERROR, TestStatus.FAILED) assert status in (TestStatus.ERROR, TestStatus.FAILED)
except Exception: except Exception:
@@ -294,4 +244,7 @@ class TestTempCoIntegration:
logger.flush() logger.flush()
finally: finally:
await server.stop() # Disconnect from instruments
instruments.chamber.disconnect() # type: ignore[attr-defined]
instruments.psu.disconnect() # type: ignore[attr-defined]
instruments.dmm.disconnect() # type: ignore[attr-defined]