Move synchronous test execution to a thread pool executor to avoid blocking the async event loop. This prevents deadlocks when synchronous client code tries to communicate with the async server running in the same loop. Note: the integration tests are still experiencing timeouts and need further investigation. Unit tests and TCP server communication are working.
298 lines
10 KiB
Python
298 lines
10 KiB
Python
"""Integration tests for TempCo characterisation test.
|
|
|
|
Full end-to-end test of the TempCo test with simulated instruments.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from py_dvt_ate.data.models import TestStatus
|
|
from py_dvt_ate.data.repository import SQLiteRepository
|
|
from py_dvt_ate.framework.context import TestContext
|
|
from py_dvt_ate.framework.logger import TestLogger
|
|
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory
|
|
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
|
from py_dvt_ate.tests.thermal.tempco import TempCoTest
|
|
|
|
|
|
@pytest.mark.asyncio(loop_scope="function")
class TestTempCoIntegration:
    """Integration tests for TempCo test with simulator.

    Each test starts its own ``SimulationServer`` on a dedicated set of
    ports, creates simulator-backed instruments, and runs the synchronous
    instrument/test code in the default thread pool executor so that the
    event loop is not blocked while that code runs.

    NOTE(review): the repository and logger are created on the event-loop
    thread but are used by ``test.execute`` on a worker thread - confirm
    that ``SQLiteRepository`` tolerates cross-thread use.
    """

    async def test_tempco_runs_successfully(self, tmp_path: Path) -> None:
        """Test TempCo test runs end-to-end with simulator.

        Runs a reduced three-temperature sweep and checks that a
        ``temp_co`` result with the configured limits and ``v_out``
        measurements for each temperature point were stored.

        Args:
            tmp_path: pytest-provided temporary directory for the database.
        """
        import asyncio

        # Single source of truth for the ports shared by server and client.
        chamber_port, psu_port, dmm_port = 17000, 17001, 17002

        def make_config() -> dict[str, object]:
            # Fresh dict per call so the repository and the context never
            # share mutable state.
            return {
                "temperatures": [0.0, 25.0, 50.0],  # Reduced for faster test
                "input_voltage": 5.0,
                "load_current": 0.1,
                "settle_time": 0.5,  # Reduced for faster test
                "num_samples": 3,  # Reduced for faster test
                "tempco_limit": 100.0,  # Relaxed for testing
            }

        # Start simulation server
        server_config = ServerConfig(
            host="127.0.0.1",
            chamber_port=chamber_port,
            psu_port=psu_port,
            dmm_port=dmm_port,
            physics_rate_hz=100.0,
        )
        server = SimulationServer(server_config)
        await server.start()

        # Give server time to fully initialize
        await asyncio.sleep(0.1)

        try:
            # Create instrument set connected to simulator
            instrument_config = InstrumentConfig(
                backend="simulator",
                simulator_host="127.0.0.1",
                chamber_port=chamber_port,
                psu_port=psu_port,
                dmm_port=dmm_port,
            )
            instruments = InstrumentFactory.create(instrument_config)

            # Create test repository
            db_path = tmp_path / "test.db"
            repository = SQLiteRepository(db_path)

            # Create test run
            run_id = repository.create_run(
                test_name="tempco",
                config=make_config(),
                description="Integration test of TempCo",
            )

            # Create test logger
            logger = TestLogger(run_id, repository)

            # Create test context
            context = TestContext(
                run_id=run_id,
                instruments=instruments,
                logger=logger,
                config=make_config(),
            )

            # Create test
            test = TempCoTest()
            assert test.name == "tempco"
            assert test.description == "Output voltage temperature coefficient"

            # Run synchronous test code in thread pool to avoid blocking event loop
            loop = asyncio.get_running_loop()

            def run_test():
                # Connect to instruments
                instruments.chamber.connect()
                instruments.psu.connect()
                instruments.dmm.connect()

                # Configure instruments
                instruments.chamber.set_ramp_rate(10.0)  # Fast ramp for testing
                instruments.psu.enable_output(1, False)  # Ensure off initially

                # Run test
                return test.execute(context)

            status = await loop.run_in_executor(None, run_test)

            # Verify test completed
            assert status in (TestStatus.PASSED, TestStatus.FAILED)

            # Flush logger to ensure all data is written
            logger.flush()

            # Update run status
            repository.complete_run(run_id, status)

            # Verify results were logged
            results = repository.get_results(run_id)
            assert len(results) > 0

            # Find TempCo result. A default is supplied so that a missing
            # result fails the assertion below instead of raising a bare
            # StopIteration out of next().
            tempco_result = next(
                (r for r in results if r.parameter == "temp_co"),
                None,
            )
            assert tempco_result is not None, "no 'temp_co' result was logged"
            assert tempco_result.unit == "ppm/°C"
            assert tempco_result.lower_limit == -100.0
            assert tempco_result.upper_limit == 100.0

            # Verify measurements were logged
            df = repository.get_measurements_dataframe(run_id)
            assert df is not None
            assert len(df) >= 3  # At least 3 temperature points

            # Verify v_out measurements exist
            vout_measurements = df[df["parameter"] == "v_out"]
            assert len(vout_measurements) >= 3

            # Verify temperature conditions were logged
            assert "temperature" in df.columns
            temps_recorded = vout_measurements["temperature"].unique()
            assert len(temps_recorded) >= 3

        finally:
            await server.stop()

    async def test_tempco_with_minimal_config(self, tmp_path: Path) -> None:
        """Test TempCo uses default configuration when not specified.

        Only the temperature list, settle time and sample count are
        overridden (to keep the run short); every other setting is left
        for the test to default.

        Args:
            tmp_path: pytest-provided temporary directory for the database.
        """
        import asyncio

        # Single source of truth for the ports shared by server and client.
        chamber_port, psu_port, dmm_port = 17100, 17101, 17102

        # Start simulation server
        server_config = ServerConfig(
            host="127.0.0.1",
            chamber_port=chamber_port,
            psu_port=psu_port,
            dmm_port=dmm_port,
        )
        server = SimulationServer(server_config)
        await server.start()

        # Give server time to fully initialize
        await asyncio.sleep(0.1)

        try:
            # Create instrument set
            instrument_config = InstrumentConfig(
                backend="simulator",
                simulator_host="127.0.0.1",
                chamber_port=chamber_port,
                psu_port=psu_port,
                dmm_port=dmm_port,
            )
            instruments = InstrumentFactory.create(instrument_config)

            # Create repository
            db_path = tmp_path / "test_minimal.db"
            repository = SQLiteRepository(db_path)
            run_id = repository.create_run(
                test_name="tempco",
                config={},  # Empty config - should use defaults
            )

            # Create logger and context with minimal config
            logger = TestLogger(run_id, repository)
            context = TestContext(
                run_id=run_id,
                instruments=instruments,
                logger=logger,
                config={
                    # Override temperatures for faster test
                    "temperatures": [25.0, 50.0],
                    "settle_time": 0.2,
                    "num_samples": 2,
                },
            )

            # Execute test in thread pool
            test = TempCoTest()
            loop = asyncio.get_running_loop()

            def run_test():
                # Connect to instruments
                instruments.chamber.connect()
                instruments.psu.connect()
                instruments.dmm.connect()
                # Run test
                return test.execute(context)

            status = await loop.run_in_executor(None, run_test)

            # Should complete without error
            assert status in (TestStatus.PASSED, TestStatus.FAILED, TestStatus.ERROR)

            logger.flush()
            repository.complete_run(run_id, status)

            # Verify some data was logged
            results = repository.get_results(run_id)
            assert len(results) >= 1

        finally:
            await server.stop()

    async def test_tempco_handles_errors_gracefully(self, tmp_path: Path) -> None:
        """Test TempCo handles an invalid (empty) temperature list gracefully.

        The run must either raise, or complete with ``TestStatus.ERROR`` or
        ``TestStatus.FAILED``; completing with any other status fails this
        test.

        Args:
            tmp_path: pytest-provided temporary directory for the database.
        """
        import asyncio

        # Single source of truth for the ports shared by server and client.
        chamber_port, psu_port, dmm_port = 17200, 17201, 17202

        # Start simulation server
        server_config = ServerConfig(
            host="127.0.0.1",
            chamber_port=chamber_port,
            psu_port=psu_port,
            dmm_port=dmm_port,
        )
        server = SimulationServer(server_config)
        await server.start()

        # Give server time to fully initialize
        await asyncio.sleep(0.1)

        try:
            # Create instrument set
            instrument_config = InstrumentConfig(
                backend="simulator",
                simulator_host="127.0.0.1",
                chamber_port=chamber_port,
                psu_port=psu_port,
                dmm_port=dmm_port,
            )
            instruments = InstrumentFactory.create(instrument_config)

            # Create repository
            db_path = tmp_path / "test_error.db"
            repository = SQLiteRepository(db_path)
            run_id = repository.create_run(test_name="tempco", config={})

            # Create logger and context
            logger = TestLogger(run_id, repository)
            context = TestContext(
                run_id=run_id,
                instruments=instruments,
                logger=logger,
                config={
                    "temperatures": [],  # Invalid: empty temperature list
                    "settle_time": 0.1,
                },
            )

            # Execute test in thread pool
            test = TempCoTest()
            loop = asyncio.get_running_loop()

            def run_test():
                # Connect to instruments
                instruments.chamber.connect()
                instruments.psu.connect()
                instruments.dmm.connect()
                # Run test
                return test.execute(context)

            # Should handle gracefully (may return FAILED or ERROR).
            # Only the executor call is guarded: the status assertion lives
            # in the ``else`` branch so that its AssertionError is not
            # swallowed by the broad ``except`` (which previously made this
            # test impossible to fail).
            try:
                status = await loop.run_in_executor(None, run_test)
            except Exception:
                # Or it might raise, which we also consider handled
                pass
            else:
                # If it completes, it should indicate an error or failure
                assert status in (TestStatus.ERROR, TestStatus.FAILED)

            logger.flush()

        finally:
            await server.stop()
|