# SPDX-License-Identifier: AGPL-3.0-or-later
# Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Fault-Tolerant Reconfigurable Control
"""Fault detection, isolation, injection, and reconfigurable control utilities."""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any
import numpy as np
from numpy.typing import NDArray
FloatArray = NDArray[np.float64]
[docs]
class FaultType(Enum):
"""Actuator and sensor fault modes for FDI classification."""
STUCK_ACTUATOR = auto()
OPEN_CIRCUIT_ACTUATOR = auto()
SENSOR_DROPOUT = auto()
SENSOR_DRIFT = auto()
SENSOR_NOISE_INCREASE = auto()
[docs]
@dataclass
class FaultReport:
"""Detected fault: which component, fault mode, confidence, and detection time."""
component_index: int
is_sensor: bool
fault_type: FaultType
confidence: float
time_detected: float
[docs]
class FDIMonitor:
"""Fault Detection and Isolation based on innovation monitoring."""
def __init__(
self, n_sensors: int, n_actuators: int, threshold_sigma: float = 3.0, n_alert: int = 5
):
self.n_sensors = n_sensors
self.n_actuators = n_actuators
self.threshold_sigma = threshold_sigma
self.n_alert = n_alert
self.innovation_history = np.zeros((n_alert, n_sensors))
self.innovation_idx = 0
self.S_diag = np.ones(n_sensors)
self.detected_faults: list[FaultReport] = []
self.faulted_sensors: set[int] = set()
[docs]
def update(
self, y_measured: FloatArray, y_predicted: FloatArray, t: float
) -> list[FaultReport]:
"""Compare measurements to predictions; flag sensors exceeding threshold_sigma for n_alert consecutive steps."""
nu = y_measured - y_predicted
self.innovation_history[self.innovation_idx] = nu
self.innovation_idx = (self.innovation_idx + 1) % self.n_alert
new_faults = []
for i in range(self.n_sensors):
if i in self.faulted_sensors:
continue
hist = self.innovation_history[:, i]
sigma = np.sqrt(self.S_diag[i])
if np.all(np.abs(hist) > self.threshold_sigma * sigma):
if np.isnan(y_measured[i]) or abs(y_measured[i]) < 1e-6:
ftype = FaultType.SENSOR_DROPOUT
else:
ftype = FaultType.SENSOR_DRIFT
report = FaultReport(
component_index=i,
is_sensor=True,
fault_type=ftype,
confidence=1.0,
time_detected=t,
)
new_faults.append(report)
self.detected_faults.append(report)
self.faulted_sensors.add(i)
return new_faults
[docs]
class ReconfigurableController:
"""Adjusts control allocation based on detected faults."""
def __init__(self, base_controller: Any, jacobian: FloatArray, n_coils: int, n_sensors: int):
self.base_controller = base_controller
self.nominal_jacobian = jacobian.copy()
self.current_jacobian = jacobian.copy()
self.n_coils = n_coils
self.n_sensors = n_sensors
self.faulted_coils: set[int] = set()
self.faulted_sensors: set[int] = set()
self.stuck_values: dict[int, float] = {}
self.W = np.eye(jacobian.shape[0])
self.lambda_reg = 1e-6
self.K = self._compute_gain()
def _compute_gain(self) -> FloatArray:
"""Tikhonov pseudoinverse with faulted-coil rows zeroed."""
J = self.current_jacobian
J_T_W = J.T @ self.W
H = J_T_W @ J + self.lambda_reg * np.eye(self.n_coils)
try:
H_inv = np.linalg.inv(H)
K = H_inv @ J_T_W
except np.linalg.LinAlgError:
K = np.zeros_like(J.T)
for i in self.faulted_coils:
K[i, :] = 0.0
return np.asarray(K)
[docs]
def handle_actuator_fault(
self, coil_index: int, fault_type: FaultType, stuck_val: float = 0.0
) -> None:
"""Zero out the faulted coil column in J and recompute gain."""
if coil_index in self.faulted_coils:
return
self.faulted_coils.add(coil_index)
if fault_type == FaultType.STUCK_ACTUATOR:
self.stuck_values[coil_index] = stuck_val
self.current_jacobian[:, coil_index] = 0.0
self.K = self._compute_gain()
[docs]
def handle_sensor_fault(self, sensor_index: int, fault_type: FaultType) -> None:
"""
Accommodate sensor faults by reducing/removing affected measurement rows.
This controller uses a weighted least-squares allocation with `W`; sensor
faults are represented by adapting row weights that participate in the
control solve.
"""
if sensor_index < 0 or sensor_index >= self.n_sensors:
raise IndexError(f"sensor_index {sensor_index} out of range [0, {self.n_sensors - 1}]")
if sensor_index in self.faulted_sensors:
return
row_idx = sensor_index
if row_idx >= self.W.shape[0]:
raise IndexError(
f"sensor_index {sensor_index} cannot be mapped to Jacobian row space of size "
f"{self.W.shape[0]}"
)
self.faulted_sensors.add(sensor_index)
if fault_type == FaultType.SENSOR_DROPOUT:
weight = 0.0
elif fault_type == FaultType.SENSOR_NOISE_INCREASE:
weight = 0.2
elif fault_type == FaultType.SENSOR_DRIFT:
weight = 0.5
else:
# Unknown sensor-class mode: keep some authority but downweight.
weight = 0.5
self.W[row_idx, row_idx] = weight
self.K = self._compute_gain()
[docs]
def step(self, error: FloatArray, dt: float) -> FloatArray:
"""Compute coil current corrections, compensating stuck-actuator offsets."""
adjusted_error = error.copy()
for c_idx, val in self.stuck_values.items():
adjusted_error -= self.nominal_jacobian[:, c_idx] * val
delta_u = self.K @ adjusted_error
for c_idx in self.faulted_coils:
delta_u[c_idx] = 0.0
return np.asarray(delta_u)
[docs]
def controllability_check(self) -> bool:
"""Return True if enough healthy coils remain for minimum-rank controllability."""
if len(self.faulted_coils) > self.n_coils // 2:
return False
J = self.current_jacobian
rank = np.linalg.matrix_rank(J)
min_required_rank = 2
return bool(rank >= min_required_rank)
[docs]
def graceful_shutdown(self) -> FloatArray:
"""Return zero-current command vector for safe ramp-down."""
return np.zeros(self.n_coils)
[docs]
class FaultInjector:
"""Injects sensor faults (dropout or drift) at a specified time for testing."""
def __init__(
self, fault_time: float, component_index: int, fault_type: FaultType, severity: float = 1.0
):
self.fault_time = fault_time
self.component_index = component_index
self.fault_type = fault_type
self.severity = severity
[docs]
def inject(self, t: float, signals: FloatArray) -> FloatArray:
"""Return a copied signal vector with the configured fault applied after fault_time."""
if t < self.fault_time:
return signals
corrupted = signals.copy()
if self.fault_type == FaultType.SENSOR_DROPOUT:
corrupted[self.component_index] = 0.0
elif self.fault_type == FaultType.SENSOR_DRIFT:
drift = self.severity * (t - self.fault_time)
corrupted[self.component_index] += drift
return corrupted