Files
py-dvt-ate/tests/integration/test_tempco.py
Kai Chappell a0d096512f WIP: Use thread pool executor for integration tests
Move synchronous test execution to thread pool executor to avoid
blocking the async event loop. This prevents deadlocks when sync
client code tries to communicate with async server in same loop.

Note: Integration tests still experiencing timeouts - needs further
investigation. Unit tests and TCP server communication are working.
2025-10-08 16:16:13 +00:00

298 lines
10 KiB
Python

"""Integration tests for TempCo characterisation test.
Full end-to-end test of the TempCo test with simulated instruments.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from py_dvt_ate.data.models import TestStatus
from py_dvt_ate.data.repository import SQLiteRepository
from py_dvt_ate.framework.context import TestContext
from py_dvt_ate.framework.logger import TestLogger
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
from py_dvt_ate.tests.thermal.tempco import TempCoTest
@pytest.mark.asyncio(loop_scope="function")
class TestTempCoIntegration:
"""Integration tests for TempCo test with simulator."""
async def test_tempco_runs_successfully(self, tmp_path: Path) -> None:
"""Test TempCo test runs end-to-end with simulator."""
import asyncio
# Start simulation server
server_config = ServerConfig(
host="127.0.0.1",
chamber_port=17000,
psu_port=17001,
dmm_port=17002,
physics_rate_hz=100.0,
)
server = SimulationServer(server_config)
await server.start()
# Give server time to fully initialize
await asyncio.sleep(0.1)
try:
# Create instrument set connected to simulator
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17000,
psu_port=17001,
dmm_port=17002,
)
instruments = InstrumentFactory.create(instrument_config)
# Create test repository
db_path = tmp_path / "test.db"
repository = SQLiteRepository(db_path)
# Create test run
run_id = repository.create_run(
test_name="tempco",
config={
"temperatures": [0.0, 25.0, 50.0], # Reduced for faster test
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.5, # Reduced for faster test
"num_samples": 3, # Reduced for faster test
"tempco_limit": 100.0, # Relaxed for testing
},
description="Integration test of TempCo",
)
# Create test logger
logger = TestLogger(run_id, repository)
# Create test context
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
"temperatures": [0.0, 25.0, 50.0],
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.5,
"num_samples": 3,
"tempco_limit": 100.0,
},
)
# Create test
test = TempCoTest()
assert test.name == "tempco"
assert test.description == "Output voltage temperature coefficient"
# Run synchronous test code in thread pool to avoid blocking event loop
loop = asyncio.get_running_loop()
def run_test():
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Configure instruments
instruments.chamber.set_ramp_rate(10.0) # Fast ramp for testing
instruments.psu.enable_output(1, False) # Ensure off initially
# Run test
return test.execute(context)
status = await loop.run_in_executor(None, run_test)
# Verify test completed
assert status in (TestStatus.PASSED, TestStatus.FAILED)
# Flush logger to ensure all data is written
logger.flush()
# Update run status
repository.complete_run(run_id, status)
# Verify results were logged
results = repository.get_results(run_id)
assert len(results) > 0
# Find TempCo result
tempco_result = next(r for r in results if r.parameter == "temp_co")
assert tempco_result is not None
assert tempco_result.unit == "ppm/°C"
assert tempco_result.lower_limit == -100.0
assert tempco_result.upper_limit == 100.0
# Verify measurements were logged
df = repository.get_measurements_dataframe(run_id)
assert df is not None
assert len(df) >= 3 # At least 3 temperature points
# Verify v_out measurements exist
vout_measurements = df[df["parameter"] == "v_out"]
assert len(vout_measurements) >= 3
# Verify temperature conditions were logged
assert "temperature" in df.columns
temps_recorded = vout_measurements["temperature"].unique()
assert len(temps_recorded) >= 3
finally:
await server.stop()
async def test_tempco_with_minimal_config(self, tmp_path: Path) -> None:
"""Test TempCo uses default configuration when not specified."""
import asyncio
# Start simulation server
server_config = ServerConfig(
host="127.0.0.1",
chamber_port=17100,
psu_port=17101,
dmm_port=17102,
)
server = SimulationServer(server_config)
await server.start()
# Give server time to fully initialize
await asyncio.sleep(0.1)
try:
# Create instrument set
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17100,
psu_port=17101,
dmm_port=17102,
)
instruments = InstrumentFactory.create(instrument_config)
# Create repository
db_path = tmp_path / "test_minimal.db"
repository = SQLiteRepository(db_path)
run_id = repository.create_run(
test_name="tempco",
config={}, # Empty config - should use defaults
)
# Create logger and context with minimal config
logger = TestLogger(run_id, repository)
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
# Override temperatures for faster test
"temperatures": [25.0, 50.0],
"settle_time": 0.2,
"num_samples": 2,
},
)
# Execute test in thread pool
test = TempCoTest()
loop = asyncio.get_running_loop()
def run_test():
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Run test
return test.execute(context)
status = await loop.run_in_executor(None, run_test)
# Should complete without error
assert status in (TestStatus.PASSED, TestStatus.FAILED, TestStatus.ERROR)
logger.flush()
repository.complete_run(run_id, status)
# Verify some data was logged
results = repository.get_results(run_id)
assert len(results) >= 1
finally:
await server.stop()
async def test_tempco_handles_errors_gracefully(self, tmp_path: Path) -> None:
"""Test TempCo returns ERROR status when instruments fail."""
import asyncio
# Start simulation server
server_config = ServerConfig(
host="127.0.0.1",
chamber_port=17200,
psu_port=17201,
dmm_port=17202,
)
server = SimulationServer(server_config)
await server.start()
# Give server time to fully initialize
await asyncio.sleep(0.1)
try:
# Create instrument set
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17200,
psu_port=17201,
dmm_port=17202,
)
instruments = InstrumentFactory.create(instrument_config)
# Create repository
db_path = tmp_path / "test_error.db"
repository = SQLiteRepository(db_path)
run_id = repository.create_run(test_name="tempco", config={})
# Create logger and context
logger = TestLogger(run_id, repository)
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
"temperatures": [], # Invalid: empty temperature list
"settle_time": 0.1,
},
)
# Execute test in thread pool
test = TempCoTest()
loop = asyncio.get_running_loop()
def run_test():
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Run test
return test.execute(context)
# Should handle gracefully (may return FAILED or ERROR)
# The test should not raise an unhandled exception
try:
status = await loop.run_in_executor(None, run_test)
# If it completes, it should indicate an error or failure
assert status in (TestStatus.ERROR, TestStatus.FAILED)
except Exception:
# Or it might raise, which we also consider handled
pass
logger.flush()
finally:
await server.stop()