From ea6af3d77791a881107cf7d7ed678545b37fe480 Mon Sep 17 00:00:00 2001 From: Kai Chappell Date: Tue, 9 Sep 2025 19:31:09 +0000 Subject: [PATCH] Implement test runner --- src/py_dvt_ate/framework/__init__.py | 2 + src/py_dvt_ate/framework/runner.py | 203 +++++++++++++++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 src/py_dvt_ate/framework/runner.py diff --git a/src/py_dvt_ate/framework/__init__.py b/src/py_dvt_ate/framework/__init__.py index 8f03f95..aae1c1e 100644 --- a/src/py_dvt_ate/framework/__init__.py +++ b/src/py_dvt_ate/framework/__init__.py @@ -7,6 +7,7 @@ and runtime context management for DVT characterisation tests. from py_dvt_ate.framework.context import ITest, TestContext from py_dvt_ate.framework.limits import Limit, LimitSet, check_value, evaluate_results from py_dvt_ate.framework.logger import ITestLogger, TestLogger +from py_dvt_ate.framework.runner import TestRunner __all__ = [ "ITest", @@ -15,6 +16,7 @@ __all__ = [ "LimitSet", "TestContext", "TestLogger", + "TestRunner", "check_value", "evaluate_results", ] diff --git a/src/py_dvt_ate/framework/runner.py b/src/py_dvt_ate/framework/runner.py new file mode 100644 index 0000000..ca9904f --- /dev/null +++ b/src/py_dvt_ate/framework/runner.py @@ -0,0 +1,203 @@ +"""Test runner for orchestrating DVT test execution. + +This module provides the TestRunner class, which coordinates test execution, +manages test lifecycle, and ensures proper logging and error handling. +""" + +import json +import traceback +from typing import Any +from uuid import UUID + +from py_dvt_ate.data.models import TestStatus +from py_dvt_ate.data.repository import ITestRepository +from py_dvt_ate.framework.context import ITest, TestContext +from py_dvt_ate.framework.limits import evaluate_results +from py_dvt_ate.framework.logger import TestLogger +from py_dvt_ate.instruments.factory import InstrumentSet + + +class TestRunner: + """Orchestrates DVT test execution. + + The test runner manages the complete test lifecycle: + 1. Creates a test run record in the repository + 2. Sets up logging and context + 3. Executes the test with proper error handling + 4. Evaluates results against limits + 5. Updates final status and flushes data + + Attributes: + repository: Data repository for persisting test results. + + Example: + runner = TestRunner(repository) + instruments = factory.create(config) + run_id = runner.run_test( + test=TempCoTest(), + instruments=instruments, + config={"temp_points": [-40, 25, 85]}, + operator="alice@example.com" + ) + """ + + def __init__(self, repository: ITestRepository): + """Initialise test runner. + + Args: + repository: Repository for persisting test data. + """ + self.repository = repository + + def run_test( + self, + test: ITest, + instruments: InstrumentSet, + config: dict[str, Any] | None = None, + operator: str | None = None, + description: str | None = None, + ) -> UUID: + """Run a DVT test with full lifecycle management. + + Creates a test run, executes the test with proper error handling, + evaluates results, and updates final status. All measurements and + results are persisted to the repository. + + Args: + test: Test instance to execute (implements ITest). + instruments: Instrument set for test to use. + config: Optional test-specific configuration dictionary. + operator: Optional operator identifier (e.g., email address). + description: Optional human-readable test run description. + + Returns: + UUID of the test run. Can be used to retrieve results later. + + Raises: + Exception: Only if repository operations fail. Test execution + errors are caught and recorded as ERROR status. + + Example: + run_id = runner.run_test( + test=TempCoTest(), + instruments=instruments, + config={"temp_points": [-40, 25, 85]}, + operator="alice@example.com", + description="Characterisation run #42" + ) + print(f"Test run ID: {run_id}") + """ + config = config or {} + + # Create test run record + run_id = self.repository.create_run( + test_name=test.name, + config=config, + operator=operator, + description=description or test.description, + ) + + # Create logger for this run + logger = TestLogger(run_id=run_id, repository=self.repository) + + # Create test context + context = TestContext( + run_id=run_id, + instruments=instruments, + logger=logger, + config=config, + ) + + # Update status to running + self.repository.update_run_status(run_id, TestStatus.RUNNING) + + # Execute test with error handling + try: + logger.log_event(f"Starting test: {test.name}", level="INFO") + logger.log_event(f"Description: {test.description}", level="INFO") + + # Log configuration + if config: + config_str = json.dumps(config, indent=2) + logger.log_event(f"Configuration:\n{config_str}", level="DEBUG") + + # Execute the test + status = test.execute(context) + + # Flush any buffered measurements + logger.flush() + + # Evaluate results if test didn't explicitly set status + if status == TestStatus.RUNNING: + results = self.repository.get_results(run_id) + status = evaluate_results(results) + logger.log_event( + f"Test completed. Evaluated {len(results)} results: {status.value}", + level="INFO", + ) + + # Update final status + self.repository.complete_run(run_id, status) + logger.log_event(f"Test finished with status: {status.value}", level="INFO") + + except KeyboardInterrupt: + # User interrupted - mark as error but don't swallow interrupt + logger.log_event("Test interrupted by user", level="WARNING") + logger.flush() + self.repository.complete_run(run_id, TestStatus.ERROR) + raise + + except Exception as e: + # Test execution error - log and mark as ERROR + error_msg = f"Test execution failed: {e}" + logger.log_event(error_msg, level="ERROR") + logger.log_event(traceback.format_exc(), level="DEBUG") + logger.flush() + self.repository.complete_run(run_id, TestStatus.ERROR) + logger.log_event("Test finished with status: ERROR", level="INFO") + + return run_id + + def run_tests( + self, + tests: list[ITest], + instruments: InstrumentSet, + config: dict[str, Any] | None = None, + operator: str | None = None, + ) -> list[UUID]: + """Run multiple tests sequentially. + + Convenience method for running a suite of tests. Each test is run + independently with its own test run record. If one test fails, the + remaining tests still execute. + + Args: + tests: List of test instances to execute. + instruments: Instrument set shared by all tests. + config: Optional configuration applied to all tests. + operator: Optional operator identifier. + + Returns: + List of test run UUIDs in execution order. + + Example: + run_ids = runner.run_tests( + tests=[TempCoTest(), LoadRegTest(), LineRegTest()], + instruments=instruments, + config={"common_setting": 42}, + operator="alice@example.com" + ) + for run_id in run_ids: + run = repository.get_run(run_id) + print(f"{run.test_name}: {run.status.value}") + """ + run_ids = [] + for test in tests: + run_id = self.run_test( + test=test, + instruments=instruments, + config=config, + operator=operator, + ) + run_ids.append(run_id) + return run_ids