Add TempCo integration tests

This commit is contained in:
2025-09-16 11:20:03 +00:00
parent 96eb83cec4
commit d07e6e3f1a

View File

@@ -0,0 +1,267 @@
"""Integration tests for TempCo characterisation test.
Full end-to-end test of the TempCo test with simulated instruments.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from py_dvt_ate.data.models import TestStatus
from py_dvt_ate.data.repository import SQLiteRepository
from py_dvt_ate.framework.context import TestContext
from py_dvt_ate.framework.logger import TestLogger
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
from py_dvt_ate.tests.thermal.tempco import TempCoTest
@pytest.mark.asyncio(loop_scope="function")
class TestTempCoIntegration:
"""Integration tests for TempCo test with simulator."""
async def test_tempco_runs_successfully(self, tmp_path: Path) -> None:
"""Test TempCo test runs end-to-end with simulator."""
# Start simulation server
server_config = ServerConfig(
host="127.0.0.1",
chamber_port=17000,
psu_port=17001,
dmm_port=17002,
physics_rate_hz=100.0,
)
server = SimulationServer(server_config)
await server.start()
try:
# Create instrument set connected to simulator
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17000,
psu_port=17001,
dmm_port=17002,
)
instruments = InstrumentFactory.create(instrument_config)
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Configure instruments
instruments.chamber.set_ramp_rate(10.0) # Fast ramp for testing
instruments.psu.enable_output(1, False) # Ensure off initially
# Create test repository
db_path = tmp_path / "test.db"
repository = SQLiteRepository(db_path)
# Create test run
run_id = repository.create_run(
test_name="tempco",
config={
"temperatures": [0.0, 25.0, 50.0], # Reduced for faster test
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.5, # Reduced for faster test
"num_samples": 3, # Reduced for faster test
"tempco_limit": 100.0, # Relaxed for testing
},
description="Integration test of TempCo",
)
# Create test logger
logger = TestLogger(run_id, repository)
# Create test context
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
"temperatures": [0.0, 25.0, 50.0],
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.5,
"num_samples": 3,
"tempco_limit": 100.0,
},
)
# Create and execute test
test = TempCoTest()
assert test.name == "tempco"
assert test.description == "Output voltage temperature coefficient"
# Run test (this is synchronous, but simulation runs async in background)
status = test.execute(context)
# Verify test completed
assert status in (TestStatus.PASSED, TestStatus.FAILED)
# Flush logger to ensure all data is written
logger.flush()
# Update run status
repository.complete_run(run_id, status)
# Verify results were logged
results = repository.get_results(run_id)
assert len(results) > 0
# Find TempCo result
tempco_result = next(r for r in results if r.parameter == "temp_co")
assert tempco_result is not None
assert tempco_result.unit == "ppm/°C"
assert tempco_result.lower_limit == -100.0
assert tempco_result.upper_limit == 100.0
# Verify measurements were logged
df = repository.get_measurements_dataframe(run_id)
assert df is not None
assert len(df) >= 3 # At least 3 temperature points
# Verify v_out measurements exist
vout_measurements = df[df["parameter"] == "v_out"]
assert len(vout_measurements) >= 3
# Verify temperature conditions were logged
assert "temperature" in df.columns
temps_recorded = vout_measurements["temperature"].unique()
assert len(temps_recorded) >= 3
finally:
await server.stop()
async def test_tempco_with_minimal_config(self, tmp_path: Path) -> None:
"""Test TempCo uses default configuration when not specified."""
# Start simulation server
server_config = ServerConfig(
host="127.0.0.1",
chamber_port=17100,
psu_port=17101,
dmm_port=17102,
)
server = SimulationServer(server_config)
await server.start()
try:
# Create instrument set
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17100,
psu_port=17101,
dmm_port=17102,
)
instruments = InstrumentFactory.create(instrument_config)
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Create repository
db_path = tmp_path / "test_minimal.db"
repository = SQLiteRepository(db_path)
run_id = repository.create_run(
test_name="tempco",
config={}, # Empty config - should use defaults
)
# Create logger and context with minimal config
logger = TestLogger(run_id, repository)
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
# Override temperatures for faster test
"temperatures": [25.0, 50.0],
"settle_time": 0.2,
"num_samples": 2,
},
)
# Execute test
test = TempCoTest()
status = test.execute(context)
# Should complete without error
assert status in (TestStatus.PASSED, TestStatus.FAILED, TestStatus.ERROR)
logger.flush()
repository.complete_run(run_id, status)
# Verify some data was logged
results = repository.get_results(run_id)
assert len(results) >= 1
finally:
await server.stop()
async def test_tempco_handles_errors_gracefully(self, tmp_path: Path) -> None:
"""Test TempCo returns ERROR status when instruments fail."""
# Start simulation server
server_config = ServerConfig(
host="127.0.0.1",
chamber_port=17200,
psu_port=17201,
dmm_port=17202,
)
server = SimulationServer(server_config)
await server.start()
try:
# Create instrument set
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host="127.0.0.1",
chamber_port=17200,
psu_port=17201,
dmm_port=17202,
)
instruments = InstrumentFactory.create(instrument_config)
# Connect to instruments
instruments.chamber.connect()
instruments.psu.connect()
instruments.dmm.connect()
# Create repository
db_path = tmp_path / "test_error.db"
repository = SQLiteRepository(db_path)
run_id = repository.create_run(test_name="tempco", config={})
# Create logger and context
logger = TestLogger(run_id, repository)
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config={
"temperatures": [], # Invalid: empty temperature list
"settle_time": 0.1,
},
)
# Execute test
test = TempCoTest()
# Should handle gracefully (may return FAILED or ERROR)
# The test should not raise an unhandled exception
try:
status = test.execute(context)
# If it completes, it should indicate an error or failure
assert status in (TestStatus.ERROR, TestStatus.FAILED)
except Exception:
# Or it might raise, which we also consider handled
pass
logger.flush()
finally:
await server.stop()