Files
py-dvt-ate/tests/integration/test_e2e.py
Kai Chappell 416969e899 Polish dashboard UX and update README
- Wrap simulation controls in form to prevent page reruns on change
- Fix TempCo test configs to use 2+ temperature points
- Add Installation, Quick Start, and usage examples to README

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 17:42:07 +00:00

290 lines
9.9 KiB
Python

"""End-to-end integration tests for py_dvt_ate.
This module contains comprehensive tests that exercise the entire system:
- Simulation server startup
- Instrument connectivity via HAL
- Test execution through the framework
- Data persistence
- Results retrieval
These tests verify that all components work together correctly in a
complete workflow from server start to results analysis.
"""
from pathlib import Path
from tempfile import TemporaryDirectory
from py_dvt_ate.data.models import TestStatus
from py_dvt_ate.data.repository import SQLiteRepository
from py_dvt_ate.framework.runner import TestRunner
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory
from py_dvt_ate.simulation.server import ServerConfig
from py_dvt_ate.tests.thermal.tempco import TempCoTest
def test_e2e_tempco_characterization(simulation_server: ServerConfig) -> None:
"""End-to-end test: Run complete TempCo characterization workflow.
This test exercises the entire system:
1. Simulation server is running (from fixture)
2. Create instruments via HAL factory
3. Create test repository and runner
4. Execute TempCo test
5. Verify results are persisted
6. Verify measurements are stored
7. Retrieve and analyze results
This is the closest test to real-world usage, verifying that all
components integrate correctly.
"""
with TemporaryDirectory() as tmpdir:
# Step 1: Create instruments via HAL
instrument_config = InstrumentConfig(
backend="simulator",
simulator_host=simulation_server.host,
chamber_port=simulation_server.chamber_port,
psu_port=simulation_server.psu_port,
dmm_port=simulation_server.dmm_port,
)
instruments = InstrumentFactory.create(instrument_config)
# Connect to instruments
instruments.chamber.transport.connect()
instruments.psu.transport.connect()
instruments.dmm.transport.connect()
# Verify instrument connectivity
idn = instruments.chamber.get_temperature() # Should not raise
assert isinstance(idn, float)
# Step 2: Create repository and test runner
db_path = Path(tmpdir) / "test.db"
repository = SQLiteRepository(str(db_path))
runner = TestRunner(repository)
# Step 3: Execute TempCo test with minimal config for speed
test = TempCoTest()
config = {
"temperatures": [0.0, 25.0, 50.0], # Reduced for test speed
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.5, # Reduced for test speed
"num_samples": 3, # Reduced for test speed
"tempco_limit": 100.0, # Relaxed for test
}
run_id = runner.run_test(
test=test,
instruments=instruments,
config=config,
operator="test_user",
description="E2E integration test",
)
# Step 4: Verify run was created
assert run_id is not None
run = repository.get_run(run_id)
assert run.test_name == "tempco"
assert run.status in [TestStatus.PASSED, TestStatus.FAILED] # Either is valid
assert run.completed_at is not None
assert run.operator == "test_user"
# Step 5: Verify results were stored
results = repository.get_results(run_id)
assert len(results) > 0
# Should have TempCo result
tempco_result = next((r for r in results if r.parameter == "temp_co"), None)
assert tempco_result is not None
assert tempco_result.unit == "ppm/C"
assert tempco_result.lower_limit is not None
assert tempco_result.upper_limit is not None
# Step 6: Verify measurements were stored
measurements_df = repository.get_measurements_dataframe(run_id)
assert measurements_df is not None
assert not measurements_df.empty
# Should have v_out measurements
v_out_measurements = measurements_df[measurements_df["parameter"] == "v_out"]
assert len(v_out_measurements) >= 3 # At least one per temperature point
# Step 7: Verify data integrity
# All measurements should have valid values
assert (measurements_df["value"] > 0).all()
# All measurements should have units
assert measurements_df["unit"].notna().all()
# Timestamps should be monotonically increasing
assert measurements_df["timestamp"].is_monotonic_increasing
# Cleanup: close repository before tempdir cleanup (Windows file locking)
repository.close()
def test_e2e_server_lifecycle(simulation_server: ServerConfig) -> None:
"""Test simulation server lifecycle management.
Verifies that:
- Server starts successfully
- Physics engine is running
- Multiple instruments can connect
- Server can be stopped cleanly
"""
# Server is already running from fixture (in background thread)
# We verify it works by connecting instruments
# Multiple instruments should be able to connect
config = InstrumentConfig(
backend="simulator",
simulator_host=simulation_server.host,
chamber_port=simulation_server.chamber_port,
psu_port=simulation_server.psu_port,
dmm_port=simulation_server.dmm_port,
)
instruments1 = InstrumentFactory.create(config)
instruments2 = InstrumentFactory.create(config)
# Connect instruments
instruments1.chamber.transport.connect()
instruments2.chamber.transport.connect()
# Both should work independently
temp1 = instruments1.chamber.get_temperature()
temp2 = instruments2.chamber.get_temperature()
assert isinstance(temp1, float)
assert isinstance(temp2, float)
# Both should read similar values (same simulation)
assert abs(temp1 - temp2) < 1.0 # Within 1 degree
def test_e2e_instrument_hal_abstraction(simulation_server: ServerConfig) -> None:
"""Test Hardware Abstraction Layer works correctly.
Verifies that:
- Instruments implement HAL interfaces
- Commands work through HAL
- State changes propagate through physics
"""
config = InstrumentConfig(
backend="simulator",
simulator_host=simulation_server.host,
chamber_port=simulation_server.chamber_port,
psu_port=simulation_server.psu_port,
dmm_port=simulation_server.dmm_port,
)
instruments = InstrumentFactory.create(config)
# Connect instruments
instruments.chamber.transport.connect()
instruments.psu.transport.connect()
instruments.dmm.transport.connect()
# Test thermal chamber HAL
instruments.chamber.set_temperature(30.0)
setpoint = instruments.chamber.get_setpoint()
assert setpoint == 30.0
# Test power supply HAL
instruments.psu.set_voltage(1, 5.0)
instruments.psu.set_current_limit(1, 0.5)
instruments.psu.enable_output(1, True)
voltage_setpoint = instruments.psu.get_voltage(1)
assert voltage_setpoint == 5.0
enabled = instruments.psu.is_output_enabled(1)
assert enabled is True
# Wait a moment for physics to update
import time
time.sleep(0.1)
# Measure voltage with DMM
measured_voltage = instruments.dmm.measure_dc_voltage()
assert isinstance(measured_voltage, float)
# Should be reading DUT output voltage (close to nominal)
assert 3.0 < measured_voltage < 3.6 # LDO output
def test_e2e_multiple_test_runs(simulation_server: ServerConfig) -> None:
"""Test running multiple tests sequentially.
Verifies that:
- Multiple tests can be run in sequence
- Each test gets its own run ID
- All results are stored correctly
- Repository handles multiple runs
"""
with TemporaryDirectory() as tmpdir:
config = InstrumentConfig(
backend="simulator",
simulator_host=simulation_server.host,
chamber_port=simulation_server.chamber_port,
psu_port=simulation_server.psu_port,
dmm_port=simulation_server.dmm_port,
)
instruments = InstrumentFactory.create(config)
# Connect instruments
instruments.chamber.transport.connect()
instruments.psu.transport.connect()
instruments.dmm.transport.connect()
db_path = Path(tmpdir) / "test.db"
repository = SQLiteRepository(str(db_path))
runner = TestRunner(repository)
# Run same test twice with different configs
test = TempCoTest()
config1 = {
"temperatures": [0.0, 25.0], # Need at least 2 points for TempCo
"input_voltage": 5.0,
"load_current": 0.1,
"settle_time": 0.5,
"num_samples": 3,
"tempco_limit": 100.0,
}
config2 = {
"temperatures": [25.0, 50.0], # Need at least 2 points for TempCo
"input_voltage": 3.3,
"load_current": 0.05,
"settle_time": 0.5,
"num_samples": 3,
"tempco_limit": 100.0,
}
run_id1 = runner.run_test(test, instruments, config1, operator="test_user_1")
run_id2 = runner.run_test(test, instruments, config2, operator="test_user_2")
# Both runs should complete
assert run_id1 != run_id2
run1 = repository.get_run(run_id1)
run2 = repository.get_run(run_id2)
assert run1.operator == "test_user_1"
assert run2.operator == "test_user_2"
# Both should have results
results1 = repository.get_results(run_id1)
results2 = repository.get_results(run_id2)
assert len(results1) > 0
assert len(results2) > 0
# Verify get_all_runs works
all_runs = repository.get_all_runs()
assert len(all_runs) >= 2
assert any(r.id == str(run_id1) for r in all_runs)
assert any(r.id == str(run_id2) for r in all_runs)
# Cleanup: close repository before tempdir cleanup (Windows file locking)
repository.close()