Files
py-dvt-ate/src/py_dvt_ate/framework/logger.py
2025-12-02 23:52:33 +00:00

223 lines
7.2 KiB
Python

"""Test logger for recording measurements and events.
This module provides the logging infrastructure for DVT tests. The test logger
records time-series measurements, scalar results with limits, and event messages
during test execution.
"""
import time
from abc import ABC, abstractmethod
from datetime import datetime
from uuid import UUID
from py_dvt_ate.data.models import Measurement
from py_dvt_ate.data.repository import ITestRepository
class ITestLogger(ABC):
"""Abstract interface for test data logging.
Provides methods for logging measurements, results, and events during
test execution. Implementations are responsible for persisting this
data to the appropriate storage backend.
"""
@abstractmethod
def log_measurement(
self,
parameter: str,
value: float,
unit: str,
conditions: dict[str, float] | None = None,
) -> None:
"""Log a time-series measurement with environmental conditions.
Used for logging raw measurements taken during the test. These are
stored as time-series data for later analysis and plotting.
Args:
parameter: Measurement parameter name (e.g., "v_out", "i_q").
value: Measured value.
unit: Unit of measurement (e.g., "V", "A", "°C").
conditions: Optional environmental conditions at time of measurement:
- "temperature": Chamber temperature (°C)
- "input_voltage": DUT input voltage (V)
- "load_current": DUT load current (A)
Example:
logger.log_measurement(
"v_out", 3.301, "V",
conditions={"temperature": 25.0, "input_voltage": 5.0}
)
"""
pass
@abstractmethod
def log_result(
self,
parameter: str,
value: float,
unit: str,
lower_limit: float | None = None,
upper_limit: float | None = None,
) -> None:
"""Log a scalar test result with pass/fail limits.
Used for logging calculated or derived results that will be evaluated
against specification limits. These appear in test reports and determine
overall pass/fail status.
Args:
parameter: Result parameter name (e.g., "temp_co", "load_reg").
value: Calculated or measured value.
unit: Unit of measurement (e.g., "ppm/°C", "%", "mV").
lower_limit: Optional lower limit for pass/fail evaluation.
upper_limit: Optional upper limit for pass/fail evaluation.
Example:
logger.log_result(
"temp_co", 23.5, "ppm/°C",
lower_limit=-50.0, upper_limit=50.0
)
"""
pass
@abstractmethod
def log_event(self, message: str, level: str = "INFO") -> None:
"""Log a test event or message.
Used for logging informational messages, warnings, and errors during
test execution. Useful for debugging and understanding test flow.
Args:
message: Event message text.
level: Log level ("DEBUG", "INFO", "WARNING", "ERROR").
Example:
logger.log_event("Waiting for thermal stability", level="INFO")
"""
pass
@abstractmethod
def flush(self) -> None:
"""Flush any buffered data to storage.
Forces any buffered measurements or results to be written to the
underlying storage backend. Called automatically at end of test,
but can be called manually for long-running tests.
"""
pass
class TestLogger(ITestLogger):
    """Concrete test logger implementation using the repository pattern.

    Measurements are buffered in memory and written to the repository in
    batches. Results are written to the repository immediately. Events are
    printed to standard output.

    The logger can be used as a context manager, which flushes any buffered
    measurements when the block exits instead of relying on garbage
    collection::

        with TestLogger(run_id, repository) as logger:
            logger.log_measurement("v_out", 3.301, "V")

    Attributes:
        run_id: UUID of the test run this logger is associated with.
        repository: Data repository for persisting measurements and results.
        measurement_buffer: In-memory buffer of measurements awaiting write.
        buffer_size: Number of measurements to buffer before auto-flush.
            A value of 1 or less flushes after every measurement.
    """

    def __init__(
        self,
        run_id: UUID,
        repository: ITestRepository,
        buffer_size: int = 100,
    ):
        """Initialise test logger.

        Args:
            run_id: UUID of the test run to associate logs with.
            repository: Repository for persisting data.
            buffer_size: Number of measurements to buffer before auto-flush.
                A value of 1 or less flushes after every measurement.
        """
        self.run_id = run_id
        self.repository = repository
        self.buffer_size = buffer_size
        self.measurement_buffer: list[Measurement] = []

    def __enter__(self) -> "TestLogger":
        """Enter a ``with`` block and return this logger."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Flush buffered measurements when the ``with`` block exits.

        The flush happens whether or not the block raised, so measurements
        gathered before a failure are still written. Exceptions raised in
        the block are not suppressed.
        """
        self.flush()

    def log_measurement(
        self,
        parameter: str,
        value: float,
        unit: str,
        conditions: dict[str, float] | None = None,
    ) -> None:
        """Log a time-series measurement with environmental conditions.

        The measurement is stamped with ``time.time()`` (seconds since the
        epoch), appended to the in-memory buffer, and the buffer is flushed
        once it holds at least ``buffer_size`` entries.

        Args:
            parameter: Measurement parameter name (e.g., "v_out", "i_q").
            value: Measured value.
            unit: Unit of measurement (e.g., "V", "A", "°C").
            conditions: Optional mapping with the keys "temperature",
                "input_voltage" and "load_current". Any key that is absent
                is stored as 0.0.
        """
        conditions = conditions or {}
        # NOTE(review): a missing condition is stored as 0.0, which cannot be
        # told apart from a genuine 0 reading. Kept as-is because the
        # Measurement model is defined elsewhere; confirm whether it accepts
        # None before changing the default.
        measurement = Measurement(
            timestamp=time.time(),
            parameter=parameter,
            value=value,
            unit=unit,
            temperature=conditions.get("temperature", 0.0),
            input_voltage=conditions.get("input_voltage", 0.0),
            load_current=conditions.get("load_current", 0.0),
        )
        self.measurement_buffer.append(measurement)

        # Auto-flush when buffer is full
        if len(self.measurement_buffer) >= self.buffer_size:
            self.flush()

    def log_result(
        self,
        parameter: str,
        value: float,
        unit: str,
        lower_limit: float | None = None,
        upper_limit: float | None = None,
    ) -> None:
        """Log a scalar test result with pass/fail limits.

        Results are written immediately to the repository (not buffered).

        Args:
            parameter: Result parameter name (e.g., "temp_co", "load_reg").
            value: Calculated or measured value.
            unit: Unit of measurement (e.g., "ppm/°C", "%", "mV").
            lower_limit: Optional lower limit for pass/fail evaluation.
            upper_limit: Optional upper limit for pass/fail evaluation.
        """
        self.repository.save_result(
            run_id=self.run_id,
            parameter=parameter,
            value=value,
            unit=unit,
            lower_limit=lower_limit,
            upper_limit=upper_limit,
        )

    def log_event(self, message: str, level: str = "INFO") -> None:
        """Log a test event or message.

        Events are printed to standard output, prefixed with the local wall
        clock time (millisecond resolution) and the level padded to seven
        characters. They are not persisted to the repository.

        Args:
            message: Event message text.
            level: Log level ("DEBUG", "INFO", "WARNING", "ERROR").
        """
        # "%f" yields microseconds; dropping the last three digits leaves ms.
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        print(f"[{timestamp}] {level:7s} {message}")

    def flush(self) -> None:
        """Flush buffered measurements to repository.

        Writes all buffered measurements to the repository in a single
        batch operation, then removes them from the buffer. Does nothing
        when the buffer is empty. If the repository raises, the buffer is
        left untouched so the write can be retried.
        """
        if not self.measurement_buffer:
            return

        # Give the repository its own list: if it keeps a reference to the
        # batch, emptying our buffer afterwards must not empty its copy.
        batch = list(self.measurement_buffer)
        self.repository.save_measurements(
            run_id=self.run_id,
            measurements=batch,
        )
        # Remove only what was written, so anything appended while the save
        # was in progress stays queued for the next flush.
        del self.measurement_buffer[: len(batch)]

    def __del__(self) -> None:
        """Make a last, best-effort attempt to flush on destruction.

        This is only a safety net: Python does not guarantee that
        ``__del__`` runs at interpreter exit. Call ``flush()`` explicitly or
        use the logger as a context manager to be certain data is written.
        """
        try:
            self.flush()
        except Exception:
            # Deliberately ignored: the object may be only partially
            # initialised or the repository may already be torn down, and
            # there is no caller left to handle an error raised here.
            pass