diff --git a/tests/integration/test_tempco.py b/tests/integration/test_tempco.py new file mode 100644 index 0000000..7715a7d --- /dev/null +++ b/tests/integration/test_tempco.py @@ -0,0 +1,267 @@ +"""Integration tests for TempCo characterisation test. + +Full end-to-end test of the TempCo test with simulated instruments. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from py_dvt_ate.data.models import TestStatus +from py_dvt_ate.data.repository import SQLiteRepository +from py_dvt_ate.framework.context import TestContext +from py_dvt_ate.framework.logger import TestLogger +from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory +from py_dvt_ate.simulation.server import ServerConfig, SimulationServer +from py_dvt_ate.tests.thermal.tempco import TempCoTest + + +@pytest.mark.asyncio(loop_scope="function") +class TestTempCoIntegration: + """Integration tests for TempCo test with simulator.""" + + async def test_tempco_runs_successfully(self, tmp_path: Path) -> None: + """Test TempCo test runs end-to-end with simulator.""" + # Start simulation server + server_config = ServerConfig( + host="127.0.0.1", + chamber_port=17000, + psu_port=17001, + dmm_port=17002, + physics_rate_hz=100.0, + ) + server = SimulationServer(server_config) + await server.start() + + try: + # Create instrument set connected to simulator + instrument_config = InstrumentConfig( + backend="simulator", + simulator_host="127.0.0.1", + chamber_port=17000, + psu_port=17001, + dmm_port=17002, + ) + instruments = InstrumentFactory.create(instrument_config) + + # Connect to instruments + instruments.chamber.connect() + instruments.psu.connect() + instruments.dmm.connect() + + # Configure instruments + instruments.chamber.set_ramp_rate(10.0) # Fast ramp for testing + instruments.psu.enable_output(1, False) # Ensure off initially + + # Create test repository + db_path = tmp_path / "test.db" + repository = SQLiteRepository(db_path) + + # Create test run + run_id = repository.create_run( + test_name="tempco", + config={ + "temperatures": [0.0, 25.0, 50.0], # Reduced for faster test + "input_voltage": 5.0, + "load_current": 0.1, + "settle_time": 0.5, # Reduced for faster test + "num_samples": 3, # Reduced for faster test + "tempco_limit": 100.0, # Relaxed for testing + }, + description="Integration test of TempCo", + ) + + # Create test logger + logger = TestLogger(run_id, repository) + + # Create test context + context = TestContext( + run_id=run_id, + instruments=instruments, + logger=logger, + config={ + "temperatures": [0.0, 25.0, 50.0], + "input_voltage": 5.0, + "load_current": 0.1, + "settle_time": 0.5, + "num_samples": 3, + "tempco_limit": 100.0, + }, + ) + + # Create and execute test + test = TempCoTest() + assert test.name == "tempco" + assert test.description == "Output voltage temperature coefficient" + + # Run test (this is synchronous, but simulation runs async in background) + status = test.execute(context) + + # Verify test completed + assert status in (TestStatus.PASSED, TestStatus.FAILED) + + # Flush logger to ensure all data is written + logger.flush() + + # Update run status + repository.complete_run(run_id, status) + + # Verify results were logged + results = repository.get_results(run_id) + assert len(results) > 0 + + # Find TempCo result + tempco_result = next(r for r in results if r.parameter == "temp_co") + assert tempco_result is not None + assert tempco_result.unit == "ppm/°C" + assert tempco_result.lower_limit == -100.0 + assert tempco_result.upper_limit == 100.0 + + # Verify measurements were logged + df = repository.get_measurements_dataframe(run_id) + assert df is not None + assert len(df) >= 3 # At least 3 temperature points + + # Verify v_out measurements exist + vout_measurements = df[df["parameter"] == "v_out"] + assert len(vout_measurements) >= 3 + + # Verify temperature conditions were logged + assert "temperature" in df.columns + temps_recorded = vout_measurements["temperature"].unique() + assert len(temps_recorded) >= 3 + + finally: + await server.stop() + + async def test_tempco_with_minimal_config(self, tmp_path: Path) -> None: + """Test TempCo uses default configuration when not specified.""" + # Start simulation server + server_config = ServerConfig( + host="127.0.0.1", + chamber_port=17100, + psu_port=17101, + dmm_port=17102, + ) + server = SimulationServer(server_config) + await server.start() + + try: + # Create instrument set + instrument_config = InstrumentConfig( + backend="simulator", + simulator_host="127.0.0.1", + chamber_port=17100, + psu_port=17101, + dmm_port=17102, + ) + instruments = InstrumentFactory.create(instrument_config) + + # Connect to instruments + instruments.chamber.connect() + instruments.psu.connect() + instruments.dmm.connect() + + # Create repository + db_path = tmp_path / "test_minimal.db" + repository = SQLiteRepository(db_path) + run_id = repository.create_run( + test_name="tempco", + config={}, # Empty config - should use defaults + ) + + # Create logger and context with minimal config + logger = TestLogger(run_id, repository) + context = TestContext( + run_id=run_id, + instruments=instruments, + logger=logger, + config={ + # Override temperatures for faster test + "temperatures": [25.0, 50.0], + "settle_time": 0.2, + "num_samples": 2, + }, + ) + + # Execute test + test = TempCoTest() + status = test.execute(context) + + # Should complete without error + assert status in (TestStatus.PASSED, TestStatus.FAILED, TestStatus.ERROR) + + logger.flush() + repository.complete_run(run_id, status) + + # Verify some data was logged + results = repository.get_results(run_id) + assert len(results) >= 1 + + finally: + await server.stop() + + async def test_tempco_handles_errors_gracefully(self, tmp_path: Path) -> None: + """Test TempCo returns ERROR status when instruments fail.""" + # Start simulation server + server_config = ServerConfig( + host="127.0.0.1", + chamber_port=17200, + psu_port=17201, + dmm_port=17202, + ) + server = SimulationServer(server_config) + await server.start() + + try: + # Create instrument set + instrument_config = InstrumentConfig( + backend="simulator", + simulator_host="127.0.0.1", + chamber_port=17200, + psu_port=17201, + dmm_port=17202, + ) + instruments = InstrumentFactory.create(instrument_config) + + # Connect to instruments + instruments.chamber.connect() + instruments.psu.connect() + instruments.dmm.connect() + + # Create repository + db_path = tmp_path / "test_error.db" + repository = SQLiteRepository(db_path) + run_id = repository.create_run(test_name="tempco", config={}) + + # Create logger and context + logger = TestLogger(run_id, repository) + context = TestContext( + run_id=run_id, + instruments=instruments, + logger=logger, + config={ + "temperatures": [], # Invalid: empty temperature list + "settle_time": 0.1, + }, + ) + + # Execute test + test = TempCoTest() + + # Should handle gracefully (may return FAILED or ERROR) + # The test should not raise an unhandled exception + try: + status = test.execute(context) + # If it completes, it should indicate an error or failure + assert status in (TestStatus.ERROR, TestStatus.FAILED) + except Exception: + # Or it might raise, which we also consider handled + pass + + logger.flush() + + finally: + await server.stop()