Compare commits
16 Commits
v0.1.0-bet
...
v0.1.0-bet
| Author | SHA1 | Date | |
|---|---|---|---|
| 2e62a10550 | |||
| d07e6e3f1a | |||
| 96eb83cec4 | |||
| 027fd71505 | |||
| 3310e86fae | |||
| e42de212f2 | |||
| ee8d148eb7 | |||
| e379b7e432 | |||
| eaa1843ee1 | |||
| 7429f6433c | |||
| 7cfd36f02b | |||
| f5600efd76 | |||
| 0615eb7e07 | |||
| b981182b71 | |||
| 8c0d68e722 | |||
| 4e14222522 |
25
CHANGELOG.md
25
CHANGELOG.md
@@ -7,6 +7,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.1.0-beta.2] - 2025-12-03
|
||||
|
||||
### Added
|
||||
- Test Executive Framework (Sprint 14)
|
||||
- TestContext dataclass providing runtime context for tests
|
||||
- ITest abstract base class defining test interface
|
||||
- TestLogger for recording measurements, results, and events
|
||||
- LimitChecker for evaluating pass/fail against specification limits
|
||||
- TestRunner for orchestrating test execution
|
||||
- SQLite-based TestRepository for persisting test data
|
||||
- Parquet measurement storage for efficient time-series data
|
||||
- DVT Test Implementation (Sprint 15)
|
||||
- BaseDVTTest providing common test utilities
|
||||
- TempCo characterisation test (temperature coefficient measurement)
|
||||
- Temperature sweep with automatic thermal settling
|
||||
- Linear regression TempCo calculation (ppm/°C)
|
||||
- Comprehensive integration tests for end-to-end validation
|
||||
|
||||
### Technical
|
||||
- Test framework supports data logging, limit evaluation, and result persistence
|
||||
- TempCo test demonstrates full end-to-end workflow: configure instruments → sweep temperature → measure → calculate → evaluate
|
||||
- All framework and test components fully type-checked and linted
|
||||
|
||||
## [0.1.0-beta.1] - 2025-12-02
|
||||
|
||||
### Added
|
||||
@@ -106,7 +129,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
| Version | Date | Milestone |
|
||||
|---------|------|-----------|
|
||||
| 0.1.0 | TBD | MVP Complete |
|
||||
| 0.1.0-beta.2 | TBD | First DVT test runs |
|
||||
| 0.1.0-beta.2 | 2025-12-03 | First DVT test runs |
|
||||
| 0.1.0-beta.1 | 2025-12-02 | HAL complete |
|
||||
| 0.1.0-alpha.3 | 2025-12-02 | Network ready |
|
||||
| 0.1.0-alpha.2 | 2025-12-02 | Visual demo |
|
||||
|
||||
149
config/default.yaml
Normal file
149
config/default.yaml
Normal file
@@ -0,0 +1,149 @@
|
||||
# py_dvt_ate Default Configuration
|
||||
# This file contains default settings for the DVT simulation platform.
|
||||
# Copy this file and modify values as needed for your environment.
|
||||
|
||||
# =============================================================================
|
||||
# Instrument Configuration
|
||||
# =============================================================================
|
||||
instruments:
|
||||
# Backend selection: "simulator" or "pyvisa"
|
||||
# - simulator: Use virtual instruments with physics simulation (for development)
|
||||
# - pyvisa: Connect to real instruments via PyVISA (for production testing)
|
||||
backend: simulator
|
||||
|
||||
# Simulator backend configuration
|
||||
# Used when backend=simulator. Virtual instruments are exposed as TCP servers.
|
||||
simulator:
|
||||
host: localhost
|
||||
thermal_chamber_port: 5001
|
||||
power_supply_port: 5002
|
||||
multimeter_port: 5003
|
||||
|
||||
# PyVISA backend configuration
|
||||
# Used when backend=pyvisa. Provide VISA resource strings for real instruments.
|
||||
# Example: "TCPIP::192.168.1.10::5001::SOCKET"
|
||||
pyvisa:
|
||||
thermal_chamber: null
|
||||
power_supply: null
|
||||
multimeter: null
|
||||
|
||||
# =============================================================================
|
||||
# Physics Simulation Parameters
|
||||
# =============================================================================
|
||||
physics:
|
||||
# Physics engine update rate (Hz)
|
||||
# Higher rates provide better accuracy but use more CPU.
|
||||
update_rate_hz: 100.0
|
||||
|
||||
# Thermal model parameters
|
||||
thermal:
|
||||
# Chamber thermal time constant (seconds)
|
||||
# Time for chamber temperature to reach 63% of final value
|
||||
chamber_time_constant_s: 30.0
|
||||
|
||||
# DUT case thermal time constant (seconds)
|
||||
# Time for case temperature to reach 63% of final value
|
||||
case_time_constant_s: 5.0
|
||||
|
||||
# Junction-to-case thermal resistance (°C/W)
|
||||
# How much the junction heats above case per watt dissipated
|
||||
theta_jc: 15.0
|
||||
|
||||
# Case-to-ambient thermal resistance (°C/W)
|
||||
# How much the case heats above ambient per watt dissipated
|
||||
theta_ca: 5.0
|
||||
|
||||
# Thermal chamber behaviour
|
||||
chamber:
|
||||
# Maximum temperature ramp rate (°C/min)
|
||||
# Real chambers have limited heating/cooling rates
|
||||
ramp_rate_c_per_min: 10.0
|
||||
|
||||
# Temperature stability window (°C)
|
||||
# Chamber is considered stable when within ±this value of setpoint
|
||||
stability_window_c: 0.5
|
||||
|
||||
# Stability duration requirement (seconds)
|
||||
# Chamber must remain in stability window for this duration
|
||||
stability_time_s: 30.0
|
||||
|
||||
# =============================================================================
|
||||
# DUT (Device Under Test) Configuration
|
||||
# =============================================================================
|
||||
dut:
|
||||
# DUT model type
|
||||
# Currently supported: "ldo"
|
||||
model: ldo
|
||||
|
||||
# DUT model parameters
|
||||
parameters:
|
||||
# Nominal output voltage at 25°C (V)
|
||||
nominal_output_voltage: 3.3
|
||||
|
||||
# Temperature coefficient (ppm/°C)
|
||||
# Voltage change per degree: ΔV = V₀ × tempco × ΔT / 1e6
|
||||
tempco_ppm_per_c: 50.0
|
||||
|
||||
# Quiescent current at 25°C (µA)
|
||||
quiescent_current_ua: 50.0
|
||||
|
||||
# Quiescent current temperature coefficient (per °C)
|
||||
# Iq change per degree: ΔIq = Iq₀ × tempco × ΔT
|
||||
quiescent_current_tempco: 0.003
|
||||
|
||||
# Dropout voltage (V)
|
||||
# Minimum Vin-Vout differential for regulation
|
||||
dropout_voltage: 0.3
|
||||
|
||||
# =============================================================================
|
||||
# Data Storage Configuration
|
||||
# =============================================================================
|
||||
data:
|
||||
# SQLite database path for test runs and results
|
||||
database_path: ./data/py_dvt_ate.db
|
||||
|
||||
# Directory for measurement data files (Parquet format)
|
||||
measurements_dir: ./data/measurements
|
||||
|
||||
# Directory for generated reports (PDF, HTML)
|
||||
reports_dir: ./data/reports
|
||||
|
||||
# =============================================================================
|
||||
# Logging Configuration
|
||||
# =============================================================================
|
||||
logging:
|
||||
# Logging level: DEBUG, INFO, WARNING, ERROR, CRITICAL
|
||||
level: INFO
|
||||
|
||||
# Log file path
|
||||
# Use null to disable file logging
|
||||
file: ./data/logs/py_dvt_ate.log
|
||||
|
||||
# Log message format
|
||||
# Uses Python logging format strings
|
||||
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
|
||||
# =============================================================================
|
||||
# Dashboard Configuration (Streamlit)
|
||||
# =============================================================================
|
||||
dashboard:
|
||||
# Enable/disable the Streamlit dashboard
|
||||
enabled: true
|
||||
|
||||
# Dashboard server port
|
||||
port: 8501
|
||||
|
||||
# =============================================================================
|
||||
# API Configuration (Phase 2)
|
||||
# =============================================================================
|
||||
api:
|
||||
# Enable/disable the REST API server
|
||||
# Currently not implemented (Phase 2 feature)
|
||||
enabled: false
|
||||
|
||||
# API server host
|
||||
# Use "0.0.0.0" to listen on all interfaces
|
||||
host: "0.0.0.0"
|
||||
|
||||
# API server port
|
||||
port: 8000
|
||||
@@ -37,6 +37,7 @@ dev = [
|
||||
"pytest-asyncio>=0.21",
|
||||
"ruff>=0.1",
|
||||
"mypy>=1.0",
|
||||
"types-PyYAML>=6.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""py_dvt_ate: Coupled Physics DVT Simulation Platform."""
|
||||
|
||||
__version__ = "0.1.0-beta.1"
|
||||
__version__ = "0.1.0-beta.2"
|
||||
|
||||
200
src/py_dvt_ate/app/config.py
Normal file
200
src/py_dvt_ate/app/config.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""Configuration models for py_dvt_ate.
|
||||
|
||||
This module defines Pydantic models for all configuration sections.
|
||||
Configuration can be loaded from YAML files and validated at runtime.
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Literal
|
||||
|
||||
import yaml
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class SimulatorConfig(BaseModel):
|
||||
"""Configuration for simulator instrument backend."""
|
||||
|
||||
host: str = "localhost"
|
||||
thermal_chamber_port: int = 5001
|
||||
power_supply_port: int = 5002
|
||||
multimeter_port: int = 5003
|
||||
|
||||
|
||||
class PyVISAConfig(BaseModel):
|
||||
"""Configuration for PyVISA instrument backend."""
|
||||
|
||||
thermal_chamber: str | None = None
|
||||
power_supply: str | None = None
|
||||
multimeter: str | None = None
|
||||
|
||||
|
||||
class InstrumentsConfig(BaseModel):
|
||||
"""Instrument backend configuration."""
|
||||
|
||||
backend: Literal["simulator", "pyvisa"] = "simulator"
|
||||
simulator: SimulatorConfig = Field(default_factory=SimulatorConfig)
|
||||
pyvisa: PyVISAConfig = Field(default_factory=PyVISAConfig)
|
||||
|
||||
|
||||
class ThermalConfig(BaseModel):
|
||||
"""Thermal physics parameters."""
|
||||
|
||||
chamber_time_constant_s: float = 30.0
|
||||
case_time_constant_s: float = 5.0
|
||||
theta_jc: float = 15.0 # °C/W (junction to case)
|
||||
theta_ca: float = 5.0 # °C/W (case to ambient)
|
||||
|
||||
|
||||
class ChamberConfig(BaseModel):
|
||||
"""Thermal chamber behaviour parameters."""
|
||||
|
||||
ramp_rate_c_per_min: float = 10.0
|
||||
stability_window_c: float = 0.5
|
||||
stability_time_s: float = 30.0
|
||||
|
||||
|
||||
class PhysicsConfig(BaseModel):
|
||||
"""Physics simulation parameters."""
|
||||
|
||||
update_rate_hz: float = 100.0
|
||||
thermal: ThermalConfig = Field(default_factory=ThermalConfig)
|
||||
chamber: ChamberConfig = Field(default_factory=ChamberConfig)
|
||||
|
||||
|
||||
class DUTParameters(BaseModel):
|
||||
"""DUT model parameters."""
|
||||
|
||||
nominal_output_voltage: float = 3.3
|
||||
tempco_ppm_per_c: float = 50.0
|
||||
quiescent_current_ua: float = 50.0
|
||||
quiescent_current_tempco: float = 0.003
|
||||
dropout_voltage: float = 0.3
|
||||
|
||||
|
||||
class DUTConfig(BaseModel):
|
||||
"""DUT model configuration."""
|
||||
|
||||
model: str = "ldo"
|
||||
parameters: DUTParameters = Field(default_factory=DUTParameters)
|
||||
|
||||
|
||||
class DataConfig(BaseModel):
|
||||
"""Data storage paths."""
|
||||
|
||||
database_path: str = "./data/py_dvt_ate.db"
|
||||
measurements_dir: str = "./data/measurements"
|
||||
reports_dir: str = "./data/reports"
|
||||
|
||||
|
||||
class LoggingConfig(BaseModel):
|
||||
"""Logging configuration."""
|
||||
|
||||
level: str = "INFO"
|
||||
file: str = "./data/logs/py_dvt_ate.log"
|
||||
format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
|
||||
|
||||
class DashboardConfig(BaseModel):
|
||||
"""Dashboard (Streamlit) configuration."""
|
||||
|
||||
enabled: bool = True
|
||||
port: int = 8501
|
||||
|
||||
|
||||
class APIConfig(BaseModel):
|
||||
"""API server configuration (Phase 2)."""
|
||||
|
||||
enabled: bool = False
|
||||
host: str = "0.0.0.0"
|
||||
port: int = 8000
|
||||
|
||||
|
||||
class AppConfig(BaseModel):
|
||||
"""Root configuration model."""
|
||||
|
||||
instruments: InstrumentsConfig = Field(default_factory=InstrumentsConfig)
|
||||
physics: PhysicsConfig = Field(default_factory=PhysicsConfig)
|
||||
dut: DUTConfig = Field(default_factory=DUTConfig)
|
||||
data: DataConfig = Field(default_factory=DataConfig)
|
||||
logging: LoggingConfig = Field(default_factory=LoggingConfig)
|
||||
dashboard: DashboardConfig = Field(default_factory=DashboardConfig)
|
||||
api: APIConfig = Field(default_factory=APIConfig)
|
||||
|
||||
|
||||
def _apply_env_overrides(config_dict: dict[str, Any]) -> None:
|
||||
"""Apply environment variable overrides to config dictionary.
|
||||
|
||||
Environment variables follow the pattern: PYDVTATE__{SECTION}__{KEY}
|
||||
For nested keys, use double underscores: PYDVTATE__{SECTION}__{SUBSECTION}__{KEY}
|
||||
|
||||
Examples:
|
||||
PYDVTATE__INSTRUMENTS__BACKEND=pyvisa
|
||||
PYDVTATE__PHYSICS__UPDATE_RATE_HZ=50.0
|
||||
PYDVTATE__SIMULATOR__HOST=192.168.1.100
|
||||
"""
|
||||
prefix = "PYDVTATE__"
|
||||
|
||||
for env_key, env_value in os.environ.items():
|
||||
if not env_key.startswith(prefix):
|
||||
continue
|
||||
|
||||
# Remove prefix and split into parts
|
||||
key_parts = env_key[len(prefix) :].lower().split("__")
|
||||
|
||||
# Navigate/create nested structure
|
||||
current = config_dict
|
||||
for part in key_parts[:-1]:
|
||||
if part not in current:
|
||||
current[part] = {}
|
||||
current = current[part]
|
||||
|
||||
# Set the final value
|
||||
final_key = key_parts[-1]
|
||||
# Try to parse as YAML to handle types (int, float, bool, etc.)
|
||||
try:
|
||||
current[final_key] = yaml.safe_load(env_value)
|
||||
except yaml.YAMLError:
|
||||
# If parsing fails, use as string
|
||||
current[final_key] = env_value
|
||||
|
||||
|
||||
def load_config(config_path: str | Path | None = None) -> AppConfig:
|
||||
"""Load configuration from YAML file with environment variable overrides.
|
||||
|
||||
Args:
|
||||
config_path: Path to YAML configuration file. If None, uses defaults only.
|
||||
|
||||
Returns:
|
||||
Validated AppConfig instance.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If config_path is provided but does not exist.
|
||||
yaml.YAMLError: If YAML file is malformed.
|
||||
pydantic.ValidationError: If configuration is invalid.
|
||||
|
||||
Environment Variables:
|
||||
Configuration can be overridden using environment variables with the
|
||||
pattern PYDVTATE__{SECTION}__{KEY}. For example:
|
||||
PYDVTATE__INSTRUMENTS__BACKEND=pyvisa
|
||||
PYDVTATE__PHYSICS__UPDATE_RATE_HZ=50.0
|
||||
"""
|
||||
# Start with empty dict (will use Pydantic defaults)
|
||||
config_dict: dict[str, Any] = {}
|
||||
|
||||
# Load from YAML file if provided
|
||||
if config_path is not None:
|
||||
path = Path(config_path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Configuration file not found: {config_path}")
|
||||
|
||||
with path.open("r") as f:
|
||||
loaded = yaml.safe_load(f)
|
||||
if loaded is not None:
|
||||
config_dict = loaded
|
||||
|
||||
# Apply environment variable overrides
|
||||
_apply_env_overrides(config_dict)
|
||||
|
||||
# Validate and return
|
||||
return AppConfig(**config_dict)
|
||||
14
src/py_dvt_ate/data/__init__.py
Normal file
14
src/py_dvt_ate/data/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
||||
"""Data persistence layer.
|
||||
|
||||
Provides storage for test runs, results, and measurements using
|
||||
SQLite for metadata and Parquet for time-series data.
|
||||
"""
|
||||
|
||||
from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus
|
||||
|
||||
__all__ = [
|
||||
"Measurement",
|
||||
"TestResult",
|
||||
"TestRun",
|
||||
"TestStatus",
|
||||
]
|
||||
83
src/py_dvt_ate/data/models.py
Normal file
83
src/py_dvt_ate/data/models.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""Data models for test persistence.
|
||||
|
||||
This module defines dataclasses representing test runs, results, and measurements.
|
||||
These models map to SQLite tables (for metadata) and Parquet files (for time-series).
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class TestStatus(Enum):
|
||||
"""Test run status."""
|
||||
|
||||
PENDING = "pending"
|
||||
RUNNING = "running"
|
||||
PASSED = "passed"
|
||||
FAILED = "failed"
|
||||
ERROR = "error"
|
||||
SKIPPED = "skipped"
|
||||
|
||||
|
||||
@dataclass
|
||||
class TestRun:
|
||||
"""Test run metadata.
|
||||
|
||||
Maps to the test_runs SQLite table.
|
||||
"""
|
||||
|
||||
id: str # UUID
|
||||
test_name: str
|
||||
started_at: datetime
|
||||
status: TestStatus
|
||||
config_json: str # JSON string of test configuration
|
||||
description: str | None = None
|
||||
completed_at: datetime | None = None
|
||||
operator: str | None = None
|
||||
notes: str | None = None
|
||||
created_at: datetime = field(default_factory=datetime.now)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TestResult:
|
||||
"""Immutable test result with limits.
|
||||
|
||||
Maps to the test_results SQLite table.
|
||||
Represents a single scalar measurement with pass/fail limits.
|
||||
"""
|
||||
|
||||
id: str # UUID
|
||||
test_run_id: str # Foreign key to test_runs.id
|
||||
parameter: str
|
||||
value: float
|
||||
unit: str
|
||||
measured_at: datetime
|
||||
lower_limit: float | None = None
|
||||
upper_limit: float | None = None
|
||||
|
||||
@property
|
||||
def passed(self) -> bool | None:
|
||||
"""Evaluate pass/fail. None if no limits defined."""
|
||||
if self.lower_limit is None and self.upper_limit is None:
|
||||
return None
|
||||
lower_ok = self.lower_limit is None or self.value >= self.lower_limit
|
||||
upper_ok = self.upper_limit is None or self.value <= self.upper_limit
|
||||
return lower_ok and upper_ok
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Measurement:
|
||||
"""Immutable measurement record for time-series data.
|
||||
|
||||
Maps to Parquet files for efficient storage and analysis.
|
||||
Includes measurement conditions (temperature, voltage, current) at time of measurement.
|
||||
"""
|
||||
|
||||
timestamp: float # Seconds since epoch (high precision)
|
||||
parameter: str
|
||||
value: float
|
||||
unit: str
|
||||
temperature: float = 0.0 # Chamber temperature at measurement
|
||||
input_voltage: float = 0.0 # DUT input voltage at measurement
|
||||
load_current: float = 0.0 # DUT load current at measurement
|
||||
359
src/py_dvt_ate/data/repository.py
Normal file
359
src/py_dvt_ate/data/repository.py
Normal file
@@ -0,0 +1,359 @@
|
||||
"""Data repository implementation using SQLite and Parquet.
|
||||
|
||||
This module provides SQLite-based storage for test run metadata and results.
|
||||
Time-series measurements are stored separately in Parquet files.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from py_dvt_ate.data.models import Measurement, TestResult, TestRun, TestStatus
|
||||
|
||||
|
||||
class ITestRepository(ABC):
|
||||
"""Repository interface for test data."""
|
||||
|
||||
@abstractmethod
|
||||
def create_run(
|
||||
self,
|
||||
test_name: str,
|
||||
config: dict[str, Any],
|
||||
operator: str | None = None,
|
||||
description: str | None = None,
|
||||
) -> UUID:
|
||||
"""Create a new test run and return its ID."""
|
||||
|
||||
@abstractmethod
|
||||
def update_run_status(self, run_id: UUID, status: TestStatus) -> None:
|
||||
"""Update the status of a test run."""
|
||||
|
||||
@abstractmethod
|
||||
def complete_run(self, run_id: UUID, status: TestStatus) -> None:
|
||||
"""Mark a test run as complete with final status."""
|
||||
|
||||
@abstractmethod
|
||||
def save_result(
|
||||
self,
|
||||
run_id: UUID,
|
||||
parameter: str,
|
||||
value: float,
|
||||
unit: str,
|
||||
lower_limit: float | None = None,
|
||||
upper_limit: float | None = None,
|
||||
) -> None:
|
||||
"""Save a scalar test result."""
|
||||
|
||||
@abstractmethod
|
||||
def save_measurements(
|
||||
self,
|
||||
run_id: UUID,
|
||||
measurements: list[Measurement],
|
||||
) -> None:
|
||||
"""Save time-series measurements (implemented in Parquet extension)."""
|
||||
|
||||
@abstractmethod
|
||||
def get_run(self, run_id: UUID) -> TestRun:
|
||||
"""Retrieve test run metadata by ID."""
|
||||
|
||||
@abstractmethod
|
||||
def get_results(self, run_id: UUID) -> list[TestResult]:
|
||||
"""Retrieve all test results for a run."""
|
||||
|
||||
@abstractmethod
|
||||
def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None:
|
||||
"""Retrieve measurements as pandas DataFrame."""
|
||||
|
||||
|
||||
class SQLiteRepository(ITestRepository):
|
||||
"""SQLite-based repository for test data.
|
||||
|
||||
Stores test run metadata and scalar results in SQLite.
|
||||
Time-series measurements are stored in Parquet files.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: str | Path, measurements_dir: str | Path | None = None):
|
||||
"""Initialise repository with database and measurements paths.
|
||||
|
||||
Args:
|
||||
db_path: Path to SQLite database file
|
||||
measurements_dir: Directory for Parquet measurement files
|
||||
(defaults to db_path parent / "measurements")
|
||||
"""
|
||||
self.db_path = Path(db_path)
|
||||
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if measurements_dir is None:
|
||||
self.measurements_dir = self.db_path.parent / "measurements"
|
||||
else:
|
||||
self.measurements_dir = Path(measurements_dir)
|
||||
|
||||
self.measurements_dir.mkdir(parents=True, exist_ok=True)
|
||||
self._init_database()
|
||||
|
||||
def _init_database(self) -> None:
|
||||
"""Create database tables if they don't exist."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS test_runs (
|
||||
id TEXT PRIMARY KEY,
|
||||
test_name TEXT NOT NULL,
|
||||
description TEXT,
|
||||
started_at TEXT NOT NULL,
|
||||
completed_at TEXT,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
config_json TEXT NOT NULL,
|
||||
operator TEXT,
|
||||
notes TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS test_results (
|
||||
id TEXT PRIMARY KEY,
|
||||
test_run_id TEXT NOT NULL,
|
||||
parameter TEXT NOT NULL,
|
||||
value REAL NOT NULL,
|
||||
unit TEXT,
|
||||
lower_limit REAL,
|
||||
upper_limit REAL,
|
||||
passed INTEGER NOT NULL,
|
||||
measured_at TEXT NOT NULL,
|
||||
FOREIGN KEY (test_run_id) REFERENCES test_runs(id)
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_test_runs_status ON test_runs(status)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_test_runs_name ON test_runs(test_name)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_test_results_run ON test_results(test_run_id)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_test_results_param ON test_results(parameter)"
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def create_run(
|
||||
self,
|
||||
test_name: str,
|
||||
config: dict[str, Any],
|
||||
operator: str | None = None,
|
||||
description: str | None = None,
|
||||
) -> UUID:
|
||||
"""Create a new test run and return its ID."""
|
||||
run_id = uuid4()
|
||||
started_at = datetime.now()
|
||||
config_json = json.dumps(config)
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO test_runs (
|
||||
id, test_name, description, started_at, status,
|
||||
config_json, operator, created_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
str(run_id),
|
||||
test_name,
|
||||
description,
|
||||
started_at.isoformat(),
|
||||
TestStatus.PENDING.value,
|
||||
config_json,
|
||||
operator,
|
||||
datetime.now().isoformat(),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
return run_id
|
||||
|
||||
def update_run_status(self, run_id: UUID, status: TestStatus) -> None:
|
||||
"""Update the status of a test run."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"UPDATE test_runs SET status = ? WHERE id = ?",
|
||||
(status.value, str(run_id)),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def complete_run(self, run_id: UUID, status: TestStatus) -> None:
|
||||
"""Mark a test run as complete with final status."""
|
||||
completed_at = datetime.now()
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
UPDATE test_runs
|
||||
SET status = ?, completed_at = ?
|
||||
WHERE id = ?
|
||||
""",
|
||||
(status.value, completed_at.isoformat(), str(run_id)),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def save_result(
|
||||
self,
|
||||
run_id: UUID,
|
||||
parameter: str,
|
||||
value: float,
|
||||
unit: str,
|
||||
lower_limit: float | None = None,
|
||||
upper_limit: float | None = None,
|
||||
) -> None:
|
||||
"""Save a scalar test result."""
|
||||
result_id = uuid4()
|
||||
measured_at = datetime.now()
|
||||
|
||||
# Calculate pass/fail
|
||||
passed = 1 # Default to pass if no limits
|
||||
if lower_limit is not None or upper_limit is not None:
|
||||
lower_ok = lower_limit is None or value >= lower_limit
|
||||
upper_ok = upper_limit is None or value <= upper_limit
|
||||
passed = 1 if (lower_ok and upper_ok) else 0
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO test_results (
|
||||
id, test_run_id, parameter, value, unit,
|
||||
lower_limit, upper_limit, passed, measured_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
str(result_id),
|
||||
str(run_id),
|
||||
parameter,
|
||||
value,
|
||||
unit,
|
||||
lower_limit,
|
||||
upper_limit,
|
||||
passed,
|
||||
measured_at.isoformat(),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def save_measurements(
|
||||
self,
|
||||
run_id: UUID,
|
||||
measurements: list[Measurement],
|
||||
) -> None:
|
||||
"""Save time-series measurements to Parquet file.
|
||||
|
||||
Measurements are stored in Parquet format for efficient time-series storage.
|
||||
File path: {measurements_dir}/run_{run_id}/measurements.parquet
|
||||
"""
|
||||
if not measurements:
|
||||
return
|
||||
|
||||
# Create run-specific directory
|
||||
run_dir = self.measurements_dir / f"run_{run_id}"
|
||||
run_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Convert measurements to DataFrame
|
||||
data = {
|
||||
"timestamp": [m.timestamp for m in measurements],
|
||||
"parameter": [m.parameter for m in measurements],
|
||||
"value": [m.value for m in measurements],
|
||||
"unit": [m.unit for m in measurements],
|
||||
"temperature": [m.temperature for m in measurements],
|
||||
"input_voltage": [m.input_voltage for m in measurements],
|
||||
"load_current": [m.load_current for m in measurements],
|
||||
}
|
||||
df = pd.DataFrame(data)
|
||||
|
||||
# Save to Parquet (append mode if file exists)
|
||||
parquet_path = run_dir / "measurements.parquet"
|
||||
if parquet_path.exists():
|
||||
# Read existing data and append
|
||||
existing_df = pd.read_parquet(parquet_path)
|
||||
df = pd.concat([existing_df, df], ignore_index=True)
|
||||
|
||||
df.to_parquet(parquet_path, index=False, engine="pyarrow")
|
||||
|
||||
def get_run(self, run_id: UUID) -> TestRun:
|
||||
"""Retrieve test run metadata by ID."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(
|
||||
"SELECT * FROM test_runs WHERE id = ?",
|
||||
(str(run_id),),
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is None:
|
||||
msg = f"Test run {run_id} not found"
|
||||
raise ValueError(msg)
|
||||
|
||||
return TestRun(
|
||||
id=row["id"],
|
||||
test_name=row["test_name"],
|
||||
description=row["description"],
|
||||
started_at=datetime.fromisoformat(row["started_at"]),
|
||||
completed_at=(
|
||||
datetime.fromisoformat(row["completed_at"])
|
||||
if row["completed_at"]
|
||||
else None
|
||||
),
|
||||
status=TestStatus(row["status"]),
|
||||
config_json=row["config_json"],
|
||||
operator=row["operator"],
|
||||
notes=row["notes"],
|
||||
created_at=datetime.fromisoformat(row["created_at"]),
|
||||
)
|
||||
|
||||
def get_results(self, run_id: UUID) -> list[TestResult]:
|
||||
"""Retrieve all test results for a run."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(
|
||||
"SELECT * FROM test_results WHERE test_run_id = ?",
|
||||
(str(run_id),),
|
||||
)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
return [
|
||||
TestResult(
|
||||
id=row["id"],
|
||||
test_run_id=row["test_run_id"],
|
||||
parameter=row["parameter"],
|
||||
value=row["value"],
|
||||
unit=row["unit"],
|
||||
lower_limit=row["lower_limit"],
|
||||
upper_limit=row["upper_limit"],
|
||||
measured_at=datetime.fromisoformat(row["measured_at"]),
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
def get_measurements_dataframe(self, run_id: UUID) -> pd.DataFrame | None:
|
||||
"""Retrieve measurements as pandas DataFrame from Parquet file.
|
||||
|
||||
Args:
|
||||
run_id: Test run ID
|
||||
|
||||
Returns:
|
||||
DataFrame with measurement data, or None if no measurements exist
|
||||
"""
|
||||
parquet_path = self.measurements_dir / f"run_{run_id}" / "measurements.parquet"
|
||||
|
||||
if not parquet_path.exists():
|
||||
return None
|
||||
|
||||
return pd.read_parquet(parquet_path)
|
||||
@@ -3,3 +3,20 @@
|
||||
Provides test sequencing, measurement logging, limit checking,
|
||||
and runtime context management for DVT characterisation tests.
|
||||
"""
|
||||
|
||||
from py_dvt_ate.framework.context import ITest, TestContext
|
||||
from py_dvt_ate.framework.limits import Limit, LimitSet, check_value, evaluate_results
|
||||
from py_dvt_ate.framework.logger import ITestLogger, TestLogger
|
||||
from py_dvt_ate.framework.runner import TestRunner
|
||||
|
||||
__all__ = [
|
||||
"ITest",
|
||||
"ITestLogger",
|
||||
"Limit",
|
||||
"LimitSet",
|
||||
"TestContext",
|
||||
"TestLogger",
|
||||
"TestRunner",
|
||||
"check_value",
|
||||
"evaluate_results",
|
||||
]
|
||||
|
||||
111
src/py_dvt_ate/framework/context.py
Normal file
111
src/py_dvt_ate/framework/context.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""Test framework context and interface definitions.
|
||||
|
||||
This module defines the core abstractions for the test executive framework:
|
||||
- TestContext: Runtime context passed to tests during execution
|
||||
- ITest: Abstract base class that all DVT tests must implement
|
||||
|
||||
The test framework orchestrates test execution, measurement logging, and
|
||||
result evaluation against limits.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from uuid import UUID
|
||||
|
||||
from py_dvt_ate.data.models import TestStatus
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# Avoid circular imports while maintaining type checking
|
||||
from py_dvt_ate.framework.logger import ITestLogger
|
||||
from py_dvt_ate.instruments.factory import InstrumentSet
|
||||
|
||||
|
||||
@dataclass
|
||||
class TestContext:
|
||||
"""Runtime context for test execution.
|
||||
|
||||
Provides access to instruments, logging, and configuration during test
|
||||
execution. Passed to each test's execute() method.
|
||||
|
||||
Attributes:
|
||||
run_id: Unique identifier for this test run (UUID).
|
||||
instruments: Hardware abstraction layer providing access to all instruments.
|
||||
logger: Test logger for recording measurements and events.
|
||||
config: Test-specific configuration dictionary.
|
||||
"""
|
||||
|
||||
run_id: UUID
|
||||
instruments: "InstrumentSet"
|
||||
logger: "ITestLogger"
|
||||
config: dict[str, Any]
|
||||
|
||||
|
||||
class ITest(ABC):
|
||||
"""Abstract base class for DVT test implementations.
|
||||
|
||||
All characterisation tests must inherit from this class and implement
|
||||
the required properties and methods. The test runner uses these to
|
||||
discover, describe, and execute tests.
|
||||
|
||||
Example:
|
||||
class TempCoTest(ITest):
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "tempco"
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return "Output voltage temperature coefficient"
|
||||
|
||||
def execute(self, context: TestContext) -> TestStatus:
|
||||
# Test implementation...
|
||||
return TestStatus.PASSED
|
||||
"""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def name(self) -> str:
|
||||
"""Return the unique test identifier.
|
||||
|
||||
Used for test discovery and selection. Should be lowercase,
|
||||
alphanumeric with underscores (e.g., "tempco", "load_regulation").
|
||||
|
||||
Returns:
|
||||
Unique test name string.
|
||||
"""
|
||||
pass
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def description(self) -> str:
|
||||
"""Return a human-readable test description.
|
||||
|
||||
Describes what the test measures or characterises. Displayed in
|
||||
reports and user interfaces.
|
||||
|
||||
Returns:
|
||||
Brief description of the test purpose.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def execute(self, context: TestContext) -> TestStatus:
|
||||
"""Execute the test with the given context.
|
||||
|
||||
Contains the test logic: configure instruments, take measurements,
|
||||
log results, and evaluate pass/fail. The test should use the
|
||||
context.logger to record measurements and context.instruments to
|
||||
control equipment.
|
||||
|
||||
Args:
|
||||
context: Runtime context with instruments, logger, and config.
|
||||
|
||||
Returns:
|
||||
Final test status (PASSED, FAILED, ERROR, etc.).
|
||||
|
||||
Raises:
|
||||
Exception: If a critical error occurs during test execution.
|
||||
The test runner will catch this and mark the test as ERROR.
|
||||
"""
|
||||
pass
|
||||
238
src/py_dvt_ate/framework/limits.py
Normal file
238
src/py_dvt_ate/framework/limits.py
Normal file
@@ -0,0 +1,238 @@
|
||||
"""Limit checking utilities for test result evaluation.
|
||||
|
||||
This module provides utilities for evaluating measurements against specification
|
||||
limits and determining pass/fail status. Used by tests to check if results meet
|
||||
requirements and by the test runner to determine overall test status.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from py_dvt_ate.data.models import TestResult, TestStatus
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Limit:
|
||||
"""Specification limit for a parameter.
|
||||
|
||||
Represents a single limit specification with optional lower and upper bounds.
|
||||
Used to define test specifications and evaluate pass/fail.
|
||||
|
||||
Attributes:
|
||||
parameter: Parameter name this limit applies to.
|
||||
lower: Optional lower limit (inclusive). None means no lower limit.
|
||||
upper: Optional upper limit (inclusive). None means no upper limit.
|
||||
unit: Unit of measurement for the limits.
|
||||
|
||||
Example:
|
||||
temp_co_limit = Limit("temp_co", lower=-50.0, upper=50.0, unit="ppm/°C")
|
||||
"""
|
||||
|
||||
parameter: str
|
||||
lower: float | None = None
|
||||
upper: float | None = None
|
||||
unit: str = ""
|
||||
|
||||
def check(self, value: float) -> bool | None:
|
||||
"""Check if a value is within this limit.
|
||||
|
||||
Args:
|
||||
value: Value to check against limits.
|
||||
|
||||
Returns:
|
||||
True if value is within limits, False if outside limits.
|
||||
None if no limits are defined (informational parameter).
|
||||
|
||||
Example:
|
||||
limit = Limit("v_out", lower=3.25, upper=3.35, unit="V")
|
||||
limit.check(3.30) # Returns True
|
||||
limit.check(3.40) # Returns False
|
||||
"""
|
||||
if self.lower is None and self.upper is None:
|
||||
return None
|
||||
|
||||
lower_ok = self.lower is None or value >= self.lower
|
||||
upper_ok = self.upper is None or value <= self.upper
|
||||
return lower_ok and upper_ok
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LimitSet:
|
||||
"""Collection of limits for a test.
|
||||
|
||||
Groups multiple parameter limits together as a test specification.
|
||||
Can be loaded from configuration or defined programmatically.
|
||||
|
||||
Attributes:
|
||||
name: Name of this limit set (e.g., "nominal", "extended").
|
||||
limits: Dictionary mapping parameter names to Limit objects.
|
||||
|
||||
Example:
|
||||
limits = LimitSet(
|
||||
name="nominal",
|
||||
limits={
|
||||
"temp_co": Limit("temp_co", -50.0, 50.0, "ppm/°C"),
|
||||
"v_out": Limit("v_out", 3.25, 3.35, "V"),
|
||||
}
|
||||
)
|
||||
"""
|
||||
|
||||
name: str
|
||||
limits: dict[str, Limit]
|
||||
|
||||
def get_limit(self, parameter: str) -> Limit | None:
|
||||
"""Get the limit for a specific parameter.
|
||||
|
||||
Args:
|
||||
parameter: Parameter name to look up.
|
||||
|
||||
Returns:
|
||||
Limit object if found, None if parameter has no limit defined.
|
||||
"""
|
||||
return self.limits.get(parameter)
|
||||
|
||||
def check(self, parameter: str, value: float) -> bool | None:
|
||||
"""Check if a value is within limits for a parameter.
|
||||
|
||||
Args:
|
||||
parameter: Parameter name.
|
||||
value: Value to check.
|
||||
|
||||
Returns:
|
||||
True if within limits, False if outside limits.
|
||||
None if parameter has no limit defined.
|
||||
"""
|
||||
limit = self.get_limit(parameter)
|
||||
if limit is None:
|
||||
return None
|
||||
return limit.check(value)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, name: str, limits_dict: dict[str, Any]) -> "LimitSet":
|
||||
"""Create a LimitSet from a dictionary.
|
||||
|
||||
Useful for loading limit sets from YAML configuration files.
|
||||
|
||||
Args:
|
||||
name: Name for this limit set.
|
||||
limits_dict: Dictionary with parameter names as keys and limit
|
||||
specifications as values. Each limit spec should have:
|
||||
- "lower": Optional lower limit
|
||||
- "upper": Optional upper limit
|
||||
- "unit": Unit of measurement
|
||||
|
||||
Returns:
|
||||
LimitSet instance.
|
||||
|
||||
Example:
|
||||
config = {
|
||||
"temp_co": {"lower": -50.0, "upper": 50.0, "unit": "ppm/°C"},
|
||||
"v_out": {"lower": 3.25, "upper": 3.35, "unit": "V"},
|
||||
}
|
||||
limits = LimitSet.from_dict("nominal", config)
|
||||
"""
|
||||
limits = {}
|
||||
for param, spec in limits_dict.items():
|
||||
limits[param] = Limit(
|
||||
parameter=param,
|
||||
lower=spec.get("lower"),
|
||||
upper=spec.get("upper"),
|
||||
unit=spec.get("unit", ""),
|
||||
)
|
||||
return cls(name=name, limits=limits)
|
||||
|
||||
|
||||
def check_value(
|
||||
value: float,
|
||||
lower: float | None = None,
|
||||
upper: float | None = None,
|
||||
) -> bool | None:
|
||||
"""Check if a value is within specified limits.
|
||||
|
||||
Utility function for quick limit checking without creating Limit objects.
|
||||
|
||||
Args:
|
||||
value: Value to check.
|
||||
lower: Optional lower limit (inclusive).
|
||||
upper: Optional upper limit (inclusive).
|
||||
|
||||
Returns:
|
||||
True if value is within limits, False if outside limits.
|
||||
None if no limits are specified.
|
||||
|
||||
Example:
|
||||
check_value(3.30, lower=3.25, upper=3.35) # Returns True
|
||||
check_value(3.40, lower=3.25, upper=3.35) # Returns False
|
||||
check_value(3.30) # Returns None (no limits)
|
||||
"""
|
||||
if lower is None and upper is None:
|
||||
return None
|
||||
|
||||
lower_ok = lower is None or value >= lower
|
||||
upper_ok = upper is None or value <= upper
|
||||
return lower_ok and upper_ok
|
||||
|
||||
|
||||
def evaluate_results(results: list[TestResult]) -> TestStatus:
|
||||
"""Evaluate a list of test results to determine overall status.
|
||||
|
||||
Aggregates multiple test results into a single pass/fail determination.
|
||||
If any result fails its limits, the overall status is FAILED.
|
||||
If all results pass (or have no limits), the overall status is PASSED.
|
||||
|
||||
Args:
|
||||
results: List of TestResult objects to evaluate.
|
||||
|
||||
Returns:
|
||||
TestStatus.PASSED if all results pass their limits.
|
||||
TestStatus.FAILED if any result fails its limits.
|
||||
TestStatus.PASSED if no results have limits defined (informational only).
|
||||
|
||||
Example:
|
||||
results = [
|
||||
TestResult(..., value=25.0, lower_limit=-50.0, upper_limit=50.0),
|
||||
TestResult(..., value=3.30, lower_limit=3.25, upper_limit=3.35),
|
||||
]
|
||||
status = evaluate_results(results) # Returns TestStatus.PASSED
|
||||
"""
|
||||
if not results:
|
||||
return TestStatus.PASSED
|
||||
|
||||
# Check if any result failed
|
||||
for result in results:
|
||||
if result.passed is False:
|
||||
return TestStatus.FAILED
|
||||
|
||||
# All results passed (or had no limits)
|
||||
return TestStatus.PASSED
|
||||
|
||||
|
||||
def format_limit_violation(result: TestResult) -> str:
|
||||
"""Format a limit violation message for a failed result.
|
||||
|
||||
Creates a human-readable message describing why a result failed.
|
||||
Useful for logging and reporting.
|
||||
|
||||
Args:
|
||||
result: TestResult that failed its limits.
|
||||
|
||||
Returns:
|
||||
Formatted violation message.
|
||||
|
||||
Example:
|
||||
result = TestResult(..., parameter="v_out", value=3.40,
|
||||
lower_limit=3.25, upper_limit=3.35, unit="V")
|
||||
message = format_limit_violation(result)
|
||||
# Returns: "v_out: 3.400 V [FAIL] (limits: 3.250 to 3.350 V)"
|
||||
"""
|
||||
status = "PASS" if result.passed else "FAIL"
|
||||
limits_str = ""
|
||||
|
||||
if result.lower_limit is not None and result.upper_limit is not None:
|
||||
limits_str = f" (limits: {result.lower_limit:.3f} to {result.upper_limit:.3f} {result.unit})"
|
||||
elif result.lower_limit is not None:
|
||||
limits_str = f" (minimum: {result.lower_limit:.3f} {result.unit})"
|
||||
elif result.upper_limit is not None:
|
||||
limits_str = f" (maximum: {result.upper_limit:.3f} {result.unit})"
|
||||
|
||||
return f"{result.parameter}: {result.value:.3f} {result.unit} [{status}]{limits_str}"
|
||||
222
src/py_dvt_ate/framework/logger.py
Normal file
222
src/py_dvt_ate/framework/logger.py
Normal file
@@ -0,0 +1,222 @@
|
||||
"""Test logger for recording measurements and events.
|
||||
|
||||
This module provides the logging infrastructure for DVT tests. The test logger
|
||||
records time-series measurements, scalar results with limits, and event messages
|
||||
during test execution.
|
||||
"""
|
||||
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
from uuid import UUID
|
||||
|
||||
from py_dvt_ate.data.models import Measurement
|
||||
from py_dvt_ate.data.repository import ITestRepository
|
||||
|
||||
|
||||
class ITestLogger(ABC):
|
||||
"""Abstract interface for test data logging.
|
||||
|
||||
Provides methods for logging measurements, results, and events during
|
||||
test execution. Implementations are responsible for persisting this
|
||||
data to the appropriate storage backend.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def log_measurement(
|
||||
self,
|
||||
parameter: str,
|
||||
value: float,
|
||||
unit: str,
|
||||
conditions: dict[str, float] | None = None,
|
||||
) -> None:
|
||||
"""Log a time-series measurement with environmental conditions.
|
||||
|
||||
Used for logging raw measurements taken during the test. These are
|
||||
stored as time-series data for later analysis and plotting.
|
||||
|
||||
Args:
|
||||
parameter: Measurement parameter name (e.g., "v_out", "i_q").
|
||||
value: Measured value.
|
||||
unit: Unit of measurement (e.g., "V", "A", "°C").
|
||||
conditions: Optional environmental conditions at time of measurement:
|
||||
- "temperature": Chamber temperature (°C)
|
||||
- "input_voltage": DUT input voltage (V)
|
||||
- "load_current": DUT load current (A)
|
||||
|
||||
Example:
|
||||
logger.log_measurement(
|
||||
"v_out", 3.301, "V",
|
||||
conditions={"temperature": 25.0, "input_voltage": 5.0}
|
||||
)
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def log_result(
|
||||
self,
|
||||
parameter: str,
|
||||
value: float,
|
||||
unit: str,
|
||||
lower_limit: float | None = None,
|
||||
upper_limit: float | None = None,
|
||||
) -> None:
|
||||
"""Log a scalar test result with pass/fail limits.
|
||||
|
||||
Used for logging calculated or derived results that will be evaluated
|
||||
against specification limits. These appear in test reports and determine
|
||||
overall pass/fail status.
|
||||
|
||||
Args:
|
||||
parameter: Result parameter name (e.g., "temp_co", "load_reg").
|
||||
value: Calculated or measured value.
|
||||
unit: Unit of measurement (e.g., "ppm/°C", "%", "mV").
|
||||
lower_limit: Optional lower limit for pass/fail evaluation.
|
||||
upper_limit: Optional upper limit for pass/fail evaluation.
|
||||
|
||||
Example:
|
||||
logger.log_result(
|
||||
"temp_co", 23.5, "ppm/°C",
|
||||
lower_limit=-50.0, upper_limit=50.0
|
||||
)
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def log_event(self, message: str, level: str = "INFO") -> None:
|
||||
"""Log a test event or message.
|
||||
|
||||
Used for logging informational messages, warnings, and errors during
|
||||
test execution. Useful for debugging and understanding test flow.
|
||||
|
||||
Args:
|
||||
message: Event message text.
|
||||
level: Log level ("DEBUG", "INFO", "WARNING", "ERROR").
|
||||
|
||||
Example:
|
||||
logger.log_event("Waiting for thermal stability", level="INFO")
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def flush(self) -> None:
|
||||
"""Flush any buffered data to storage.
|
||||
|
||||
Forces any buffered measurements or results to be written to the
|
||||
underlying storage backend. Called automatically at end of test,
|
||||
but can be called manually for long-running tests.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class TestLogger(ITestLogger):
|
||||
"""Concrete test logger implementation using repository pattern.
|
||||
|
||||
Buffers measurements in memory and writes them in batches to a
|
||||
repository for efficiency. Results and events are written immediately.
|
||||
|
||||
Attributes:
|
||||
run_id: UUID of the test run this logger is associated with.
|
||||
repository: Data repository for persisting measurements and results.
|
||||
measurement_buffer: In-memory buffer of measurements awaiting write.
|
||||
buffer_size: Number of measurements to buffer before auto-flush.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
run_id: UUID,
|
||||
repository: ITestRepository,
|
||||
buffer_size: int = 100,
|
||||
):
|
||||
"""Initialise test logger.
|
||||
|
||||
Args:
|
||||
run_id: UUID of the test run to associate logs with.
|
||||
repository: Repository for persisting data.
|
||||
buffer_size: Number of measurements to buffer before auto-flush.
|
||||
Default 100 provides good balance of performance
|
||||
and memory usage.
|
||||
"""
|
||||
self.run_id = run_id
|
||||
self.repository = repository
|
||||
self.buffer_size = buffer_size
|
||||
self.measurement_buffer: list[Measurement] = []
|
||||
|
||||
def log_measurement(
|
||||
self,
|
||||
parameter: str,
|
||||
value: float,
|
||||
unit: str,
|
||||
conditions: dict[str, float] | None = None,
|
||||
) -> None:
|
||||
"""Log a time-series measurement with environmental conditions.
|
||||
|
||||
Measurements are buffered in memory and written to the repository
|
||||
in batches for efficiency.
|
||||
"""
|
||||
conditions = conditions or {}
|
||||
measurement = Measurement(
|
||||
timestamp=time.time(),
|
||||
parameter=parameter,
|
||||
value=value,
|
||||
unit=unit,
|
||||
temperature=conditions.get("temperature", 0.0),
|
||||
input_voltage=conditions.get("input_voltage", 0.0),
|
||||
load_current=conditions.get("load_current", 0.0),
|
||||
)
|
||||
self.measurement_buffer.append(measurement)
|
||||
|
||||
# Auto-flush when buffer is full
|
||||
if len(self.measurement_buffer) >= self.buffer_size:
|
||||
self.flush()
|
||||
|
||||
def log_result(
|
||||
self,
|
||||
parameter: str,
|
||||
value: float,
|
||||
unit: str,
|
||||
lower_limit: float | None = None,
|
||||
upper_limit: float | None = None,
|
||||
) -> None:
|
||||
"""Log a scalar test result with pass/fail limits.
|
||||
|
||||
Results are written immediately to the repository (not buffered).
|
||||
"""
|
||||
self.repository.save_result(
|
||||
run_id=self.run_id,
|
||||
parameter=parameter,
|
||||
value=value,
|
||||
unit=unit,
|
||||
lower_limit=lower_limit,
|
||||
upper_limit=upper_limit,
|
||||
)
|
||||
|
||||
def log_event(self, message: str, level: str = "INFO") -> None:
|
||||
"""Log a test event or message.
|
||||
|
||||
Events are currently logged to console. Future versions may
|
||||
persist events to the repository.
|
||||
"""
|
||||
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
||||
print(f"[{timestamp}] {level:7s} {message}")
|
||||
|
||||
def flush(self) -> None:
|
||||
"""Flush buffered measurements to repository.
|
||||
|
||||
Writes all buffered measurements to the repository in a single
|
||||
batch operation, then clears the buffer.
|
||||
"""
|
||||
if self.measurement_buffer:
|
||||
self.repository.save_measurements(
|
||||
run_id=self.run_id,
|
||||
measurements=self.measurement_buffer,
|
||||
)
|
||||
self.measurement_buffer.clear()
|
||||
|
||||
def __del__(self) -> None:
|
||||
"""Ensure buffered data is flushed on logger destruction."""
|
||||
try:
|
||||
self.flush()
|
||||
except Exception:
|
||||
# Ignore errors during cleanup
|
||||
pass
|
||||
203
src/py_dvt_ate/framework/runner.py
Normal file
203
src/py_dvt_ate/framework/runner.py
Normal file
@@ -0,0 +1,203 @@
|
||||
"""Test runner for orchestrating DVT test execution.
|
||||
|
||||
This module provides the TestRunner class, which coordinates test execution,
|
||||
manages test lifecycle, and ensures proper logging and error handling.
|
||||
"""
|
||||
|
||||
import json
|
||||
import traceback
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
from py_dvt_ate.data.models import TestStatus
|
||||
from py_dvt_ate.data.repository import ITestRepository
|
||||
from py_dvt_ate.framework.context import ITest, TestContext
|
||||
from py_dvt_ate.framework.limits import evaluate_results
|
||||
from py_dvt_ate.framework.logger import TestLogger
|
||||
from py_dvt_ate.instruments.factory import InstrumentSet
|
||||
|
||||
|
||||
class TestRunner:
|
||||
"""Orchestrates DVT test execution.
|
||||
|
||||
The test runner manages the complete test lifecycle:
|
||||
1. Creates a test run record in the repository
|
||||
2. Sets up logging and context
|
||||
3. Executes the test with proper error handling
|
||||
4. Evaluates results against limits
|
||||
5. Updates final status and flushes data
|
||||
|
||||
Attributes:
|
||||
repository: Data repository for persisting test results.
|
||||
|
||||
Example:
|
||||
runner = TestRunner(repository)
|
||||
instruments = factory.create(config)
|
||||
run_id = runner.run_test(
|
||||
test=TempCoTest(),
|
||||
instruments=instruments,
|
||||
config={"temp_points": [-40, 25, 85]},
|
||||
operator="alice@example.com"
|
||||
)
|
||||
"""
|
||||
|
||||
def __init__(self, repository: ITestRepository):
|
||||
"""Initialise test runner.
|
||||
|
||||
Args:
|
||||
repository: Repository for persisting test data.
|
||||
"""
|
||||
self.repository = repository
|
||||
|
||||
def run_test(
|
||||
self,
|
||||
test: ITest,
|
||||
instruments: InstrumentSet,
|
||||
config: dict[str, Any] | None = None,
|
||||
operator: str | None = None,
|
||||
description: str | None = None,
|
||||
) -> UUID:
|
||||
"""Run a DVT test with full lifecycle management.
|
||||
|
||||
Creates a test run, executes the test with proper error handling,
|
||||
evaluates results, and updates final status. All measurements and
|
||||
results are persisted to the repository.
|
||||
|
||||
Args:
|
||||
test: Test instance to execute (implements ITest).
|
||||
instruments: Instrument set for test to use.
|
||||
config: Optional test-specific configuration dictionary.
|
||||
operator: Optional operator identifier (e.g., email address).
|
||||
description: Optional human-readable test run description.
|
||||
|
||||
Returns:
|
||||
UUID of the test run. Can be used to retrieve results later.
|
||||
|
||||
Raises:
|
||||
Exception: Only if repository operations fail. Test execution
|
||||
errors are caught and recorded as ERROR status.
|
||||
|
||||
Example:
|
||||
run_id = runner.run_test(
|
||||
test=TempCoTest(),
|
||||
instruments=instruments,
|
||||
config={"temp_points": [-40, 25, 85]},
|
||||
operator="alice@example.com",
|
||||
description="Characterisation run #42"
|
||||
)
|
||||
print(f"Test run ID: {run_id}")
|
||||
"""
|
||||
config = config or {}
|
||||
|
||||
# Create test run record
|
||||
run_id = self.repository.create_run(
|
||||
test_name=test.name,
|
||||
config=config,
|
||||
operator=operator,
|
||||
description=description or test.description,
|
||||
)
|
||||
|
||||
# Create logger for this run
|
||||
logger = TestLogger(run_id=run_id, repository=self.repository)
|
||||
|
||||
# Create test context
|
||||
context = TestContext(
|
||||
run_id=run_id,
|
||||
instruments=instruments,
|
||||
logger=logger,
|
||||
config=config,
|
||||
)
|
||||
|
||||
# Update status to running
|
||||
self.repository.update_run_status(run_id, TestStatus.RUNNING)
|
||||
|
||||
# Execute test with error handling
|
||||
try:
|
||||
logger.log_event(f"Starting test: {test.name}", level="INFO")
|
||||
logger.log_event(f"Description: {test.description}", level="INFO")
|
||||
|
||||
# Log configuration
|
||||
if config:
|
||||
config_str = json.dumps(config, indent=2)
|
||||
logger.log_event(f"Configuration:\n{config_str}", level="DEBUG")
|
||||
|
||||
# Execute the test
|
||||
status = test.execute(context)
|
||||
|
||||
# Flush any buffered measurements
|
||||
logger.flush()
|
||||
|
||||
# Evaluate results if test didn't explicitly set status
|
||||
if status == TestStatus.RUNNING:
|
||||
results = self.repository.get_results(run_id)
|
||||
status = evaluate_results(results)
|
||||
logger.log_event(
|
||||
f"Test completed. Evaluated {len(results)} results: {status.value}",
|
||||
level="INFO",
|
||||
)
|
||||
|
||||
# Update final status
|
||||
self.repository.complete_run(run_id, status)
|
||||
logger.log_event(f"Test finished with status: {status.value}", level="INFO")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
# User interrupted - mark as error but don't swallow interrupt
|
||||
logger.log_event("Test interrupted by user", level="WARNING")
|
||||
logger.flush()
|
||||
self.repository.complete_run(run_id, TestStatus.ERROR)
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
# Test execution error - log and mark as ERROR
|
||||
error_msg = f"Test execution failed: {e}"
|
||||
logger.log_event(error_msg, level="ERROR")
|
||||
logger.log_event(traceback.format_exc(), level="DEBUG")
|
||||
logger.flush()
|
||||
self.repository.complete_run(run_id, TestStatus.ERROR)
|
||||
logger.log_event("Test finished with status: ERROR", level="INFO")
|
||||
|
||||
return run_id
|
||||
|
||||
def run_tests(
|
||||
self,
|
||||
tests: list[ITest],
|
||||
instruments: InstrumentSet,
|
||||
config: dict[str, Any] | None = None,
|
||||
operator: str | None = None,
|
||||
) -> list[UUID]:
|
||||
"""Run multiple tests sequentially.
|
||||
|
||||
Convenience method for running a suite of tests. Each test is run
|
||||
independently with its own test run record. If one test fails, the
|
||||
remaining tests still execute.
|
||||
|
||||
Args:
|
||||
tests: List of test instances to execute.
|
||||
instruments: Instrument set shared by all tests.
|
||||
config: Optional configuration applied to all tests.
|
||||
operator: Optional operator identifier.
|
||||
|
||||
Returns:
|
||||
List of test run UUIDs in execution order.
|
||||
|
||||
Example:
|
||||
run_ids = runner.run_tests(
|
||||
tests=[TempCoTest(), LoadRegTest(), LineRegTest()],
|
||||
instruments=instruments,
|
||||
config={"common_setting": 42},
|
||||
operator="alice@example.com"
|
||||
)
|
||||
for run_id in run_ids:
|
||||
run = repository.get_run(run_id)
|
||||
print(f"{run.test_name}: {run.status.value}")
|
||||
"""
|
||||
run_ids = []
|
||||
for test in tests:
|
||||
run_id = self.run_test(
|
||||
test=test,
|
||||
instruments=instruments,
|
||||
config=config,
|
||||
operator=operator,
|
||||
)
|
||||
run_ids.append(run_id)
|
||||
return run_ids
|
||||
158
src/py_dvt_ate/tests/base.py
Normal file
158
src/py_dvt_ate/tests/base.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""Base class and utilities for DVT test implementations.
|
||||
|
||||
This module provides common functionality shared across all DVT tests,
|
||||
including thermal settling helpers, measurement utilities, and statistical
|
||||
calculations.
|
||||
"""
|
||||
|
||||
import time
|
||||
from abc import ABC
|
||||
from collections.abc import Callable
|
||||
|
||||
from py_dvt_ate.framework.context import ITest, TestContext
|
||||
|
||||
|
||||
class BaseDVTTest(ITest, ABC):
|
||||
"""Abstract base class for DVT tests with common utilities.
|
||||
|
||||
Provides helper methods for thermal settling, measurement averaging,
|
||||
and other common test patterns. All DVT tests should inherit from
|
||||
this class rather than directly from ITest.
|
||||
"""
|
||||
|
||||
def wait_for_temperature(
|
||||
self,
|
||||
context: TestContext,
|
||||
setpoint: float,
|
||||
timeout: float = 300.0,
|
||||
poll_interval: float = 1.0,
|
||||
) -> bool:
|
||||
"""Wait for thermal chamber to stabilise at setpoint.
|
||||
|
||||
Sets the chamber temperature and waits until stable. Logs progress
|
||||
to the test logger.
|
||||
|
||||
Args:
|
||||
context: Test context with instruments and logger.
|
||||
setpoint: Target temperature in degrees Celsius.
|
||||
timeout: Maximum wait time in seconds. Default 300s (5 minutes).
|
||||
poll_interval: Time between stability checks. Default 1s.
|
||||
|
||||
Returns:
|
||||
True if temperature stabilised within timeout, False if timed out.
|
||||
|
||||
Raises:
|
||||
ConnectionError: If instrument communication fails.
|
||||
IOError: If instrument reports error.
|
||||
"""
|
||||
chamber = context.instruments.chamber
|
||||
|
||||
# Set the temperature
|
||||
chamber.set_temperature(setpoint)
|
||||
context.logger.log_event(
|
||||
f"Set thermal chamber to {setpoint:.1f}°C, waiting for stability...",
|
||||
level="INFO",
|
||||
)
|
||||
|
||||
# Wait for stability
|
||||
start_time = time.time()
|
||||
elapsed = 0.0
|
||||
|
||||
while elapsed < timeout:
|
||||
if chamber.is_stable():
|
||||
actual = chamber.get_temperature()
|
||||
context.logger.log_event(
|
||||
f"Chamber stable at {actual:.2f}°C "
|
||||
f"(target {setpoint:.1f}°C) after {elapsed:.1f}s",
|
||||
level="INFO",
|
||||
)
|
||||
return True
|
||||
|
||||
time.sleep(poll_interval)
|
||||
elapsed = time.time() - start_time
|
||||
|
||||
# Timeout
|
||||
actual = chamber.get_temperature()
|
||||
context.logger.log_event(
|
||||
f"Timeout waiting for stability. Chamber at {actual:.2f}°C, "
|
||||
f"target {setpoint:.1f}°C after {timeout:.1f}s",
|
||||
level="WARNING",
|
||||
)
|
||||
return False
|
||||
|
||||
def measure_averaged(
|
||||
self,
|
||||
measurement_func: Callable[[], float],
|
||||
num_samples: int = 5,
|
||||
settle_time: float = 0.1,
|
||||
) -> tuple[float, float]:
|
||||
"""Take multiple measurements and return mean and standard deviation.
|
||||
|
||||
Useful for reducing noise in measurements by averaging multiple samples.
|
||||
|
||||
Args:
|
||||
measurement_func: Function that returns a single measurement.
|
||||
num_samples: Number of samples to average. Default 5.
|
||||
settle_time: Delay between samples in seconds. Default 0.1s.
|
||||
|
||||
Returns:
|
||||
Tuple of (mean, standard_deviation).
|
||||
|
||||
Raises:
|
||||
ValueError: If num_samples < 1.
|
||||
Exception: If measurement_func raises an exception.
|
||||
"""
|
||||
if num_samples < 1:
|
||||
raise ValueError("num_samples must be at least 1")
|
||||
|
||||
samples: list[float] = []
|
||||
for _ in range(num_samples):
|
||||
if settle_time > 0 and len(samples) > 0:
|
||||
time.sleep(settle_time)
|
||||
samples.append(measurement_func())
|
||||
|
||||
mean = sum(samples) / len(samples)
|
||||
|
||||
if len(samples) == 1:
|
||||
std_dev = 0.0
|
||||
else:
|
||||
variance = sum((x - mean) ** 2 for x in samples) / (len(samples) - 1)
|
||||
std_dev = variance ** 0.5
|
||||
|
||||
return mean, std_dev
|
||||
|
||||
def thermal_settle(
|
||||
self,
|
||||
context: TestContext,
|
||||
additional_settle_time: float = 5.0,
|
||||
) -> None:
|
||||
"""Wait for additional thermal settling after chamber reports stable.
|
||||
|
||||
After the chamber reports stable temperature, this adds additional
|
||||
settling time to ensure the DUT junction temperature has also stabilised.
|
||||
This is important for measurements sensitive to self-heating effects.
|
||||
|
||||
Args:
|
||||
context: Test context with logger.
|
||||
additional_settle_time: Extra settling time in seconds. Default 5s.
|
||||
"""
|
||||
if additional_settle_time > 0:
|
||||
context.logger.log_event(
|
||||
f"Additional thermal settling for {additional_settle_time:.1f}s...",
|
||||
level="INFO",
|
||||
)
|
||||
time.sleep(additional_settle_time)
|
||||
|
||||
def delay(self, seconds: float, message: str | None = None) -> None:
|
||||
"""Sleep for specified duration.
|
||||
|
||||
Simple utility for adding delays in test sequences.
|
||||
|
||||
Args:
|
||||
seconds: Delay duration in seconds.
|
||||
message: Optional message describing reason for delay.
|
||||
"""
|
||||
if message:
|
||||
# Could log this if needed
|
||||
pass
|
||||
time.sleep(seconds)
|
||||
243
src/py_dvt_ate/tests/thermal/tempco.py
Normal file
243
src/py_dvt_ate/tests/thermal/tempco.py
Normal file
@@ -0,0 +1,243 @@
|
||||
"""Temperature Coefficient (TempCo) characterisation test.
|
||||
|
||||
This test characterises the output voltage temperature coefficient by
|
||||
sweeping the chamber temperature and measuring output voltage at each point.
|
||||
The TempCo is calculated from the linear regression slope and expressed
|
||||
in parts per million per degree Celsius (ppm/°C).
|
||||
"""
|
||||
|
||||
from py_dvt_ate.data.models import TestStatus
|
||||
from py_dvt_ate.framework.context import TestContext
|
||||
from py_dvt_ate.tests.base import BaseDVTTest
|
||||
|
||||
|
||||
class TempCoTest(BaseDVTTest):
|
||||
"""Temperature coefficient characterisation test.
|
||||
|
||||
Measures how output voltage varies with temperature. This is a critical
|
||||
parameter for voltage regulators, as it indicates stability across
|
||||
the operating temperature range.
|
||||
|
||||
Test Procedure:
|
||||
1. Configure DUT supply voltage and load current
|
||||
2. Sweep chamber temperature from min to max
|
||||
3. At each temperature point:
|
||||
- Wait for thermal stability
|
||||
- Measure output voltage (averaged)
|
||||
- Log measurement with conditions
|
||||
4. Calculate TempCo from linear regression
|
||||
5. Evaluate against specification limits
|
||||
|
||||
Configuration:
|
||||
temperatures: List of temperature points (°C). Default: [-40, -20, 0, 25, 50, 85]
|
||||
input_voltage: DUT input voltage (V). Default: 5.0
|
||||
load_current: DUT load current (A). Default: 0.1
|
||||
settle_time: Additional settling time at each temp (s). Default: 5.0
|
||||
num_samples: Number of measurements to average per point. Default: 5
|
||||
tempco_limit: Maximum allowed TempCo magnitude (ppm/°C). Default: ±50.0
|
||||
"""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""Return test identifier."""
|
||||
return "tempco"
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
"""Return test description."""
|
||||
return "Output voltage temperature coefficient"
|
||||
|
||||
def execute(self, context: TestContext) -> TestStatus:
|
||||
"""Execute TempCo characterisation test.
|
||||
|
||||
Args:
|
||||
context: Test context with instruments, logger, and configuration.
|
||||
|
||||
Returns:
|
||||
PASSED if TempCo is within limits, FAILED otherwise.
|
||||
ERROR if a critical failure occurs.
|
||||
"""
|
||||
try:
|
||||
# Get configuration
|
||||
config = context.config
|
||||
temperatures = config.get("temperatures", [-40.0, -20.0, 0.0, 25.0, 50.0, 85.0])
|
||||
input_voltage = config.get("input_voltage", 5.0)
|
||||
load_current = config.get("load_current", 0.1)
|
||||
settle_time = config.get("settle_time", 5.0)
|
||||
num_samples = config.get("num_samples", 5)
|
||||
tempco_limit = config.get("tempco_limit", 50.0)
|
||||
|
||||
context.logger.log_event(
|
||||
f"Starting TempCo test: {len(temperatures)} temperature points, "
|
||||
f"Vin={input_voltage}V, Iload={load_current}A",
|
||||
level="INFO",
|
||||
)
|
||||
|
||||
# Configure DUT power
|
||||
context.logger.log_event(
|
||||
f"Configuring PSU: Vin={input_voltage}V, Ilimit={load_current + 0.5}A",
|
||||
level="INFO",
|
||||
)
|
||||
psu = context.instruments.psu
|
||||
psu.set_voltage(1, input_voltage)
|
||||
psu.set_current_limit(1, load_current + 0.5) # Add headroom
|
||||
psu.enable_output(1, True)
|
||||
|
||||
# Storage for measurements
|
||||
temp_points: list[float] = []
|
||||
vout_points: list[float] = []
|
||||
|
||||
# Temperature sweep
|
||||
for temp_setpoint in temperatures:
|
||||
context.logger.log_event(
|
||||
f"Temperature point: {temp_setpoint}°C",
|
||||
level="INFO",
|
||||
)
|
||||
|
||||
# Wait for thermal stability
|
||||
stable = self.wait_for_temperature(
|
||||
context,
|
||||
temp_setpoint,
|
||||
timeout=300.0,
|
||||
)
|
||||
if not stable:
|
||||
context.logger.log_event(
|
||||
f"Warning: Temperature did not stabilise at {temp_setpoint}°C",
|
||||
level="WARNING",
|
||||
)
|
||||
|
||||
# Additional settling for DUT junction temperature
|
||||
self.thermal_settle(context, settle_time)
|
||||
|
||||
# Measure output voltage (averaged)
|
||||
actual_temp = context.instruments.chamber.get_temperature()
|
||||
|
||||
def measure_vout() -> float:
|
||||
return context.instruments.dmm.measure_dc_voltage()
|
||||
|
||||
vout_mean, vout_std = self.measure_averaged(
|
||||
measure_vout,
|
||||
num_samples=num_samples,
|
||||
)
|
||||
|
||||
# Log individual measurement
|
||||
context.logger.log_measurement(
|
||||
parameter="v_out",
|
||||
value=vout_mean,
|
||||
unit="V",
|
||||
conditions={
|
||||
"temperature": actual_temp,
|
||||
"input_voltage": input_voltage,
|
||||
"load_current": load_current,
|
||||
},
|
||||
)
|
||||
|
||||
context.logger.log_event(
|
||||
f"Measured Vout = {vout_mean:.6f}V ± {vout_std * 1e6:.1f}μV "
|
||||
f"at T={actual_temp:.2f}°C",
|
||||
level="INFO",
|
||||
)
|
||||
|
||||
# Store for TempCo calculation
|
||||
temp_points.append(actual_temp)
|
||||
vout_points.append(vout_mean)
|
||||
|
||||
# Calculate TempCo from linear regression
|
||||
tempco_ppm = self._calculate_tempco(temp_points, vout_points)
|
||||
|
||||
context.logger.log_event(
|
||||
f"Calculated TempCo = {tempco_ppm:.2f} ppm/°C",
|
||||
level="INFO",
|
||||
)
|
||||
|
||||
# Log result with limits
|
||||
context.logger.log_result(
|
||||
parameter="temp_co",
|
||||
value=tempco_ppm,
|
||||
unit="ppm/°C",
|
||||
lower_limit=-abs(tempco_limit),
|
||||
upper_limit=abs(tempco_limit),
|
||||
)
|
||||
|
||||
# Evaluate pass/fail
|
||||
passed = abs(tempco_ppm) <= tempco_limit
|
||||
|
||||
if passed:
|
||||
context.logger.log_event(
|
||||
f"TempCo test PASSED: {tempco_ppm:.2f} ppm/°C within ±{tempco_limit} ppm/°C",
|
||||
level="INFO",
|
||||
)
|
||||
return TestStatus.PASSED
|
||||
else:
|
||||
context.logger.log_event(
|
||||
f"TempCo test FAILED: {tempco_ppm:.2f} ppm/°C exceeds ±{tempco_limit} ppm/°C",
|
||||
level="ERROR",
|
||||
)
|
||||
return TestStatus.FAILED
|
||||
|
||||
except Exception as e:
|
||||
context.logger.log_event(
|
||||
f"TempCo test ERROR: {e!s}",
|
||||
level="ERROR",
|
||||
)
|
||||
return TestStatus.ERROR
|
||||
|
||||
finally:
|
||||
# Cleanup: disable PSU output
|
||||
try:
|
||||
context.instruments.psu.enable_output(1, False)
|
||||
context.logger.log_event("PSU output disabled", level="INFO")
|
||||
except Exception:
|
||||
pass # Best effort cleanup
|
||||
|
||||
def _calculate_tempco(
|
||||
self,
|
||||
temperatures: list[float],
|
||||
voltages: list[float],
|
||||
) -> float:
|
||||
"""Calculate temperature coefficient from measurements.
|
||||
|
||||
Uses linear regression to find the slope (dV/dT), then converts
|
||||
to ppm/°C relative to the nominal voltage (voltage at median temperature).
|
||||
|
||||
Args:
|
||||
temperatures: Temperature measurements in °C.
|
||||
voltages: Output voltage measurements in V.
|
||||
|
||||
Returns:
|
||||
Temperature coefficient in ppm/°C.
|
||||
|
||||
Raises:
|
||||
ValueError: If insufficient data points.
|
||||
"""
|
||||
if len(temperatures) < 2 or len(temperatures) != len(voltages):
|
||||
raise ValueError("Need at least 2 matching temperature-voltage pairs")
|
||||
|
||||
n = len(temperatures)
|
||||
|
||||
# Linear regression: V = a + b*T
|
||||
# We want slope b = dV/dT
|
||||
mean_t = sum(temperatures) / n
|
||||
mean_v = sum(voltages) / n
|
||||
|
||||
# Covariance and variance
|
||||
cov = sum(
|
||||
(t - mean_t) * (v - mean_v)
|
||||
for t, v in zip(temperatures, voltages, strict=True)
|
||||
)
|
||||
var_t = sum((t - mean_t) ** 2 for t in temperatures)
|
||||
|
||||
if var_t == 0:
|
||||
raise ValueError("Temperature variance is zero (all temps identical)")
|
||||
|
||||
slope = cov / var_t # dV/dT in V/°C
|
||||
|
||||
# Find nominal voltage (voltage at median temperature)
|
||||
sorted_pairs = sorted(zip(temperatures, voltages, strict=True))
|
||||
mid_idx = len(sorted_pairs) // 2
|
||||
v_nominal = sorted_pairs[mid_idx][1]
|
||||
|
||||
# Convert to ppm/°C: (dV/dT) / V_nom * 10^6
|
||||
tempco_ppm = (slope / v_nominal) * 1e6
|
||||
|
||||
return tempco_ppm
|
||||
267
tests/integration/test_tempco.py
Normal file
267
tests/integration/test_tempco.py
Normal file
@@ -0,0 +1,267 @@
|
||||
"""Integration tests for TempCo characterisation test.
|
||||
|
||||
Full end-to-end test of the TempCo test with simulated instruments.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from py_dvt_ate.data.models import TestStatus
|
||||
from py_dvt_ate.data.repository import SQLiteRepository
|
||||
from py_dvt_ate.framework.context import TestContext
|
||||
from py_dvt_ate.framework.logger import TestLogger
|
||||
from py_dvt_ate.instruments.factory import InstrumentConfig, InstrumentFactory
|
||||
from py_dvt_ate.simulation.server import ServerConfig, SimulationServer
|
||||
from py_dvt_ate.tests.thermal.tempco import TempCoTest
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="function")
|
||||
class TestTempCoIntegration:
|
||||
"""Integration tests for TempCo test with simulator."""
|
||||
|
||||
async def test_tempco_runs_successfully(self, tmp_path: Path) -> None:
|
||||
"""Test TempCo test runs end-to-end with simulator."""
|
||||
# Start simulation server
|
||||
server_config = ServerConfig(
|
||||
host="127.0.0.1",
|
||||
chamber_port=17000,
|
||||
psu_port=17001,
|
||||
dmm_port=17002,
|
||||
physics_rate_hz=100.0,
|
||||
)
|
||||
server = SimulationServer(server_config)
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
# Create instrument set connected to simulator
|
||||
instrument_config = InstrumentConfig(
|
||||
backend="simulator",
|
||||
simulator_host="127.0.0.1",
|
||||
chamber_port=17000,
|
||||
psu_port=17001,
|
||||
dmm_port=17002,
|
||||
)
|
||||
instruments = InstrumentFactory.create(instrument_config)
|
||||
|
||||
# Connect to instruments
|
||||
instruments.chamber.connect()
|
||||
instruments.psu.connect()
|
||||
instruments.dmm.connect()
|
||||
|
||||
# Configure instruments
|
||||
instruments.chamber.set_ramp_rate(10.0) # Fast ramp for testing
|
||||
instruments.psu.enable_output(1, False) # Ensure off initially
|
||||
|
||||
# Create test repository
|
||||
db_path = tmp_path / "test.db"
|
||||
repository = SQLiteRepository(db_path)
|
||||
|
||||
# Create test run
|
||||
run_id = repository.create_run(
|
||||
test_name="tempco",
|
||||
config={
|
||||
"temperatures": [0.0, 25.0, 50.0], # Reduced for faster test
|
||||
"input_voltage": 5.0,
|
||||
"load_current": 0.1,
|
||||
"settle_time": 0.5, # Reduced for faster test
|
||||
"num_samples": 3, # Reduced for faster test
|
||||
"tempco_limit": 100.0, # Relaxed for testing
|
||||
},
|
||||
description="Integration test of TempCo",
|
||||
)
|
||||
|
||||
# Create test logger
|
||||
logger = TestLogger(run_id, repository)
|
||||
|
||||
# Create test context
|
||||
context = TestContext(
|
||||
run_id=run_id,
|
||||
instruments=instruments,
|
||||
logger=logger,
|
||||
config={
|
||||
"temperatures": [0.0, 25.0, 50.0],
|
||||
"input_voltage": 5.0,
|
||||
"load_current": 0.1,
|
||||
"settle_time": 0.5,
|
||||
"num_samples": 3,
|
||||
"tempco_limit": 100.0,
|
||||
},
|
||||
)
|
||||
|
||||
# Create and execute test
|
||||
test = TempCoTest()
|
||||
assert test.name == "tempco"
|
||||
assert test.description == "Output voltage temperature coefficient"
|
||||
|
||||
# Run test (this is synchronous, but simulation runs async in background)
|
||||
status = test.execute(context)
|
||||
|
||||
# Verify test completed
|
||||
assert status in (TestStatus.PASSED, TestStatus.FAILED)
|
||||
|
||||
# Flush logger to ensure all data is written
|
||||
logger.flush()
|
||||
|
||||
# Update run status
|
||||
repository.complete_run(run_id, status)
|
||||
|
||||
# Verify results were logged
|
||||
results = repository.get_results(run_id)
|
||||
assert len(results) > 0
|
||||
|
||||
# Find TempCo result
|
||||
tempco_result = next(r for r in results if r.parameter == "temp_co")
|
||||
assert tempco_result is not None
|
||||
assert tempco_result.unit == "ppm/°C"
|
||||
assert tempco_result.lower_limit == -100.0
|
||||
assert tempco_result.upper_limit == 100.0
|
||||
|
||||
# Verify measurements were logged
|
||||
df = repository.get_measurements_dataframe(run_id)
|
||||
assert df is not None
|
||||
assert len(df) >= 3 # At least 3 temperature points
|
||||
|
||||
# Verify v_out measurements exist
|
||||
vout_measurements = df[df["parameter"] == "v_out"]
|
||||
assert len(vout_measurements) >= 3
|
||||
|
||||
# Verify temperature conditions were logged
|
||||
assert "temperature" in df.columns
|
||||
temps_recorded = vout_measurements["temperature"].unique()
|
||||
assert len(temps_recorded) >= 3
|
||||
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
async def test_tempco_with_minimal_config(self, tmp_path: Path) -> None:
|
||||
"""Test TempCo uses default configuration when not specified."""
|
||||
# Start simulation server
|
||||
server_config = ServerConfig(
|
||||
host="127.0.0.1",
|
||||
chamber_port=17100,
|
||||
psu_port=17101,
|
||||
dmm_port=17102,
|
||||
)
|
||||
server = SimulationServer(server_config)
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
# Create instrument set
|
||||
instrument_config = InstrumentConfig(
|
||||
backend="simulator",
|
||||
simulator_host="127.0.0.1",
|
||||
chamber_port=17100,
|
||||
psu_port=17101,
|
||||
dmm_port=17102,
|
||||
)
|
||||
instruments = InstrumentFactory.create(instrument_config)
|
||||
|
||||
# Connect to instruments
|
||||
instruments.chamber.connect()
|
||||
instruments.psu.connect()
|
||||
instruments.dmm.connect()
|
||||
|
||||
# Create repository
|
||||
db_path = tmp_path / "test_minimal.db"
|
||||
repository = SQLiteRepository(db_path)
|
||||
run_id = repository.create_run(
|
||||
test_name="tempco",
|
||||
config={}, # Empty config - should use defaults
|
||||
)
|
||||
|
||||
# Create logger and context with minimal config
|
||||
logger = TestLogger(run_id, repository)
|
||||
context = TestContext(
|
||||
run_id=run_id,
|
||||
instruments=instruments,
|
||||
logger=logger,
|
||||
config={
|
||||
# Override temperatures for faster test
|
||||
"temperatures": [25.0, 50.0],
|
||||
"settle_time": 0.2,
|
||||
"num_samples": 2,
|
||||
},
|
||||
)
|
||||
|
||||
# Execute test
|
||||
test = TempCoTest()
|
||||
status = test.execute(context)
|
||||
|
||||
# Should complete without error
|
||||
assert status in (TestStatus.PASSED, TestStatus.FAILED, TestStatus.ERROR)
|
||||
|
||||
logger.flush()
|
||||
repository.complete_run(run_id, status)
|
||||
|
||||
# Verify some data was logged
|
||||
results = repository.get_results(run_id)
|
||||
assert len(results) >= 1
|
||||
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
async def test_tempco_handles_errors_gracefully(self, tmp_path: Path) -> None:
|
||||
"""Test TempCo returns ERROR status when instruments fail."""
|
||||
# Start simulation server
|
||||
server_config = ServerConfig(
|
||||
host="127.0.0.1",
|
||||
chamber_port=17200,
|
||||
psu_port=17201,
|
||||
dmm_port=17202,
|
||||
)
|
||||
server = SimulationServer(server_config)
|
||||
await server.start()
|
||||
|
||||
try:
|
||||
# Create instrument set
|
||||
instrument_config = InstrumentConfig(
|
||||
backend="simulator",
|
||||
simulator_host="127.0.0.1",
|
||||
chamber_port=17200,
|
||||
psu_port=17201,
|
||||
dmm_port=17202,
|
||||
)
|
||||
instruments = InstrumentFactory.create(instrument_config)
|
||||
|
||||
# Connect to instruments
|
||||
instruments.chamber.connect()
|
||||
instruments.psu.connect()
|
||||
instruments.dmm.connect()
|
||||
|
||||
# Create repository
|
||||
db_path = tmp_path / "test_error.db"
|
||||
repository = SQLiteRepository(db_path)
|
||||
run_id = repository.create_run(test_name="tempco", config={})
|
||||
|
||||
# Create logger and context
|
||||
logger = TestLogger(run_id, repository)
|
||||
context = TestContext(
|
||||
run_id=run_id,
|
||||
instruments=instruments,
|
||||
logger=logger,
|
||||
config={
|
||||
"temperatures": [], # Invalid: empty temperature list
|
||||
"settle_time": 0.1,
|
||||
},
|
||||
)
|
||||
|
||||
# Execute test
|
||||
test = TempCoTest()
|
||||
|
||||
# Should handle gracefully (may return FAILED or ERROR)
|
||||
# The test should not raise an unhandled exception
|
||||
try:
|
||||
status = test.execute(context)
|
||||
# If it completes, it should indicate an error or failure
|
||||
assert status in (TestStatus.ERROR, TestStatus.FAILED)
|
||||
except Exception:
|
||||
# Or it might raise, which we also consider handled
|
||||
pass
|
||||
|
||||
logger.flush()
|
||||
|
||||
finally:
|
||||
await server.stop()
|
||||
266
tests/unit/test_config.py
Normal file
266
tests/unit/test_config.py
Normal file
@@ -0,0 +1,266 @@
|
||||
"""Tests for configuration loading and validation."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
from pydantic import ValidationError
|
||||
|
||||
from py_dvt_ate.app.config import (
|
||||
APIConfig,
|
||||
AppConfig,
|
||||
ChamberConfig,
|
||||
DashboardConfig,
|
||||
DataConfig,
|
||||
DUTConfig,
|
||||
DUTParameters,
|
||||
InstrumentsConfig,
|
||||
LoggingConfig,
|
||||
PhysicsConfig,
|
||||
PyVISAConfig,
|
||||
SimulatorConfig,
|
||||
ThermalConfig,
|
||||
load_config,
|
||||
)
|
||||
|
||||
|
||||
def test_default_config_values() -> None:
|
||||
"""Test that default configuration values are correct."""
|
||||
config = AppConfig()
|
||||
|
||||
assert config.instruments.backend == "simulator"
|
||||
assert config.instruments.simulator.host == "localhost"
|
||||
assert config.instruments.simulator.thermal_chamber_port == 5001
|
||||
|
||||
assert config.physics.update_rate_hz == 100.0
|
||||
assert config.physics.thermal.chamber_time_constant_s == 30.0
|
||||
assert config.physics.thermal.theta_jc == 15.0
|
||||
|
||||
assert config.dut.model == "ldo"
|
||||
assert config.dut.parameters.nominal_output_voltage == 3.3
|
||||
assert config.dut.parameters.tempco_ppm_per_c == 50.0
|
||||
|
||||
assert config.data.database_path == "./data/py_dvt_ate.db"
|
||||
assert config.logging.level == "INFO"
|
||||
assert config.dashboard.enabled is True
|
||||
assert config.api.enabled is False
|
||||
|
||||
|
||||
def test_load_config_with_defaults_only() -> None:
|
||||
"""Test loading config without a file uses defaults."""
|
||||
config = load_config(None)
|
||||
|
||||
assert config.instruments.backend == "simulator"
|
||||
assert config.physics.update_rate_hz == 100.0
|
||||
|
||||
|
||||
def test_load_config_from_file(tmp_path: Path) -> None:
|
||||
"""Test loading configuration from YAML file."""
|
||||
config_file = tmp_path / "test_config.yaml"
|
||||
config_data = {
|
||||
"instruments": {"backend": "pyvisa"},
|
||||
"physics": {"update_rate_hz": 50.0},
|
||||
"dut": {"model": "custom_ldo"},
|
||||
}
|
||||
|
||||
with config_file.open("w") as f:
|
||||
yaml.dump(config_data, f)
|
||||
|
||||
config = load_config(config_file)
|
||||
|
||||
assert config.instruments.backend == "pyvisa"
|
||||
assert config.physics.update_rate_hz == 50.0
|
||||
assert config.dut.model == "custom_ldo"
|
||||
# Defaults still apply
|
||||
assert config.instruments.simulator.host == "localhost"
|
||||
|
||||
|
||||
def test_load_config_partial_override(tmp_path: Path) -> None:
|
||||
"""Test that partial config overrides work correctly."""
|
||||
config_file = tmp_path / "partial.yaml"
|
||||
config_data = {
|
||||
"physics": {
|
||||
"thermal": {
|
||||
"theta_jc": 20.0,
|
||||
# Other thermal params should use defaults
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
with config_file.open("w") as f:
|
||||
yaml.dump(config_data, f)
|
||||
|
||||
config = load_config(config_file)
|
||||
|
||||
# Overridden value
|
||||
assert config.physics.thermal.theta_jc == 20.0
|
||||
# Default values
|
||||
assert config.physics.thermal.theta_ca == 5.0
|
||||
assert config.physics.thermal.chamber_time_constant_s == 30.0
|
||||
|
||||
|
||||
def test_load_config_missing_file() -> None:
|
||||
"""Test that loading from missing file raises FileNotFoundError."""
|
||||
with pytest.raises(FileNotFoundError, match="Configuration file not found"):
|
||||
load_config("nonexistent.yaml")
|
||||
|
||||
|
||||
def test_load_config_invalid_yaml(tmp_path: Path) -> None:
|
||||
"""Test that malformed YAML raises an error."""
|
||||
config_file = tmp_path / "invalid.yaml"
|
||||
config_file.write_text("invalid: yaml: content: [\n")
|
||||
|
||||
with pytest.raises(yaml.YAMLError):
|
||||
load_config(config_file)
|
||||
|
||||
|
||||
def test_load_config_validation_error(tmp_path: Path) -> None:
|
||||
"""Test that invalid configuration raises ValidationError."""
|
||||
config_file = tmp_path / "invalid_config.yaml"
|
||||
config_data = {
|
||||
"instruments": {"backend": "invalid_backend"}, # Not in Literal["simulator", "pyvisa"]
|
||||
}
|
||||
|
||||
with config_file.open("w") as f:
|
||||
yaml.dump(config_data, f)
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
load_config(config_file)
|
||||
|
||||
|
||||
def test_env_override_simple(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""Test environment variable override for simple values."""
|
||||
config_file = tmp_path / "config.yaml"
|
||||
config_data: dict[str, Any] = {}
|
||||
|
||||
with config_file.open("w") as f:
|
||||
yaml.dump(config_data, f)
|
||||
|
||||
monkeypatch.setenv("PYDVTATE__INSTRUMENTS__BACKEND", "pyvisa")
|
||||
monkeypatch.setenv("PYDVTATE__PHYSICS__UPDATE_RATE_HZ", "200.0")
|
||||
|
||||
config = load_config(config_file)
|
||||
|
||||
assert config.instruments.backend == "pyvisa"
|
||||
assert config.physics.update_rate_hz == 200.0
|
||||
|
||||
|
||||
def test_env_override_nested(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""Test environment variable override for nested values."""
|
||||
config_file = tmp_path / "config.yaml"
|
||||
config_data: dict[str, Any] = {}
|
||||
|
||||
with config_file.open("w") as f:
|
||||
yaml.dump(config_data, f)
|
||||
|
||||
monkeypatch.setenv("PYDVTATE__INSTRUMENTS__SIMULATOR__HOST", "192.168.1.100")
|
||||
monkeypatch.setenv("PYDVTATE__INSTRUMENTS__SIMULATOR__THERMAL_CHAMBER_PORT", "6001")
|
||||
monkeypatch.setenv("PYDVTATE__PHYSICS__THERMAL__THETA_JC", "25.0")
|
||||
|
||||
config = load_config(config_file)
|
||||
|
||||
assert config.instruments.simulator.host == "192.168.1.100"
|
||||
assert config.instruments.simulator.thermal_chamber_port == 6001
|
||||
assert config.physics.thermal.theta_jc == 25.0
|
||||
|
||||
|
||||
def test_env_override_types(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""Test that environment variables are parsed to correct types."""
|
||||
config_file = tmp_path / "config.yaml"
|
||||
config_data: dict[str, Any] = {}
|
||||
|
||||
with config_file.open("w") as f:
|
||||
yaml.dump(config_data, f)
|
||||
|
||||
monkeypatch.setenv("PYDVTATE__DASHBOARD__ENABLED", "false") # bool
|
||||
monkeypatch.setenv("PYDVTATE__DASHBOARD__PORT", "9000") # int
|
||||
monkeypatch.setenv("PYDVTATE__PHYSICS__UPDATE_RATE_HZ", "75.5") # float
|
||||
|
||||
config = load_config(config_file)
|
||||
|
||||
assert config.dashboard.enabled is False
|
||||
assert config.dashboard.port == 9000
|
||||
assert config.physics.update_rate_hz == 75.5
|
||||
|
||||
|
||||
def test_env_override_precedence(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""Test that environment variables override file values."""
|
||||
config_file = tmp_path / "config.yaml"
|
||||
config_data = {"physics": {"update_rate_hz": 50.0}}
|
||||
|
||||
with config_file.open("w") as f:
|
||||
yaml.dump(config_data, f)
|
||||
|
||||
monkeypatch.setenv("PYDVTATE__PHYSICS__UPDATE_RATE_HZ", "150.0")
|
||||
|
||||
config = load_config(config_file)
|
||||
|
||||
# Environment variable should win
|
||||
assert config.physics.update_rate_hz == 150.0
|
||||
|
||||
|
||||
def test_env_variables_ignored_without_prefix(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Test that environment variables without prefix are ignored."""
|
||||
config_file = tmp_path / "config.yaml"
|
||||
config_data: dict[str, Any] = {}
|
||||
|
||||
with config_file.open("w") as f:
|
||||
yaml.dump(config_data, f)
|
||||
|
||||
# These should be ignored
|
||||
monkeypatch.setenv("BACKEND", "pyvisa")
|
||||
monkeypatch.setenv("UPDATE_RATE_HZ", "200.0")
|
||||
|
||||
config = load_config(config_file)
|
||||
|
||||
# Should use defaults
|
||||
assert config.instruments.backend == "simulator"
|
||||
assert config.physics.update_rate_hz == 100.0
|
||||
|
||||
|
||||
def test_simulator_config_defaults() -> None:
|
||||
"""Test SimulatorConfig default values."""
|
||||
config = SimulatorConfig()
|
||||
assert config.host == "localhost"
|
||||
assert config.thermal_chamber_port == 5001
|
||||
assert config.power_supply_port == 5002
|
||||
assert config.multimeter_port == 5003
|
||||
|
||||
|
||||
def test_pyvisa_config_defaults() -> None:
|
||||
"""Test PyVISAConfig default values."""
|
||||
config = PyVISAConfig()
|
||||
assert config.thermal_chamber is None
|
||||
assert config.power_supply is None
|
||||
assert config.multimeter is None
|
||||
|
||||
|
||||
def test_complete_config_structure() -> None:
|
||||
"""Test that all config sections can be instantiated."""
|
||||
config = AppConfig(
|
||||
instruments=InstrumentsConfig(
|
||||
backend="pyvisa",
|
||||
simulator=SimulatorConfig(host="192.168.1.1"),
|
||||
pyvisa=PyVISAConfig(thermal_chamber="TCPIP::192.168.1.10::INSTR"),
|
||||
),
|
||||
physics=PhysicsConfig(
|
||||
update_rate_hz=50.0,
|
||||
thermal=ThermalConfig(theta_jc=20.0),
|
||||
chamber=ChamberConfig(ramp_rate_c_per_min=5.0),
|
||||
),
|
||||
dut=DUTConfig(
|
||||
model="custom", parameters=DUTParameters(nominal_output_voltage=5.0)
|
||||
),
|
||||
data=DataConfig(database_path="/tmp/test.db"),
|
||||
logging=LoggingConfig(level="DEBUG"),
|
||||
dashboard=DashboardConfig(enabled=False),
|
||||
api=APIConfig(enabled=True, port=9000),
|
||||
)
|
||||
|
||||
assert config.instruments.backend == "pyvisa"
|
||||
assert config.physics.update_rate_hz == 50.0
|
||||
assert config.dut.parameters.nominal_output_voltage == 5.0
|
||||
assert config.api.port == 9000
|
||||
295
tests/unit/test_repository.py
Normal file
295
tests/unit/test_repository.py
Normal file
@@ -0,0 +1,295 @@
|
||||
"""Unit tests for data repository."""
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from uuid import uuid4
|
||||
|
||||
import pandas as pd
|
||||
import pytest
|
||||
|
||||
from py_dvt_ate.data.models import Measurement, TestStatus
|
||||
from py_dvt_ate.data.repository import SQLiteRepository
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_db():
|
||||
"""Create a temporary database for testing."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = Path(tmpdir) / "test.db"
|
||||
yield db_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def repository(temp_db):
|
||||
"""Create a repository instance for testing."""
|
||||
return SQLiteRepository(temp_db)
|
||||
|
||||
|
||||
def test_create_run(repository):
|
||||
"""Test creating a new test run."""
|
||||
config = {"temperature": 25.0, "voltage": 3.3}
|
||||
run_id = repository.create_run(
|
||||
test_name="TempCo Test",
|
||||
config=config,
|
||||
operator="Test Engineer",
|
||||
description="Test description",
|
||||
)
|
||||
|
||||
assert run_id is not None
|
||||
|
||||
# Verify run was created
|
||||
run = repository.get_run(run_id)
|
||||
assert run.test_name == "TempCo Test"
|
||||
assert run.operator == "Test Engineer"
|
||||
assert run.description == "Test description"
|
||||
assert run.status == TestStatus.PENDING
|
||||
|
||||
|
||||
def test_update_run_status(repository):
|
||||
"""Test updating test run status."""
|
||||
run_id = repository.create_run("Test", config={})
|
||||
|
||||
repository.update_run_status(run_id, TestStatus.RUNNING)
|
||||
run = repository.get_run(run_id)
|
||||
assert run.status == TestStatus.RUNNING
|
||||
|
||||
repository.update_run_status(run_id, TestStatus.PASSED)
|
||||
run = repository.get_run(run_id)
|
||||
assert run.status == TestStatus.PASSED
|
||||
|
||||
|
||||
def test_complete_run(repository):
|
||||
"""Test completing a test run."""
|
||||
run_id = repository.create_run("Test", config={})
|
||||
|
||||
repository.complete_run(run_id, TestStatus.PASSED)
|
||||
run = repository.get_run(run_id)
|
||||
|
||||
assert run.status == TestStatus.PASSED
|
||||
assert run.completed_at is not None
|
||||
|
||||
|
||||
def test_save_result(repository):
|
||||
"""Test saving a test result."""
|
||||
run_id = repository.create_run("Test", config={})
|
||||
|
||||
repository.save_result(
|
||||
run_id=run_id,
|
||||
parameter="output_voltage",
|
||||
value=3.305,
|
||||
unit="V",
|
||||
lower_limit=3.267,
|
||||
upper_limit=3.333,
|
||||
)
|
||||
|
||||
results = repository.get_results(run_id)
|
||||
assert len(results) == 1
|
||||
|
||||
result = results[0]
|
||||
assert result.parameter == "output_voltage"
|
||||
assert result.value == 3.305
|
||||
assert result.unit == "V"
|
||||
assert result.lower_limit == 3.267
|
||||
assert result.upper_limit == 3.333
|
||||
assert result.passed is True
|
||||
|
||||
|
||||
def test_save_result_fail(repository):
|
||||
"""Test saving a failing test result."""
|
||||
run_id = repository.create_run("Test", config={})
|
||||
|
||||
repository.save_result(
|
||||
run_id=run_id,
|
||||
parameter="output_voltage",
|
||||
value=3.350, # Outside upper limit
|
||||
unit="V",
|
||||
lower_limit=3.267,
|
||||
upper_limit=3.333,
|
||||
)
|
||||
|
||||
results = repository.get_results(run_id)
|
||||
result = results[0]
|
||||
assert result.passed is False
|
||||
|
||||
|
||||
def test_save_result_no_limits(repository):
|
||||
"""Test saving a result without limits."""
|
||||
run_id = repository.create_run("Test", config={})
|
||||
|
||||
repository.save_result(
|
||||
run_id=run_id,
|
||||
parameter="temperature",
|
||||
value=25.5,
|
||||
unit="°C",
|
||||
)
|
||||
|
||||
results = repository.get_results(run_id)
|
||||
result = results[0]
|
||||
assert result.passed is None # No limits defined
|
||||
|
||||
|
||||
def test_save_measurements(repository):
|
||||
"""Test saving time-series measurements to Parquet."""
|
||||
run_id = repository.create_run("Test", config={})
|
||||
|
||||
measurements = [
|
||||
Measurement(
|
||||
timestamp=1234567890.0,
|
||||
parameter="voltage",
|
||||
value=3.3,
|
||||
unit="V",
|
||||
temperature=25.0,
|
||||
input_voltage=5.0,
|
||||
load_current=0.1,
|
||||
),
|
||||
Measurement(
|
||||
timestamp=1234567891.0,
|
||||
parameter="voltage",
|
||||
value=3.31,
|
||||
unit="V",
|
||||
temperature=25.1,
|
||||
input_voltage=5.0,
|
||||
load_current=0.1,
|
||||
),
|
||||
]
|
||||
|
||||
repository.save_measurements(run_id, measurements)
|
||||
|
||||
# Verify measurements were saved
|
||||
df = repository.get_measurements_dataframe(run_id)
|
||||
assert df is not None
|
||||
assert len(df) == 2
|
||||
assert list(df["parameter"]) == ["voltage", "voltage"]
|
||||
assert list(df["value"]) == [3.3, 3.31]
|
||||
|
||||
|
||||
def test_save_measurements_append(repository):
|
||||
"""Test appending measurements to existing Parquet file."""
|
||||
run_id = repository.create_run("Test", config={})
|
||||
|
||||
# Save first batch
|
||||
measurements1 = [
|
||||
Measurement(
|
||||
timestamp=1234567890.0,
|
||||
parameter="voltage",
|
||||
value=3.3,
|
||||
unit="V",
|
||||
)
|
||||
]
|
||||
repository.save_measurements(run_id, measurements1)
|
||||
|
||||
# Save second batch
|
||||
measurements2 = [
|
||||
Measurement(
|
||||
timestamp=1234567891.0,
|
||||
parameter="voltage",
|
||||
value=3.31,
|
||||
unit="V",
|
||||
)
|
||||
]
|
||||
repository.save_measurements(run_id, measurements2)
|
||||
|
||||
# Verify both batches are present
|
||||
df = repository.get_measurements_dataframe(run_id)
|
||||
assert df is not None
|
||||
assert len(df) == 2
|
||||
|
||||
|
||||
def test_get_measurements_nonexistent(repository):
|
||||
"""Test getting measurements for non-existent run."""
|
||||
fake_id = uuid4()
|
||||
df = repository.get_measurements_dataframe(fake_id)
|
||||
assert df is None
|
||||
|
||||
|
||||
def test_save_empty_measurements(repository):
|
||||
"""Test saving empty measurement list."""
|
||||
run_id = repository.create_run("Test", config={})
|
||||
repository.save_measurements(run_id, [])
|
||||
|
||||
df = repository.get_measurements_dataframe(run_id)
|
||||
assert df is None
|
||||
|
||||
|
||||
def test_get_nonexistent_run(repository):
|
||||
"""Test getting a non-existent run raises error."""
|
||||
fake_id = uuid4()
|
||||
with pytest.raises(ValueError, match="not found"):
|
||||
repository.get_run(fake_id)
|
||||
|
||||
|
||||
def test_multiple_results(repository):
|
||||
"""Test saving and retrieving multiple results."""
|
||||
run_id = repository.create_run("Test", config={})
|
||||
|
||||
repository.save_result(run_id, "voltage", 3.3, "V")
|
||||
repository.save_result(run_id, "current", 50.0, "uA")
|
||||
repository.save_result(run_id, "temperature", 25.0, "°C")
|
||||
|
||||
results = repository.get_results(run_id)
|
||||
assert len(results) == 3
|
||||
|
||||
parameters = {r.parameter for r in results}
|
||||
assert parameters == {"voltage", "current", "temperature"}
|
||||
|
||||
|
||||
def test_custom_measurements_dir(temp_db):
|
||||
"""Test using a custom measurements directory."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
measurements_dir = Path(tmpdir) / "custom_measurements"
|
||||
repo = SQLiteRepository(temp_db, measurements_dir=measurements_dir)
|
||||
|
||||
run_id = repo.create_run("Test", config={})
|
||||
measurements = [
|
||||
Measurement(
|
||||
timestamp=1234567890.0,
|
||||
parameter="voltage",
|
||||
value=3.3,
|
||||
unit="V",
|
||||
)
|
||||
]
|
||||
repo.save_measurements(run_id, measurements)
|
||||
|
||||
# Verify file is in custom directory
|
||||
expected_path = measurements_dir / f"run_{run_id}" / "measurements.parquet"
|
||||
assert expected_path.exists()
|
||||
|
||||
|
||||
def test_parquet_schema(repository):
|
||||
"""Test that Parquet file has correct schema."""
|
||||
run_id = repository.create_run("Test", config={})
|
||||
|
||||
measurements = [
|
||||
Measurement(
|
||||
timestamp=1234567890.123,
|
||||
parameter="voltage",
|
||||
value=3.3,
|
||||
unit="V",
|
||||
temperature=25.5,
|
||||
input_voltage=5.0,
|
||||
load_current=0.1,
|
||||
)
|
||||
]
|
||||
repository.save_measurements(run_id, measurements)
|
||||
|
||||
df = repository.get_measurements_dataframe(run_id)
|
||||
assert df is not None
|
||||
|
||||
# Check columns
|
||||
expected_columns = {
|
||||
"timestamp",
|
||||
"parameter",
|
||||
"value",
|
||||
"unit",
|
||||
"temperature",
|
||||
"input_voltage",
|
||||
"load_current",
|
||||
}
|
||||
assert set(df.columns) == expected_columns
|
||||
|
||||
# Check data types (approximately)
|
||||
assert pd.api.types.is_float_dtype(df["timestamp"])
|
||||
assert pd.api.types.is_string_dtype(df["parameter"]) or pd.api.types.is_object_dtype(
|
||||
df["parameter"]
|
||||
)
|
||||
assert pd.api.types.is_float_dtype(df["value"])
|
||||
Reference in New Issue
Block a user