Implement test runner

This commit is contained in:
2025-12-02 23:56:05 +00:00
parent 7dee677597
commit f8ef2e3cf7
2 changed files with 205 additions and 0 deletions

View File

@@ -7,6 +7,7 @@ and runtime context management for DVT characterisation tests.
from py_dvt_ate.framework.context import ITest, TestContext
from py_dvt_ate.framework.limits import Limit, LimitSet, check_value, evaluate_results
from py_dvt_ate.framework.logger import ITestLogger, TestLogger
from py_dvt_ate.framework.runner import TestRunner
__all__ = [
"ITest",
@@ -15,6 +16,7 @@ __all__ = [
"LimitSet",
"TestContext",
"TestLogger",
"TestRunner",
"check_value",
"evaluate_results",
]

View File

@@ -0,0 +1,203 @@
"""Test runner for orchestrating DVT test execution.
This module provides the TestRunner class, which coordinates test execution,
manages test lifecycle, and ensures proper logging and error handling.
"""
import json
import traceback
from typing import Any
from uuid import UUID
from py_dvt_ate.data.models import TestStatus
from py_dvt_ate.data.repository import ITestRepository
from py_dvt_ate.framework.context import ITest, TestContext
from py_dvt_ate.framework.limits import evaluate_results
from py_dvt_ate.framework.logger import TestLogger
from py_dvt_ate.instruments.factory import InstrumentSet
class TestRunner:
"""Orchestrates DVT test execution.
The test runner manages the complete test lifecycle:
1. Creates a test run record in the repository
2. Sets up logging and context
3. Executes the test with proper error handling
4. Evaluates results against limits
5. Updates final status and flushes data
Attributes:
repository: Data repository for persisting test results.
Example:
runner = TestRunner(repository)
instruments = factory.create(config)
run_id = runner.run_test(
test=TempCoTest(),
instruments=instruments,
config={"temp_points": [-40, 25, 85]},
operator="alice@example.com"
)
"""
def __init__(self, repository: ITestRepository):
"""Initialise test runner.
Args:
repository: Repository for persisting test data.
"""
self.repository = repository
def run_test(
self,
test: ITest,
instruments: InstrumentSet,
config: dict[str, Any] | None = None,
operator: str | None = None,
description: str | None = None,
) -> UUID:
"""Run a DVT test with full lifecycle management.
Creates a test run, executes the test with proper error handling,
evaluates results, and updates final status. All measurements and
results are persisted to the repository.
Args:
test: Test instance to execute (implements ITest).
instruments: Instrument set for test to use.
config: Optional test-specific configuration dictionary.
operator: Optional operator identifier (e.g., email address).
description: Optional human-readable test run description.
Returns:
UUID of the test run. Can be used to retrieve results later.
Raises:
Exception: Only if repository operations fail. Test execution
errors are caught and recorded as ERROR status.
Example:
run_id = runner.run_test(
test=TempCoTest(),
instruments=instruments,
config={"temp_points": [-40, 25, 85]},
operator="alice@example.com",
description="Characterisation run #42"
)
print(f"Test run ID: {run_id}")
"""
config = config or {}
# Create test run record
run_id = self.repository.create_run(
test_name=test.name,
config=config,
operator=operator,
description=description or test.description,
)
# Create logger for this run
logger = TestLogger(run_id=run_id, repository=self.repository)
# Create test context
context = TestContext(
run_id=run_id,
instruments=instruments,
logger=logger,
config=config,
)
# Update status to running
self.repository.update_run_status(run_id, TestStatus.RUNNING)
# Execute test with error handling
try:
logger.log_event(f"Starting test: {test.name}", level="INFO")
logger.log_event(f"Description: {test.description}", level="INFO")
# Log configuration
if config:
config_str = json.dumps(config, indent=2)
logger.log_event(f"Configuration:\n{config_str}", level="DEBUG")
# Execute the test
status = test.execute(context)
# Flush any buffered measurements
logger.flush()
# Evaluate results if test didn't explicitly set status
if status == TestStatus.RUNNING:
results = self.repository.get_results(run_id)
status = evaluate_results(results)
logger.log_event(
f"Test completed. Evaluated {len(results)} results: {status.value}",
level="INFO",
)
# Update final status
self.repository.complete_run(run_id, status)
logger.log_event(f"Test finished with status: {status.value}", level="INFO")
except KeyboardInterrupt:
# User interrupted - mark as error but don't swallow interrupt
logger.log_event("Test interrupted by user", level="WARNING")
logger.flush()
self.repository.complete_run(run_id, TestStatus.ERROR)
raise
except Exception as e:
# Test execution error - log and mark as ERROR
error_msg = f"Test execution failed: {e}"
logger.log_event(error_msg, level="ERROR")
logger.log_event(traceback.format_exc(), level="DEBUG")
logger.flush()
self.repository.complete_run(run_id, TestStatus.ERROR)
logger.log_event("Test finished with status: ERROR", level="INFO")
return run_id
def run_tests(
self,
tests: list[ITest],
instruments: InstrumentSet,
config: dict[str, Any] | None = None,
operator: str | None = None,
) -> list[UUID]:
"""Run multiple tests sequentially.
Convenience method for running a suite of tests. Each test is run
independently with its own test run record. If one test fails, the
remaining tests still execute.
Args:
tests: List of test instances to execute.
instruments: Instrument set shared by all tests.
config: Optional configuration applied to all tests.
operator: Optional operator identifier.
Returns:
List of test run UUIDs in execution order.
Example:
run_ids = runner.run_tests(
tests=[TempCoTest(), LoadRegTest(), LineRegTest()],
instruments=instruments,
config={"common_setting": 42},
operator="alice@example.com"
)
for run_id in run_ids:
run = repository.get_run(run_id)
print(f"{run.test_name}: {run.status.value}")
"""
run_ids = []
for test in tests:
run_id = self.run_test(
test=test,
instruments=instruments,
config=config,
operator=operator,
)
run_ids.append(run_id)
return run_ids