"""Test runner for orchestrating DVT test execution. This module provides the TestRunner class, which coordinates test execution, manages test lifecycle, and ensures proper logging and error handling. """ import json import traceback from typing import Any from uuid import UUID from py_dvt_ate.data.models import TestStatus from py_dvt_ate.data.repository import ITestRepository from py_dvt_ate.framework.context import ITest, TestContext from py_dvt_ate.framework.limits import evaluate_results from py_dvt_ate.framework.logger import TestLogger from py_dvt_ate.instruments.factory import InstrumentSet class TestRunner: """Orchestrates DVT test execution. The test runner manages the complete test lifecycle: 1. Creates a test run record in the repository 2. Sets up logging and context 3. Executes the test with proper error handling 4. Evaluates results against limits 5. Updates final status and flushes data Attributes: repository: Data repository for persisting test results. Example: runner = TestRunner(repository) instruments = factory.create(config) run_id = runner.run_test( test=TempCoTest(), instruments=instruments, config={"temp_points": [-40, 25, 85]}, operator="alice@example.com" ) """ def __init__(self, repository: ITestRepository): """Initialise test runner. Args: repository: Repository for persisting test data. """ self.repository = repository def run_test( self, test: ITest, instruments: InstrumentSet, config: dict[str, Any] | None = None, operator: str | None = None, description: str | None = None, ) -> UUID: """Run a DVT test with full lifecycle management. Creates a test run, executes the test with proper error handling, evaluates results, and updates final status. All measurements and results are persisted to the repository. Args: test: Test instance to execute (implements ITest). instruments: Instrument set for test to use. config: Optional test-specific configuration dictionary. operator: Optional operator identifier (e.g., email address). description: Optional human-readable test run description. Returns: UUID of the test run. Can be used to retrieve results later. Raises: Exception: Only if repository operations fail. Test execution errors are caught and recorded as ERROR status. Example: run_id = runner.run_test( test=TempCoTest(), instruments=instruments, config={"temp_points": [-40, 25, 85]}, operator="alice@example.com", description="Characterisation run #42" ) print(f"Test run ID: {run_id}") """ config = config or {} # Create test run record run_id = self.repository.create_run( test_name=test.name, config=config, operator=operator, description=description or test.description, ) # Create logger for this run logger = TestLogger(run_id=run_id, repository=self.repository) # Create test context context = TestContext( run_id=run_id, instruments=instruments, logger=logger, config=config, ) # Update status to running self.repository.update_run_status(run_id, TestStatus.RUNNING) # Execute test with error handling try: logger.log_event(f"Starting test: {test.name}", level="INFO") logger.log_event(f"Description: {test.description}", level="INFO") # Log configuration if config: config_str = json.dumps(config, indent=2) logger.log_event(f"Configuration:\n{config_str}", level="DEBUG") # Execute the test status = test.execute(context) # Flush any buffered measurements logger.flush() # Evaluate results if test didn't explicitly set status if status == TestStatus.RUNNING: results = self.repository.get_results(run_id) status = evaluate_results(results) logger.log_event( f"Test completed. Evaluated {len(results)} results: {status.value}", level="INFO", ) # Update final status self.repository.complete_run(run_id, status) logger.log_event(f"Test finished with status: {status.value}", level="INFO") except KeyboardInterrupt: # User interrupted - mark as error but don't swallow interrupt logger.log_event("Test interrupted by user", level="WARNING") logger.flush() self.repository.complete_run(run_id, TestStatus.ERROR) raise except Exception as e: # Test execution error - log and mark as ERROR error_msg = f"Test execution failed: {e}" logger.log_event(error_msg, level="ERROR") logger.log_event(traceback.format_exc(), level="DEBUG") logger.flush() self.repository.complete_run(run_id, TestStatus.ERROR) logger.log_event("Test finished with status: ERROR", level="INFO") return run_id def run_tests( self, tests: list[ITest], instruments: InstrumentSet, config: dict[str, Any] | None = None, operator: str | None = None, ) -> list[UUID]: """Run multiple tests sequentially. Convenience method for running a suite of tests. Each test is run independently with its own test run record. If one test fails, the remaining tests still execute. Args: tests: List of test instances to execute. instruments: Instrument set shared by all tests. config: Optional configuration applied to all tests. operator: Optional operator identifier. Returns: List of test run UUIDs in execution order. Example: run_ids = runner.run_tests( tests=[TempCoTest(), LoadRegTest(), LineRegTest()], instruments=instruments, config={"common_setting": 42}, operator="alice@example.com" ) for run_id in run_ids: run = repository.get_run(run_id) print(f"{run.test_name}: {run.status.value}") """ run_ids = [] for test in tests: run_id = self.run_test( test=test, instruments=instruments, config=config, operator=operator, ) run_ids.append(run_id) return run_ids