Source code for scpn_fusion.control.disruption_risk_runtime

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Disruption Risk Runtime Helpers
"""Risk-scoring and fallback campaign helpers extracted from disruption_predictor."""

from __future__ import annotations

import numpy as np
from typing import Optional


DEFAULT_DISRUPTION_RISK_BIAS = -4.0
DEFAULT_DISRUPTION_RISK_THRESHOLD = 0.50
DISRUPTION_RISK_LINEAR_WEIGHTS: dict[str, float] = {
    "mean": 0.02,
    "std": 0.55,
    "max_val": 0.03,
    "slope": 0.50,
    "energy": 0.005,
    "last": 0.02,
    "n1": 1.10,
    "n2": 0.70,
    "n3": 0.45,
    "asym": 0.50,
    "spread": 0.15,
}


def _require_int(name: str, value: object, minimum: int | None = None) -> int:
    if isinstance(value, bool) or not isinstance(value, (int, np.integer)):
        if minimum is None:
            raise ValueError(f"{name} must be an integer.")
        raise ValueError(f"{name} must be an integer >= {minimum}.")
    parsed = int(value)
    if minimum is not None and parsed < minimum:
        raise ValueError(f"{name} must be an integer >= {minimum}.")
    return parsed


def simulate_tearing_mode(
    steps=1000,
    *,
    rng: Optional[np.random.Generator] = None,
):
    """
    Generates synthetic shot data.
    Returns:
        signal (array): Magnetic sensor data (dB/dt)
        label (int): 1 if disrupted, 0 if safe
        time_to_disruption (array): Time remaining (or -1 if safe)
    """
    steps = _require_int("steps", steps, 1)
    dt = 0.01
    w = 0.01  # Island width
    local_rng = rng if rng is not None else np.random.default_rng()

    is_disruptive = float(local_rng.random()) > 0.5
    trigger_time = int(local_rng.integers(200, 800)) if is_disruptive else 9999

    delta_prime = -0.5
    w_history = []

    beta_p = 0.8  # poloidal beta proxy
    w_crit = 0.05  # island stabilisation threshold

    for t in range(steps):
        if t > trigger_time:
            delta_prime = -0.1
            if w < 0.1:
                w = 0.15  # seed island

        # Modified Rutherford Equation: dw/dt = Delta' + Delta'_BS
        # Delta'_BS = beta_p * w / (w^2 + w_crit^2)
        f_bs = beta_p * (w / (w**2 + w_crit**2))

        dw = (delta_prime + f_bs) * (1.0 - w / 12.0) * dt
        w += dw
        w += float(local_rng.normal(0.0, 0.02))
        w = max(w, 0.001)

        w_history.append(w)

        # Mode lock → disruption
        if w > 8.0:
            return np.array(w_history), 1, (t - trigger_time)

    return np.array(w_history), 0, -1


[docs] def build_disruption_feature_vector(signal, toroidal_observables=None): """ Build a compact feature vector for control-oriented disruption scoring. Feature layout: [mean, std, max, slope, energy, last, toroidal_n1_amp, toroidal_n2_amp, toroidal_n3_amp, toroidal_asymmetry_index, toroidal_radial_spread] """ sig = np.asarray(signal, dtype=float).reshape(-1) if sig.size == 0: raise ValueError("signal must contain at least one sample") if not np.all(np.isfinite(sig)): raise ValueError("signal must be finite.") mean = float(np.mean(sig)) std = float(np.std(sig)) max_val = float(np.max(sig)) slope = float((sig[-1] - sig[0]) / max(sig.size - 1, 1)) energy = float(np.mean(sig**2)) last = float(sig[-1]) obs = toroidal_observables or {} n1 = float(obs.get("toroidal_n1_amp", 0.0)) n2 = float(obs.get("toroidal_n2_amp", 0.0)) n3 = float(obs.get("toroidal_n3_amp", 0.0)) asym = float(obs.get("toroidal_asymmetry_index", np.sqrt(n1 * n1 + n2 * n2 + n3 * n3))) spread = float(obs.get("toroidal_radial_spread", 0.0)) if not np.all(np.isfinite([n1, n2, n3, asym, spread])): raise ValueError("toroidal observables must be finite.") return np.array( [mean, std, max_val, slope, energy, last, n1, n2, n3, asym, spread], dtype=float, )
def _compute_disruption_logit_from_features(features: np.ndarray) -> float: mean, std, max_val, slope, energy, last, n1, n2, n3, asym, spread = features thermal_term = ( DISRUPTION_RISK_LINEAR_WEIGHTS["max_val"] * max_val + DISRUPTION_RISK_LINEAR_WEIGHTS["std"] * std + DISRUPTION_RISK_LINEAR_WEIGHTS["energy"] * energy + DISRUPTION_RISK_LINEAR_WEIGHTS["slope"] * slope ) asym_term = ( DISRUPTION_RISK_LINEAR_WEIGHTS["n1"] * n1 + DISRUPTION_RISK_LINEAR_WEIGHTS["n2"] * n2 + DISRUPTION_RISK_LINEAR_WEIGHTS["n3"] * n3 + DISRUPTION_RISK_LINEAR_WEIGHTS["asym"] * asym + DISRUPTION_RISK_LINEAR_WEIGHTS["spread"] * spread ) state_term = ( DISRUPTION_RISK_LINEAR_WEIGHTS["mean"] * mean + DISRUPTION_RISK_LINEAR_WEIGHTS["last"] * last ) return float(DEFAULT_DISRUPTION_RISK_BIAS + thermal_term + asym_term + state_term)
[docs] def apply_disruption_logit_bias(risk: float, bias_delta: float) -> float: """Apply additive logit-space calibration bias to a bounded risk score.""" risk_f = float(risk) bias_f = float(bias_delta) if not np.isfinite(risk_f): raise ValueError("risk must be finite.") if not np.isfinite(bias_f): raise ValueError("bias_delta must be finite.") clipped = float(np.clip(risk_f, 1e-9, 1.0 - 1e-9)) logit = float(np.log(clipped / (1.0 - clipped)) + bias_f) return float(1.0 / (1.0 + np.exp(-logit)))
[docs] def predict_disruption_risk(signal, toroidal_observables=None, bias_delta: float = 0.0): """ Lightweight deterministic disruption risk estimator (0..1) for control loops. This supplements the Transformer pathway by explicitly consuming toroidal asymmetry observables from 3D diagnostics. """ features = build_disruption_feature_vector(signal, toroidal_observables) bias_f = float(bias_delta) if not np.isfinite(bias_f): raise ValueError("bias_delta must be finite.") logits = _compute_disruption_logit_from_features(features) + bias_f return float(1.0 / (1.0 + np.exp(-logits)))
[docs] def apply_bit_flip_fault(value, bit_index): """Inject a deterministic single-bit fault into a float.""" if isinstance(bit_index, bool) or not isinstance(bit_index, (int, np.integer)): raise ValueError("bit_index must be an integer in [0, 63].") bit = int(bit_index) if bit < 0 or bit > 63: raise ValueError("bit_index must be an integer in [0, 63].") raw = np.array([float(value)], dtype=np.float64).view(np.uint64)[0] flipped = np.uint64(raw ^ (np.uint64(1) << np.uint64(bit))) out = np.array([flipped], dtype=np.uint64).view(np.float64)[0] return float(out if np.isfinite(out) else value)
def _synthetic_control_signal(rng, length): t = np.linspace(0.0, 1.0, int(length), dtype=float) base = 0.7 + 0.15 * np.sin(2.0 * np.pi * 3.0 * t) + 0.05 * np.cos(2.0 * np.pi * 7.0 * t) ramp = np.where(t > 0.65, (t - 0.65) * 0.9, 0.0) noise = rng.normal(0.0, 0.01, size=t.shape) return np.clip(base + ramp + noise, 0.01, None) def _normalize_fault_campaign_inputs( seed, episodes, window, noise_std, bit_flip_interval, recovery_window, recovery_epsilon, ): seed_i = _require_int("seed", seed, 0) episodes_i = _require_int("episodes", episodes, 1) window_i = _require_int("window", window, 16) noise = float(noise_std) bit_flip_i = _require_int("bit_flip_interval", bit_flip_interval, 1) recovery_window_i = _require_int("recovery_window", recovery_window, 1) recovery_eps = float(recovery_epsilon) if not np.isfinite(noise) or noise < 0.0: raise ValueError("noise_std must be finite and >= 0.") if not np.isfinite(recovery_eps) or recovery_eps <= 0.0: raise ValueError("recovery_epsilon must be finite and > 0.") return ( seed_i, episodes_i, window_i, noise, bit_flip_i, recovery_window_i, recovery_eps, )
[docs] def run_fault_noise_campaign( *, seed: int = 0, episodes: int = 64, window: int = 64, noise_std: float = 0.02, bit_flip_interval: int = 7, recovery_window: int = 4, recovery_epsilon: float = 0.05, ) -> dict[str, float | int | bool]: """Run deterministic robustness campaign with injected noise and bit flips.""" ( seed_i, episodes_i, window_i, noise, bit_flip_i, recovery_window_i, recovery_eps, ) = _normalize_fault_campaign_inputs( seed, episodes, window, noise_std, bit_flip_interval, recovery_window, recovery_epsilon, ) rng = np.random.default_rng(seed_i) abs_errors: list[float] = [] recovery_steps: list[int] = [] total_bit_flips = 0 baseline_mean = 0.0 perturbed_mean = 0.0 for _ in range(episodes_i): signal = _synthetic_control_signal(rng, window_i) toroidal = { "toroidal_n1_amp": float(rng.uniform(0.05, 0.25)), "toroidal_n2_amp": float(rng.uniform(0.03, 0.18)), "toroidal_n3_amp": float(rng.uniform(0.01, 0.12)), "toroidal_radial_spread": float(rng.uniform(0.01, 0.08)), } toroidal["toroidal_asymmetry_index"] = float( np.sqrt( toroidal["toroidal_n1_amp"] ** 2 + toroidal["toroidal_n2_amp"] ** 2 + toroidal["toroidal_n3_amp"] ** 2 ) ) baseline = np.array( [predict_disruption_risk(signal[: i + 1], toroidal) for i in range(window_i)], dtype=float, ) baseline_mean += float(np.mean(baseline)) faulty_signal = signal.copy() faulty_indices: list[int] = [] for i in range(window_i): faulty_signal[i] += float(rng.normal(0.0, noise)) if i % bit_flip_i == 0: faulty_signal[i] = apply_bit_flip_fault(faulty_signal[i], int(rng.integers(0, 52))) faulty_indices.append(i) faulty_toroidal = dict(toroidal) faulty_toroidal["toroidal_n1_amp"] = max( 0.0, faulty_toroidal["toroidal_n1_amp"] + float(rng.normal(0.0, noise * 0.5)) ) faulty_toroidal["toroidal_n2_amp"] = max( 0.0, faulty_toroidal["toroidal_n2_amp"] + float(rng.normal(0.0, noise * 0.4)) ) faulty_toroidal["toroidal_asymmetry_index"] = float( np.sqrt( faulty_toroidal["toroidal_n1_amp"] ** 2 + faulty_toroidal["toroidal_n2_amp"] ** 2 + faulty_toroidal["toroidal_n3_amp"] ** 2 ) ) perturbed = np.array( [ predict_disruption_risk(faulty_signal[: i + 1], faulty_toroidal) for i in range(window_i) ], dtype=float, ) perturbed_mean += float(np.mean(perturbed)) err = np.abs(perturbed - baseline) abs_errors.extend(err.tolist()) for idx in faulty_indices: total_bit_flips += 1 stop = min(window_i, idx + recovery_window_i + 1) recover_idx = stop for j in range(idx, stop): if err[j] <= recovery_eps: recover_idx = j break recovery_steps.append(int(recover_idx - idx)) baseline_mean /= float(episodes_i) perturbed_mean /= float(episodes_i) errors_arr = np.asarray(abs_errors, dtype=float) rec_arr = np.asarray( recovery_steps if recovery_steps else [recovery_window_i + 1], dtype=float, ) mean_abs_err = float(np.mean(errors_arr)) p95_abs_err = float(np.percentile(errors_arr, 95)) p95_recovery = float(np.percentile(rec_arr, 95)) success_rate = float(np.mean(rec_arr <= recovery_window_i)) exceed_rate = float(np.mean(errors_arr > recovery_eps)) thresholds: dict[str, float] = { "max_mean_abs_risk_error": 0.08, "max_p95_abs_risk_error": 0.22, "max_recovery_steps_p95": float(recovery_window_i), "min_recovery_success_rate": 0.80, } passes = bool( mean_abs_err <= thresholds["max_mean_abs_risk_error"] and p95_abs_err <= thresholds["max_p95_abs_risk_error"] and p95_recovery <= thresholds["max_recovery_steps_p95"] and success_rate >= thresholds["min_recovery_success_rate"] ) return { "seed": seed_i, "episodes": episodes_i, "window": window_i, "noise_std": noise, "bit_flip_interval": bit_flip_i, "fault_count": int(total_bit_flips), "mean_abs_risk_error": mean_abs_err, "p95_abs_risk_error": p95_abs_err, "recovery_steps_p95": p95_recovery, "recovery_success_rate": success_rate, "thresholds": thresholds, # Backward-compatible aliases retained for existing dashboards/exports. "bit_flip_events": int(total_bit_flips), "exceed_count": int(np.sum(errors_arr > recovery_eps)), "exceed_rate": exceed_rate, "recovery_rate": success_rate, "baseline_mean_risk": float(baseline_mean), "perturbed_mean_risk": float(perturbed_mean), "passes_thresholds": passes, }
[docs] class HybridAnomalyDetector: """Hybrid detector combining learned risk and deterministic smoothing.""" def __init__(self, threshold=0.50, ema=0.05): threshold_f = float(threshold) ema_f = float(ema) if not np.isfinite(threshold_f) or threshold_f < 0.0 or threshold_f > 1.0: raise ValueError("threshold must be finite and in [0, 1].") if not np.isfinite(ema_f) or ema_f <= 0.0 or ema_f > 1.0: raise ValueError("ema must be finite and in (0, 1].") self.threshold = threshold_f self.ema = ema_f self.mean = 0.0 self.var = 1.0 self.initialized = False
[docs] def score(self, signal, toroidal_observables=None) -> dict[str, float | bool]: supervised = float(predict_disruption_risk(signal, toroidal_observables)) if self.initialized: z = abs(supervised - self.mean) / float(np.sqrt(self.var + 1e-9)) unsupervised = float(1.0 - np.exp(-0.5 * z)) else: unsupervised = 0.0 self.initialized = True alpha = self.ema delta = supervised - self.mean self.mean += alpha * delta self.var = (1.0 - alpha) * self.var + alpha * delta * delta self.var = float(max(self.var, 1e-9)) anomaly_score = float(np.clip(0.7 * supervised + 0.3 * unsupervised, 0.0, 1.0)) return { "supervised_score": supervised, "unsupervised_score": float(unsupervised), "anomaly_score": anomaly_score, "alarm": bool(anomaly_score >= self.threshold), }
[docs] def run_anomaly_alarm_campaign( *, seed: int = 0, episodes: int = 128, window: int = 64, threshold: float = DEFAULT_DISRUPTION_RISK_THRESHOLD, ) -> dict[str, float | int | bool]: """Evaluate alarm true/false-positive behavior on deterministic synthetic episodes.""" seed_i = _require_int("seed", seed, 0) episodes_i = _require_int("episodes", episodes, 1) window_i = _require_int("window", window, 16) threshold_f = float(threshold) if not np.isfinite(threshold_f) or threshold_f <= 0.0 or threshold_f >= 1.0: raise ValueError("threshold must be finite and in (0, 1).") rng = np.random.default_rng(seed_i) detector = HybridAnomalyDetector(threshold=threshold_f, ema=0.12) positives = 0 negatives = 0 tp = 0 fp = 0 alarm_latency_steps: list[int] = [] for _ in range(episodes_i): signal = _synthetic_control_signal(rng, window_i) is_disruptive = bool(float(rng.random()) < 0.3) if is_disruptive: perturb = np.linspace(0.0, 0.35, window_i) signal = signal + perturb positives += 1 else: negatives += 1 alarm_step = -1 for idx in range(8, window_i + 1): scored = detector.score(signal[:idx]) if bool(scored["alarm"]): alarm_step = idx break alarmed = alarm_step >= 0 if is_disruptive and alarmed: tp += 1 alarm_latency_steps.append(max(alarm_step - 8, 0)) elif (not is_disruptive) and alarmed: fp += 1 tpr = float(tp / positives) if positives > 0 else 0.0 fpr = float(fp / negatives) if negatives > 0 else 0.0 p95_latency = int(np.percentile(alarm_latency_steps, 95)) if alarm_latency_steps else -1 passes = bool(tpr >= 0.90 and fpr <= 0.10) return { "seed": seed_i, "episodes": episodes_i, "window": window_i, "threshold": threshold_f, "true_positive_rate": tpr, "false_positive_rate": fpr, "p95_alarm_latency_steps": p95_latency, "passes_thresholds": passes, }