- Wrap simulation controls in form to prevent page reruns on change - Fix TempCo test configs to use 2+ temperature points - Add Installation, Quick Start, and usage examples to README 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
290 lines
9.9 KiB
Python
290 lines
9.9 KiB
Python
"""End-to-end integration tests for py_dvt_ate.
|
|
|
|
This module contains comprehensive tests that exercise the entire system:
|
|
- Simulation server startup
|
|
- Instrument connectivity via HAL
|
|
- Test execution through the framework
|
|
- Data persistence
|
|
- Results retrieval
|
|
|
|
These tests verify that all components work together correctly in a
|
|
complete workflow from server start to results analysis.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from tempfile import TemporaryDirectory
|
|
|
|
from py_dvt_ate.data.models import TestStatus
|
|
from py_dvt_ate.data.repository import SQLiteRepository
|
|
from py_dvt_ate.framework.runner import TestRunner
|
|
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory
|
|
from py_dvt_ate.simulation.server import ServerConfig
|
|
from py_dvt_ate.tests.thermal.tempco import TempCoTest
|
|
|
|
|
|
def test_e2e_tempco_characterization(simulation_server: ServerConfig) -> None:
    """Run a complete TempCo characterization against the simulation server.

    The test walks the full workflow:

    1. Build instruments through the HAL factory, pointed at the server
       supplied by the ``simulation_server`` fixture.
    2. Connect the chamber, PSU and DMM transports and take one chamber
       temperature reading as a connectivity check.
    3. Create a SQLite repository inside a temporary directory and a
       ``TestRunner`` on top of it.
    4. Execute ``TempCoTest`` with a reduced configuration.
    5. Verify the run record, the stored ``temp_co`` result and the stored
       measurements.

    Args:
        simulation_server: Host and per-instrument ports of the simulation
            server the instruments connect to.
    """
    with TemporaryDirectory() as tmpdir:
        # Step 1: Create instruments via HAL
        instrument_config = InstrumentConfig(
            backend="simulator",
            simulator_host=simulation_server.host,
            chamber_port=simulation_server.chamber_port,
            psu_port=simulation_server.psu_port,
            dmm_port=simulation_server.dmm_port,
        )

        instruments = InstrumentFactory.create(instrument_config)

        # Connect to instruments
        instruments.chamber.transport.connect()
        instruments.psu.transport.connect()
        instruments.dmm.transport.connect()

        # Verify instrument connectivity: the read itself must not raise
        chamber_temp = instruments.chamber.get_temperature()
        assert isinstance(chamber_temp, float)

        # Step 2: Create repository and test runner
        db_path = Path(tmpdir) / "test.db"
        repository = SQLiteRepository(str(db_path))
        try:
            runner = TestRunner(repository)

            # Step 3: Execute TempCo test with minimal config for speed
            test = TempCoTest()
            config = {
                "temperatures": [0.0, 25.0, 50.0],  # Reduced for test speed
                "input_voltage": 5.0,
                "load_current": 0.1,
                "settle_time": 0.5,  # Reduced for test speed
                "num_samples": 3,  # Reduced for test speed
                "tempco_limit": 100.0,  # Relaxed for test
            }

            run_id = runner.run_test(
                test=test,
                instruments=instruments,
                config=config,
                operator="test_user",
                description="E2E integration test",
            )

            # Step 4: Verify run was created
            assert run_id is not None
            run = repository.get_run(run_id)
            assert run.test_name == "tempco"
            # Either outcome is valid; the run only has to reach a final state
            assert run.status in [TestStatus.PASSED, TestStatus.FAILED]
            assert run.completed_at is not None
            assert run.operator == "test_user"

            # Step 5: Verify results were stored
            results = repository.get_results(run_id)
            assert len(results) > 0

            # Should have TempCo result
            tempco_result = next(
                (r for r in results if r.parameter == "temp_co"), None
            )
            assert tempco_result is not None
            assert tempco_result.unit == "ppm/C"
            assert tempco_result.lower_limit is not None
            assert tempco_result.upper_limit is not None

            # Step 6: Verify measurements were stored
            measurements_df = repository.get_measurements_dataframe(run_id)
            assert measurements_df is not None
            assert not measurements_df.empty

            # Should have v_out measurements: at least one per temperature
            # point, derived from the config so the two cannot drift apart
            v_out_measurements = measurements_df[
                measurements_df["parameter"] == "v_out"
            ]
            assert len(v_out_measurements) >= len(config["temperatures"])

            # Step 7: Verify data integrity
            # All measurements should have valid values
            assert (measurements_df["value"] > 0).all()
            # All measurements should have units
            assert measurements_df["unit"].notna().all()
            # Timestamps should be monotonically increasing
            assert measurements_df["timestamp"].is_monotonic_increasing
        finally:
            # Close the repository even when an assertion above fails, so the
            # database file is released before tempdir cleanup (Windows file
            # locking) and a cleanup error cannot obscure the real failure.
            repository.close()
|
|
|
|
|
|
def test_e2e_server_lifecycle(simulation_server: ServerConfig) -> None:
    """Check that two independent instrument sets can share one server.

    The ``simulation_server`` fixture supplies an already running server;
    this test neither starts nor stops it. It creates two instrument sets
    from the same configuration, connects both chamber transports, and
    checks that each chamber returns a float temperature and that the two
    readings differ by less than one degree.

    Args:
        simulation_server: Host and per-instrument ports of the simulation
            server the instruments connect to.
    """
    shared_config = InstrumentConfig(
        backend="simulator",
        simulator_host=simulation_server.host,
        chamber_port=simulation_server.chamber_port,
        psu_port=simulation_server.psu_port,
        dmm_port=simulation_server.dmm_port,
    )

    # Two separate instrument sets built from the same configuration
    instrument_sets = [InstrumentFactory.create(shared_config) for _ in range(2)]

    # Only the chamber transports are needed for this check
    for instrument_set in instrument_sets:
        instrument_set.chamber.transport.connect()

    # Each set reads the chamber through its own connection
    first_reading, second_reading = [
        instrument_set.chamber.get_temperature()
        for instrument_set in instrument_sets
    ]

    assert isinstance(first_reading, float)
    assert isinstance(second_reading, float)
    # Same simulation behind both connections, so readings should be
    # within 1 degree of each other
    assert abs(first_reading - second_reading) < 1.0
|
|
|
|
|
|
def test_e2e_instrument_hal_abstraction(simulation_server: ServerConfig) -> None:
    """Drive the chamber, PSU and DMM through their HAL methods.

    The test sets a chamber setpoint and reads it back, programs PSU
    channel 1 (voltage, current limit, output enable) and reads the voltage
    setpoint and output state back, then, after a short pause, takes a DC
    voltage reading with the DMM and checks that it falls inside the
    expected output window.

    Args:
        simulation_server: Host and per-instrument ports of the simulation
            server the instruments connect to.
    """
    import time

    hal_config = InstrumentConfig(
        backend="simulator",
        simulator_host=simulation_server.host,
        chamber_port=simulation_server.chamber_port,
        psu_port=simulation_server.psu_port,
        dmm_port=simulation_server.dmm_port,
    )
    bundle = InstrumentFactory.create(hal_config)

    # Open all three transports before issuing any command
    bundle.chamber.transport.connect()
    bundle.psu.transport.connect()
    bundle.dmm.transport.connect()

    # Thermal chamber: the setpoint written must be the setpoint read back
    bundle.chamber.set_temperature(30.0)
    assert bundle.chamber.get_setpoint() == 30.0

    # Power supply: program the channel, then read its state back
    psu_channel = 1
    bundle.psu.set_voltage(psu_channel, 5.0)
    bundle.psu.set_current_limit(psu_channel, 0.5)
    bundle.psu.enable_output(psu_channel, True)

    assert bundle.psu.get_voltage(psu_channel) == 5.0
    assert bundle.psu.is_output_enabled(psu_channel) is True

    # Wait a moment for physics to update
    time.sleep(0.1)

    # DMM: should be reading the DUT output voltage (close to nominal)
    dmm_reading = bundle.dmm.measure_dc_voltage()
    assert isinstance(dmm_reading, float)
    assert 3.0 < dmm_reading < 3.6  # LDO output
|
|
|
|
|
|
def test_e2e_multiple_test_runs(simulation_server: ServerConfig) -> None:
    """Run the same test twice in sequence and check both runs are stored.

    Verifies that:

    - two sequential runs with different configurations get distinct run IDs
    - each run records its own operator
    - each run has stored results
    - ``get_all_runs`` returns both runs

    Args:
        simulation_server: Host and per-instrument ports of the simulation
            server the instruments connect to.
    """
    with TemporaryDirectory() as tmpdir:
        instrument_config = InstrumentConfig(
            backend="simulator",
            simulator_host=simulation_server.host,
            chamber_port=simulation_server.chamber_port,
            psu_port=simulation_server.psu_port,
            dmm_port=simulation_server.dmm_port,
        )

        instruments = InstrumentFactory.create(instrument_config)

        # Connect instruments
        instruments.chamber.transport.connect()
        instruments.psu.transport.connect()
        instruments.dmm.transport.connect()

        db_path = Path(tmpdir) / "test.db"
        repository = SQLiteRepository(str(db_path))
        try:
            runner = TestRunner(repository)

            # Run same test twice with different configs
            test = TempCoTest()

            config1 = {
                "temperatures": [0.0, 25.0],  # Need at least 2 points for TempCo
                "input_voltage": 5.0,
                "load_current": 0.1,
                "settle_time": 0.5,
                "num_samples": 3,
                "tempco_limit": 100.0,
            }

            config2 = {
                "temperatures": [25.0, 50.0],  # Need at least 2 points for TempCo
                "input_voltage": 3.3,
                "load_current": 0.05,
                "settle_time": 0.5,
                "num_samples": 3,
                "tempco_limit": 100.0,
            }

            run_id1 = runner.run_test(
                test, instruments, config1, operator="test_user_1"
            )
            run_id2 = runner.run_test(
                test, instruments, config2, operator="test_user_2"
            )

            # Each run must get its own ID
            assert run_id1 != run_id2

            run1 = repository.get_run(run_id1)
            run2 = repository.get_run(run_id2)

            assert run1.operator == "test_user_1"
            assert run2.operator == "test_user_2"

            # Both should have results
            results1 = repository.get_results(run_id1)
            results2 = repository.get_results(run_id2)

            assert len(results1) > 0
            assert len(results2) > 0

            # Verify get_all_runs works; IDs are compared as strings
            all_runs = repository.get_all_runs()
            assert len(all_runs) >= 2
            assert any(r.id == str(run_id1) for r in all_runs)
            assert any(r.id == str(run_id2) for r in all_runs)
        finally:
            # Close the repository even when an assertion above fails, so the
            # database file is released before tempdir cleanup (Windows file
            # locking) and a cleanup error cannot obscure the real failure.
            repository.close()
|