"""End-to-end integration tests for py_dvt_ate. This module contains comprehensive tests that exercise the entire system: - Simulation server startup - Instrument connectivity via HAL - Test execution through the framework - Data persistence - Results retrieval These tests verify that all components work together correctly in a complete workflow from server start to results analysis. """ from pathlib import Path from tempfile import TemporaryDirectory from py_dvt_ate.data.models import TestStatus from py_dvt_ate.data.repository import SQLiteRepository from py_dvt_ate.framework.runner import TestRunner from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory from py_dvt_ate.simulation.server import ServerConfig from py_dvt_ate.tests.thermal.tempco import TempCoTest def test_e2e_tempco_characterization(simulation_server: ServerConfig) -> None: """End-to-end test: Run complete TempCo characterization workflow. This test exercises the entire system: 1. Simulation server is running (from fixture) 2. Create instruments via HAL factory 3. Create test repository and runner 4. Execute TempCo test 5. Verify results are persisted 6. Verify measurements are stored 7. Retrieve and analyze results This is the closest test to real-world usage, verifying that all components integrate correctly. """ with TemporaryDirectory() as tmpdir: # Step 1: Create instruments via HAL instrument_config = InstrumentConfig( backend="simulator", simulator_host=simulation_server.host, chamber_port=simulation_server.chamber_port, psu_port=simulation_server.psu_port, dmm_port=simulation_server.dmm_port, ) instruments = InstrumentFactory.create(instrument_config) # Connect to instruments instruments.chamber.transport.connect() instruments.psu.transport.connect() instruments.dmm.transport.connect() # Verify instrument connectivity idn = instruments.chamber.get_temperature() # Should not raise assert isinstance(idn, float) # Step 2: Create repository and test runner db_path = Path(tmpdir) / "test.db" repository = SQLiteRepository(str(db_path)) runner = TestRunner(repository) # Step 3: Execute TempCo test with minimal config for speed test = TempCoTest() config = { "temperatures": [0.0, 25.0, 50.0], # Reduced for test speed "input_voltage": 5.0, "load_current": 0.1, "settle_time": 0.5, # Reduced for test speed "num_samples": 3, # Reduced for test speed "tempco_limit": 100.0, # Relaxed for test } run_id = runner.run_test( test=test, instruments=instruments, config=config, operator="test_user", description="E2E integration test", ) # Step 4: Verify run was created assert run_id is not None run = repository.get_run(run_id) assert run.test_name == "tempco" assert run.status in [TestStatus.PASSED, TestStatus.FAILED] # Either is valid assert run.completed_at is not None assert run.operator == "test_user" # Step 5: Verify results were stored results = repository.get_results(run_id) assert len(results) > 0 # Should have TempCo result tempco_result = next((r for r in results if r.parameter == "temp_co"), None) assert tempco_result is not None assert tempco_result.unit == "ppm/C" assert tempco_result.lower_limit is not None assert tempco_result.upper_limit is not None # Step 6: Verify measurements were stored measurements_df = repository.get_measurements_dataframe(run_id) assert measurements_df is not None assert not measurements_df.empty # Should have v_out measurements v_out_measurements = measurements_df[measurements_df["parameter"] == "v_out"] assert len(v_out_measurements) >= 3 # At least one per temperature point # Step 7: Verify data integrity # All measurements should have valid values assert (measurements_df["value"] > 0).all() # All measurements should have units assert measurements_df["unit"].notna().all() # Timestamps should be monotonically increasing assert measurements_df["timestamp"].is_monotonic_increasing def test_e2e_server_lifecycle(simulation_server: ServerConfig) -> None: """Test simulation server lifecycle management. Verifies that: - Server starts successfully - Physics engine is running - Multiple instruments can connect - Server can be stopped cleanly """ # Server is already running from fixture (in background thread) # We verify it works by connecting instruments # Multiple instruments should be able to connect config = InstrumentConfig( backend="simulator", simulator_host=simulation_server.host, chamber_port=simulation_server.chamber_port, psu_port=simulation_server.psu_port, dmm_port=simulation_server.dmm_port, ) instruments1 = InstrumentFactory.create(config) instruments2 = InstrumentFactory.create(config) # Connect instruments instruments1.chamber.transport.connect() instruments2.chamber.transport.connect() # Both should work independently temp1 = instruments1.chamber.get_temperature() temp2 = instruments2.chamber.get_temperature() assert isinstance(temp1, float) assert isinstance(temp2, float) # Both should read similar values (same simulation) assert abs(temp1 - temp2) < 1.0 # Within 1 degree def test_e2e_instrument_hal_abstraction(simulation_server: ServerConfig) -> None: """Test Hardware Abstraction Layer works correctly. Verifies that: - Instruments implement HAL interfaces - Commands work through HAL - State changes propagate through physics """ config = InstrumentConfig( backend="simulator", simulator_host=simulation_server.host, chamber_port=simulation_server.chamber_port, psu_port=simulation_server.psu_port, dmm_port=simulation_server.dmm_port, ) instruments = InstrumentFactory.create(config) # Connect instruments instruments.chamber.transport.connect() instruments.psu.transport.connect() instruments.dmm.transport.connect() # Test thermal chamber HAL instruments.chamber.set_temperature(30.0) setpoint = instruments.chamber.get_setpoint() assert setpoint == 30.0 # Test power supply HAL instruments.psu.set_voltage(1, 5.0) instruments.psu.set_current_limit(1, 0.5) instruments.psu.enable_output(1, True) voltage_setpoint = instruments.psu.get_voltage(1) assert voltage_setpoint == 5.0 enabled = instruments.psu.is_output_enabled(1) assert enabled is True # Wait a moment for physics to update import time time.sleep(0.1) # Measure voltage with DMM measured_voltage = instruments.dmm.measure_dc_voltage() assert isinstance(measured_voltage, float) # Should be reading DUT output voltage (close to nominal) assert 3.0 < measured_voltage < 3.6 # LDO output def test_e2e_multiple_test_runs(simulation_server: ServerConfig) -> None: """Test running multiple tests sequentially. Verifies that: - Multiple tests can be run in sequence - Each test gets its own run ID - All results are stored correctly - Repository handles multiple runs """ with TemporaryDirectory() as tmpdir: config = InstrumentConfig( backend="simulator", simulator_host=simulation_server.host, chamber_port=simulation_server.chamber_port, psu_port=simulation_server.psu_port, dmm_port=simulation_server.dmm_port, ) instruments = InstrumentFactory.create(config) # Connect instruments instruments.chamber.transport.connect() instruments.psu.transport.connect() instruments.dmm.transport.connect() db_path = Path(tmpdir) / "test.db" repository = SQLiteRepository(str(db_path)) runner = TestRunner(repository) # Run same test twice with different configs test = TempCoTest() config1 = { "temperatures": [25.0], "input_voltage": 5.0, "load_current": 0.1, "settle_time": 0.5, } config2 = { "temperatures": [25.0], "input_voltage": 3.3, "load_current": 0.05, "settle_time": 0.5, } run_id1 = runner.run_test(test, instruments, config1, operator="test_user_1") run_id2 = runner.run_test(test, instruments, config2, operator="test_user_2") # Both runs should complete assert run_id1 != run_id2 run1 = repository.get_run(run_id1) run2 = repository.get_run(run_id2) assert run1.operator == "test_user_1" assert run2.operator == "test_user_2" # Both should have results results1 = repository.get_results(run_id1) results2 = repository.get_results(run_id2) assert len(results1) > 0 assert len(results2) > 0 # Verify get_all_runs works all_runs = repository.get_all_runs() assert len(all_runs) >= 2 assert any(r.id == str(run_id1) for r in all_runs) assert any(r.id == str(run_id2) for r in all_runs)