Add TempCo integration tests
This commit is contained in:
267
tests/integration/test_tempco.py
Normal file
267
tests/integration/test_tempco.py
Normal file
@@ -0,0 +1,267 @@
|
||||
"""Integration tests for TempCo characterisation test.
|
||||
|
||||
Full end-to-end test of the TempCo test with simulated instruments.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from py_dvt_ate.data.models import TestStatus
|
||||
from py_dvt_ate.data.repository import SQLiteRepository
|
||||
from py_dvt_ate.framework.context import TestContext
|
||||
from py_dvt_ate.framework.logger import TestLogger
|
||||
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory
|
||||
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
||||
from py_dvt_ate.tests.thermal.tempco import TempCoTest
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="function")
|
||||
class TestTempCoIntegration:
|
||||
"""Integration tests for TempCo test with simulator."""
|
||||
|
||||
async def test_tempco_runs_successfully(self, tmp_path: Path) -> None:
|
||||
"""Test TempCo test runs end-to-end with simulator."""
|
||||
# Start simulation server
|
||||
server_config = ServerConfig(
|
||||
host="127.0.0.1",
|
||||
chamber_port=17000,
|
||||
psu_port=17001,
|
||||
dmm_port=17002,
|
||||
physics_rate_hz=100.0,
|
||||
)
|
||||
server = SimulationServer(server_config)
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
# Create instrument set connected to simulator
|
||||
instrument_config = InstrumentConfig(
|
||||
backend="simulator",
|
||||
simulator_host="127.0.0.1",
|
||||
chamber_port=17000,
|
||||
psu_port=17001,
|
||||
dmm_port=17002,
|
||||
)
|
||||
instruments = InstrumentFactory.create(instrument_config)
|
||||
|
||||
# Connect to instruments
|
||||
instruments.chamber.connect()
|
||||
instruments.psu.connect()
|
||||
instruments.dmm.connect()
|
||||
|
||||
# Configure instruments
|
||||
instruments.chamber.set_ramp_rate(10.0) # Fast ramp for testing
|
||||
instruments.psu.enable_output(1, False) # Ensure off initially
|
||||
|
||||
# Create test repository
|
||||
db_path = tmp_path / "test.db"
|
||||
repository = SQLiteRepository(db_path)
|
||||
|
||||
# Create test run
|
||||
run_id = repository.create_run(
|
||||
test_name="tempco",
|
||||
config={
|
||||
"temperatures": [0.0, 25.0, 50.0], # Reduced for faster test
|
||||
"input_voltage": 5.0,
|
||||
"load_current": 0.1,
|
||||
"settle_time": 0.5, # Reduced for faster test
|
||||
"num_samples": 3, # Reduced for faster test
|
||||
"tempco_limit": 100.0, # Relaxed for testing
|
||||
},
|
||||
description="Integration test of TempCo",
|
||||
)
|
||||
|
||||
# Create test logger
|
||||
logger = TestLogger(run_id, repository)
|
||||
|
||||
# Create test context
|
||||
context = TestContext(
|
||||
run_id=run_id,
|
||||
instruments=instruments,
|
||||
logger=logger,
|
||||
config={
|
||||
"temperatures": [0.0, 25.0, 50.0],
|
||||
"input_voltage": 5.0,
|
||||
"load_current": 0.1,
|
||||
"settle_time": 0.5,
|
||||
"num_samples": 3,
|
||||
"tempco_limit": 100.0,
|
||||
},
|
||||
)
|
||||
|
||||
# Create and execute test
|
||||
test = TempCoTest()
|
||||
assert test.name == "tempco"
|
||||
assert test.description == "Output voltage temperature coefficient"
|
||||
|
||||
# Run test (this is synchronous, but simulation runs async in background)
|
||||
status = test.execute(context)
|
||||
|
||||
# Verify test completed
|
||||
assert status in (TestStatus.PASSED, TestStatus.FAILED)
|
||||
|
||||
# Flush logger to ensure all data is written
|
||||
logger.flush()
|
||||
|
||||
# Update run status
|
||||
repository.complete_run(run_id, status)
|
||||
|
||||
# Verify results were logged
|
||||
results = repository.get_results(run_id)
|
||||
assert len(results) > 0
|
||||
|
||||
# Find TempCo result
|
||||
tempco_result = next(r for r in results if r.parameter == "temp_co")
|
||||
assert tempco_result is not None
|
||||
assert tempco_result.unit == "ppm/°C"
|
||||
assert tempco_result.lower_limit == -100.0
|
||||
assert tempco_result.upper_limit == 100.0
|
||||
|
||||
# Verify measurements were logged
|
||||
df = repository.get_measurements_dataframe(run_id)
|
||||
assert df is not None
|
||||
assert len(df) >= 3 # At least 3 temperature points
|
||||
|
||||
# Verify v_out measurements exist
|
||||
vout_measurements = df[df["parameter"] == "v_out"]
|
||||
assert len(vout_measurements) >= 3
|
||||
|
||||
# Verify temperature conditions were logged
|
||||
assert "temperature" in df.columns
|
||||
temps_recorded = vout_measurements["temperature"].unique()
|
||||
assert len(temps_recorded) >= 3
|
||||
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
async def test_tempco_with_minimal_config(self, tmp_path: Path) -> None:
|
||||
"""Test TempCo uses default configuration when not specified."""
|
||||
# Start simulation server
|
||||
server_config = ServerConfig(
|
||||
host="127.0.0.1",
|
||||
chamber_port=17100,
|
||||
psu_port=17101,
|
||||
dmm_port=17102,
|
||||
)
|
||||
server = SimulationServer(server_config)
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
# Create instrument set
|
||||
instrument_config = InstrumentConfig(
|
||||
backend="simulator",
|
||||
simulator_host="127.0.0.1",
|
||||
chamber_port=17100,
|
||||
psu_port=17101,
|
||||
dmm_port=17102,
|
||||
)
|
||||
instruments = InstrumentFactory.create(instrument_config)
|
||||
|
||||
# Connect to instruments
|
||||
instruments.chamber.connect()
|
||||
instruments.psu.connect()
|
||||
instruments.dmm.connect()
|
||||
|
||||
# Create repository
|
||||
db_path = tmp_path / "test_minimal.db"
|
||||
repository = SQLiteRepository(db_path)
|
||||
run_id = repository.create_run(
|
||||
test_name="tempco",
|
||||
config={}, # Empty config - should use defaults
|
||||
)
|
||||
|
||||
# Create logger and context with minimal config
|
||||
logger = TestLogger(run_id, repository)
|
||||
context = TestContext(
|
||||
run_id=run_id,
|
||||
instruments=instruments,
|
||||
logger=logger,
|
||||
config={
|
||||
# Override temperatures for faster test
|
||||
"temperatures": [25.0, 50.0],
|
||||
"settle_time": 0.2,
|
||||
"num_samples": 2,
|
||||
},
|
||||
)
|
||||
|
||||
# Execute test
|
||||
test = TempCoTest()
|
||||
status = test.execute(context)
|
||||
|
||||
# Should complete without error
|
||||
assert status in (TestStatus.PASSED, TestStatus.FAILED, TestStatus.ERROR)
|
||||
|
||||
logger.flush()
|
||||
repository.complete_run(run_id, status)
|
||||
|
||||
# Verify some data was logged
|
||||
results = repository.get_results(run_id)
|
||||
assert len(results) >= 1
|
||||
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
async def test_tempco_handles_errors_gracefully(self, tmp_path: Path) -> None:
|
||||
"""Test TempCo returns ERROR status when instruments fail."""
|
||||
# Start simulation server
|
||||
server_config = ServerConfig(
|
||||
host="127.0.0.1",
|
||||
chamber_port=17200,
|
||||
psu_port=17201,
|
||||
dmm_port=17202,
|
||||
)
|
||||
server = SimulationServer(server_config)
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
# Create instrument set
|
||||
instrument_config = InstrumentConfig(
|
||||
backend="simulator",
|
||||
simulator_host="127.0.0.1",
|
||||
chamber_port=17200,
|
||||
psu_port=17201,
|
||||
dmm_port=17202,
|
||||
)
|
||||
instruments = InstrumentFactory.create(instrument_config)
|
||||
|
||||
# Connect to instruments
|
||||
instruments.chamber.connect()
|
||||
instruments.psu.connect()
|
||||
instruments.dmm.connect()
|
||||
|
||||
# Create repository
|
||||
db_path = tmp_path / "test_error.db"
|
||||
repository = SQLiteRepository(db_path)
|
||||
run_id = repository.create_run(test_name="tempco", config={})
|
||||
|
||||
# Create logger and context
|
||||
logger = TestLogger(run_id, repository)
|
||||
context = TestContext(
|
||||
run_id=run_id,
|
||||
instruments=instruments,
|
||||
logger=logger,
|
||||
config={
|
||||
"temperatures": [], # Invalid: empty temperature list
|
||||
"settle_time": 0.1,
|
||||
},
|
||||
)
|
||||
|
||||
# Execute test
|
||||
test = TempCoTest()
|
||||
|
||||
# Should handle gracefully (may return FAILED or ERROR)
|
||||
# The test should not raise an unhandled exception
|
||||
try:
|
||||
status = test.execute(context)
|
||||
# If it completes, it should indicate an error or failure
|
||||
assert status in (TestStatus.ERROR, TestStatus.FAILED)
|
||||
except Exception:
|
||||
# Or it might raise, which we also consider handled
|
||||
pass
|
||||
|
||||
logger.flush()
|
||||
|
||||
finally:
|
||||
await server.stop()
|
||||
Reference in New Issue
Block a user