# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Disruption Risk Runtime Helpers
"""Risk-scoring and fallback campaign helpers extracted from disruption_predictor."""
from __future__ import annotations
import numpy as np
from typing import Optional
# Logit-space intercept of the linear risk model: a strongly negative bias
# keeps the baseline risk low when all features are near zero.
DEFAULT_DISRUPTION_RISK_BIAS = -4.0
# Default alarm threshold applied to the bounded (0..1) risk score.
DEFAULT_DISRUPTION_RISK_THRESHOLD = 0.50
# Hand-tuned linear weights over the 11-element feature vector produced by
# build_disruption_feature_vector(); keys mirror that feature layout.
DISRUPTION_RISK_LINEAR_WEIGHTS: dict[str, float] = {
    "mean": 0.02,
    "std": 0.55,
    "max_val": 0.03,
    "slope": 0.50,
    "energy": 0.005,
    "last": 0.02,
    # Toroidal asymmetry observables carry the largest weights.
    "n1": 1.10,
    "n2": 0.70,
    "n3": 0.45,
    "asym": 0.50,
    "spread": 0.15,
}
def _require_int(name: str, value: object, minimum: int | None = None) -> int:
if isinstance(value, bool) or not isinstance(value, (int, np.integer)):
if minimum is None:
raise ValueError(f"{name} must be an integer.")
raise ValueError(f"{name} must be an integer >= {minimum}.")
parsed = int(value)
if minimum is not None and parsed < minimum:
raise ValueError(f"{name} must be an integer >= {minimum}.")
return parsed
def simulate_tearing_mode(
    steps=1000,
    *,
    rng: Optional[np.random.Generator] = None,
):
    """
    Generate one synthetic shot via a modified Rutherford island model.

    Returns:
        signal (array): Magnetic sensor data (dB/dt proxy; island-width history)
        label (int): 1 if disrupted, 0 if safe
        time_to_disruption: Steps elapsed past the trigger at mode lock,
            or -1 if the shot stayed safe
    """
    steps = _require_int("steps", steps, 1)
    dt = 0.01
    width = 0.01  # island width
    generator = rng if rng is not None else np.random.default_rng()
    disruptive = float(generator.random()) > 0.5
    trigger = int(generator.integers(200, 800)) if disruptive else 9999
    delta_prime = -0.5
    beta_p = 0.8  # poloidal beta proxy
    w_crit = 0.05  # island stabilisation threshold
    history: list[float] = []
    for step in range(steps):
        if step > trigger:
            delta_prime = -0.1
            if width < 0.1:
                width = 0.15  # seed island
        # Modified Rutherford Equation: dw/dt = Delta' + Delta'_BS,
        # where Delta'_BS = beta_p * w / (w^2 + w_crit^2)
        bootstrap = beta_p * (width / (width**2 + w_crit**2))
        growth = (delta_prime + bootstrap) * (1.0 - width / 12.0) * dt
        width += growth
        width += float(generator.normal(0.0, 0.02))
        width = max(width, 0.001)
        history.append(width)
        # Mode lock -> disruption
        if width > 8.0:
            return np.array(history), 1, (step - trigger)
    return np.array(history), 0, -1
def build_disruption_feature_vector(signal, toroidal_observables=None):
    """
    Build a compact feature vector for control-oriented disruption scoring.

    Feature layout:
        [mean, std, max, slope, energy, last,
         toroidal_n1_amp, toroidal_n2_amp, toroidal_n3_amp,
         toroidal_asymmetry_index, toroidal_radial_spread]

    Raises:
        ValueError: On empty/non-finite signal or non-finite observables.
    """
    samples = np.asarray(signal, dtype=float).reshape(-1)
    if samples.size == 0:
        raise ValueError("signal must contain at least one sample")
    if not np.all(np.isfinite(samples)):
        raise ValueError("signal must be finite.")
    # Average slope over the window; denominator guards the single-sample case.
    denom = max(samples.size - 1, 1)
    stats = [
        float(np.mean(samples)),
        float(np.std(samples)),
        float(np.max(samples)),
        float((samples[-1] - samples[0]) / denom),
        float(np.mean(samples**2)),
        float(samples[-1]),
    ]
    observables = toroidal_observables or {}
    amp1 = float(observables.get("toroidal_n1_amp", 0.0))
    amp2 = float(observables.get("toroidal_n2_amp", 0.0))
    amp3 = float(observables.get("toroidal_n3_amp", 0.0))
    # Asymmetry defaults to the Euclidean norm of the n=1..3 amplitudes.
    default_asym = np.sqrt(amp1 * amp1 + amp2 * amp2 + amp3 * amp3)
    asym_idx = float(observables.get("toroidal_asymmetry_index", default_asym))
    radial = float(observables.get("toroidal_radial_spread", 0.0))
    toroidal_feats = [amp1, amp2, amp3, asym_idx, radial]
    if not np.all(np.isfinite(toroidal_feats)):
        raise ValueError("toroidal observables must be finite.")
    return np.array(stats + toroidal_feats, dtype=float)
def _compute_disruption_logit_from_features(features: np.ndarray) -> float:
    """Linear logit over the 11-element feature vector from build_disruption_feature_vector."""
    w = DISRUPTION_RISK_LINEAR_WEIGHTS
    mean, std, max_val, slope, energy, last, n1, n2, n3, asym, spread = features
    # Thermal/transport proxies from the raw signal statistics.
    thermal = w["max_val"] * max_val + w["std"] * std + w["energy"] * energy + w["slope"] * slope
    # Toroidal asymmetry contributions (dominant weights).
    asymmetry = (
        w["n1"] * n1 + w["n2"] * n2 + w["n3"] * n3 + w["asym"] * asym + w["spread"] * spread
    )
    # Slow state terms: running level and latest sample.
    state = w["mean"] * mean + w["last"] * last
    return float(DEFAULT_DISRUPTION_RISK_BIAS + thermal + asymmetry + state)
def apply_disruption_logit_bias(risk: float, bias_delta: float) -> float:
    """Apply additive logit-space calibration bias to a bounded risk score."""
    risk_value = float(risk)
    delta = float(bias_delta)
    if not np.isfinite(risk_value):
        raise ValueError("risk must be finite.")
    if not np.isfinite(delta):
        raise ValueError("bias_delta must be finite.")
    # Clip away from {0, 1} so the logit stays finite.
    eps = 1e-9
    bounded = float(np.clip(risk_value, eps, 1.0 - eps))
    shifted_logit = float(np.log(bounded / (1.0 - bounded)) + delta)
    return float(1.0 / (1.0 + np.exp(-shifted_logit)))
def predict_disruption_risk(signal, toroidal_observables=None, bias_delta: float = 0.0):
    """
    Lightweight deterministic disruption risk estimator (0..1) for control loops.

    This supplements the Transformer pathway by explicitly consuming toroidal
    asymmetry observables from 3D diagnostics.
    """
    feature_vec = build_disruption_feature_vector(signal, toroidal_observables)
    bias = float(bias_delta)
    if not np.isfinite(bias):
        raise ValueError("bias_delta must be finite.")
    score_logit = _compute_disruption_logit_from_features(feature_vec) + bias
    # Logistic squashing keeps the returned score strictly inside (0, 1).
    return float(1.0 / (1.0 + np.exp(-score_logit)))
def apply_bit_flip_fault(value, bit_index):
    """Inject a deterministic single-bit fault into a float (IEEE-754 binary64)."""
    valid_index = isinstance(bit_index, (int, np.integer)) and not isinstance(bit_index, bool)
    if not valid_index:
        raise ValueError("bit_index must be an integer in [0, 63].")
    position = int(bit_index)
    if not 0 <= position <= 63:
        raise ValueError("bit_index must be an integer in [0, 63].")
    original = float(value)
    # Reinterpret the float64 payload as raw bits, flip one bit, reinterpret back.
    bits = np.array([original], dtype=np.float64).view(np.uint64)[0]
    mask = np.uint64(np.uint64(1) << np.uint64(position))
    mutated = np.array([np.uint64(bits ^ mask)], dtype=np.uint64).view(np.float64)[0]
    # A flip that produces NaN/Inf falls back to the unmodified value.
    return float(mutated) if np.isfinite(mutated) else float(value)
def _synthetic_control_signal(rng, length):
    """Deterministic pseudo-plasma trace: two tones, a late ramp, mild Gaussian jitter."""
    timeline = np.linspace(0.0, 1.0, int(length), dtype=float)
    two_pi = 2.0 * np.pi
    carrier = 0.7 + 0.15 * np.sin(two_pi * 3.0 * timeline) + 0.05 * np.cos(two_pi * 7.0 * timeline)
    # Ramp engages only in the last ~third of the window.
    late_ramp = np.where(timeline > 0.65, (timeline - 0.65) * 0.9, 0.0)
    jitter = rng.normal(0.0, 0.01, size=timeline.shape)
    # Clamp away from zero so downstream log/ratio features stay well-defined.
    return np.clip(carrier + late_ramp + jitter, 0.01, None)
def _normalize_fault_campaign_inputs(
    seed,
    episodes,
    window,
    noise_std,
    bit_flip_interval,
    recovery_window,
    recovery_epsilon,
):
    """Validate and coerce fault-campaign parameters.

    Returns the 7-tuple (seed, episodes, window, noise_std, bit_flip_interval,
    recovery_window, recovery_epsilon) with ints and floats normalized.

    Raises:
        ValueError: If any parameter is of the wrong type or out of range.
    """
    normalized = (
        _require_int("seed", seed, 0),
        _require_int("episodes", episodes, 1),
        _require_int("window", window, 16),
        float(noise_std),
        _require_int("bit_flip_interval", bit_flip_interval, 1),
        _require_int("recovery_window", recovery_window, 1),
        float(recovery_epsilon),
    )
    noise = normalized[3]
    epsilon = normalized[6]
    if not np.isfinite(noise) or noise < 0.0:
        raise ValueError("noise_std must be finite and >= 0.")
    if not np.isfinite(epsilon) or epsilon <= 0.0:
        raise ValueError("recovery_epsilon must be finite and > 0.")
    return normalized
def run_fault_noise_campaign(
    *,
    seed: int = 0,
    episodes: int = 64,
    window: int = 64,
    noise_std: float = 0.02,
    bit_flip_interval: int = 7,
    recovery_window: int = 4,
    recovery_epsilon: float = 0.05,
) -> dict[str, float | int | bool]:
    """Run deterministic robustness campaign with injected noise and bit flips.

    Each episode builds a synthetic control trace, computes a clean causal
    baseline risk series, then rescores a perturbed copy (Gaussian noise plus
    periodic single-bit faults, with perturbed toroidal amplitudes) and
    compares the two risk series.

    Args:
        seed: RNG seed (>= 0); the campaign is fully deterministic given a seed.
        episodes: Number of independent episodes (>= 1).
        window: Samples per episode (>= 16).
        noise_std: Std-dev of additive Gaussian noise (finite, >= 0).
        bit_flip_interval: Inject one bit flip every N samples (>= 1).
        recovery_window: Steps allowed for the risk error to re-enter tolerance.
        recovery_epsilon: Absolute risk-error tolerance (finite, > 0).

    Returns:
        Metrics dict with error/recovery statistics, the acceptance
        thresholds, a pass/fail verdict, and backward-compatible aliases.

    Raises:
        ValueError: If any input fails validation.
    """
    (
        seed_i,
        episodes_i,
        window_i,
        noise,
        bit_flip_i,
        recovery_window_i,
        recovery_eps,
    ) = _normalize_fault_campaign_inputs(
        seed,
        episodes,
        window,
        noise_std,
        bit_flip_interval,
        recovery_window,
        recovery_epsilon,
    )
    rng = np.random.default_rng(seed_i)
    abs_errors: list[float] = []       # per-step |perturbed - baseline| across all episodes
    recovery_steps: list[int] = []     # steps until error re-entered tolerance, per fault
    total_bit_flips = 0
    baseline_mean = 0.0
    perturbed_mean = 0.0
    for _ in range(episodes_i):
        signal = _synthetic_control_signal(rng, window_i)
        # Random but bounded toroidal mode amplitudes for this episode.
        toroidal = {
            "toroidal_n1_amp": float(rng.uniform(0.05, 0.25)),
            "toroidal_n2_amp": float(rng.uniform(0.03, 0.18)),
            "toroidal_n3_amp": float(rng.uniform(0.01, 0.12)),
            "toroidal_radial_spread": float(rng.uniform(0.01, 0.08)),
        }
        # Asymmetry index = Euclidean norm of the n=1..3 amplitudes.
        toroidal["toroidal_asymmetry_index"] = float(
            np.sqrt(
                toroidal["toroidal_n1_amp"] ** 2
                + toroidal["toroidal_n2_amp"] ** 2
                + toroidal["toroidal_n3_amp"] ** 2
            )
        )
        # Clean reference: causal risk over growing signal prefixes.
        baseline = np.array(
            [predict_disruption_risk(signal[: i + 1], toroidal) for i in range(window_i)],
            dtype=float,
        )
        baseline_mean += float(np.mean(baseline))
        faulty_signal = signal.copy()
        faulty_indices: list[int] = []
        for i in range(window_i):
            faulty_signal[i] += float(rng.normal(0.0, noise))
            # Periodic bit flips, restricted to mantissa bits 0..51 so the
            # perturbation stays bounded (exponent/sign bits untouched).
            if i % bit_flip_i == 0:
                faulty_signal[i] = apply_bit_flip_fault(faulty_signal[i], int(rng.integers(0, 52)))
                faulty_indices.append(i)
        # Perturb toroidal amplitudes too (clamped at zero) and recompute asymmetry.
        faulty_toroidal = dict(toroidal)
        faulty_toroidal["toroidal_n1_amp"] = max(
            0.0, faulty_toroidal["toroidal_n1_amp"] + float(rng.normal(0.0, noise * 0.5))
        )
        faulty_toroidal["toroidal_n2_amp"] = max(
            0.0, faulty_toroidal["toroidal_n2_amp"] + float(rng.normal(0.0, noise * 0.4))
        )
        faulty_toroidal["toroidal_asymmetry_index"] = float(
            np.sqrt(
                faulty_toroidal["toroidal_n1_amp"] ** 2
                + faulty_toroidal["toroidal_n2_amp"] ** 2
                + faulty_toroidal["toroidal_n3_amp"] ** 2
            )
        )
        perturbed = np.array(
            [
                predict_disruption_risk(faulty_signal[: i + 1], faulty_toroidal)
                for i in range(window_i)
            ],
            dtype=float,
        )
        perturbed_mean += float(np.mean(perturbed))
        err = np.abs(perturbed - baseline)
        abs_errors.extend(err.tolist())
        # For each injected fault, find the first step within the recovery
        # window where the risk error drops back under tolerance.
        for idx in faulty_indices:
            total_bit_flips += 1
            stop = min(window_i, idx + recovery_window_i + 1)
            recover_idx = stop  # sentinel: no recovery observed inside the window
            for j in range(idx, stop):
                if err[j] <= recovery_eps:
                    recover_idx = j
                    break
            recovery_steps.append(int(recover_idx - idx))
    baseline_mean /= float(episodes_i)
    perturbed_mean /= float(episodes_i)
    errors_arr = np.asarray(abs_errors, dtype=float)
    # If no flips occurred, use one failing sentinel so percentiles are defined.
    rec_arr = np.asarray(
        recovery_steps if recovery_steps else [recovery_window_i + 1],
        dtype=float,
    )
    mean_abs_err = float(np.mean(errors_arr))
    p95_abs_err = float(np.percentile(errors_arr, 95))
    p95_recovery = float(np.percentile(rec_arr, 95))
    success_rate = float(np.mean(rec_arr <= recovery_window_i))
    exceed_rate = float(np.mean(errors_arr > recovery_eps))
    # Acceptance thresholds feeding the pass/fail verdict below.
    thresholds: dict[str, float] = {
        "max_mean_abs_risk_error": 0.08,
        "max_p95_abs_risk_error": 0.22,
        "max_recovery_steps_p95": float(recovery_window_i),
        "min_recovery_success_rate": 0.80,
    }
    passes = bool(
        mean_abs_err <= thresholds["max_mean_abs_risk_error"]
        and p95_abs_err <= thresholds["max_p95_abs_risk_error"]
        and p95_recovery <= thresholds["max_recovery_steps_p95"]
        and success_rate >= thresholds["min_recovery_success_rate"]
    )
    return {
        "seed": seed_i,
        "episodes": episodes_i,
        "window": window_i,
        "noise_std": noise,
        "bit_flip_interval": bit_flip_i,
        "fault_count": int(total_bit_flips),
        "mean_abs_risk_error": mean_abs_err,
        "p95_abs_risk_error": p95_abs_err,
        "recovery_steps_p95": p95_recovery,
        "recovery_success_rate": success_rate,
        "thresholds": thresholds,
        # Backward-compatible aliases retained for existing dashboards/exports.
        "bit_flip_events": int(total_bit_flips),
        "exceed_count": int(np.sum(errors_arr > recovery_eps)),
        "exceed_rate": exceed_rate,
        "recovery_rate": success_rate,
        "baseline_mean_risk": float(baseline_mean),
        "perturbed_mean_risk": float(perturbed_mean),
        "passes_thresholds": passes,
    }
class HybridAnomalyDetector:
"""Hybrid detector combining learned risk and deterministic smoothing."""
def __init__(self, threshold=0.50, ema=0.05):
threshold_f = float(threshold)
ema_f = float(ema)
if not np.isfinite(threshold_f) or threshold_f < 0.0 or threshold_f > 1.0:
raise ValueError("threshold must be finite and in [0, 1].")
if not np.isfinite(ema_f) or ema_f <= 0.0 or ema_f > 1.0:
raise ValueError("ema must be finite and in (0, 1].")
self.threshold = threshold_f
self.ema = ema_f
self.mean = 0.0
self.var = 1.0
self.initialized = False
[docs]
def score(self, signal, toroidal_observables=None) -> dict[str, float | bool]:
supervised = float(predict_disruption_risk(signal, toroidal_observables))
if self.initialized:
z = abs(supervised - self.mean) / float(np.sqrt(self.var + 1e-9))
unsupervised = float(1.0 - np.exp(-0.5 * z))
else:
unsupervised = 0.0
self.initialized = True
alpha = self.ema
delta = supervised - self.mean
self.mean += alpha * delta
self.var = (1.0 - alpha) * self.var + alpha * delta * delta
self.var = float(max(self.var, 1e-9))
anomaly_score = float(np.clip(0.7 * supervised + 0.3 * unsupervised, 0.0, 1.0))
return {
"supervised_score": supervised,
"unsupervised_score": float(unsupervised),
"anomaly_score": anomaly_score,
"alarm": bool(anomaly_score >= self.threshold),
}
def run_anomaly_alarm_campaign(
    *,
    seed: int = 0,
    episodes: int = 128,
    window: int = 64,
    threshold: float = DEFAULT_DISRUPTION_RISK_THRESHOLD,
) -> dict[str, float | int | bool]:
    """Evaluate alarm true/false-positive behavior on deterministic synthetic episodes."""
    seed_val = _require_int("seed", seed, 0)
    episode_count = _require_int("episodes", episodes, 1)
    window_len = _require_int("window", window, 16)
    alarm_threshold = float(threshold)
    if not np.isfinite(alarm_threshold) or alarm_threshold <= 0.0 or alarm_threshold >= 1.0:
        raise ValueError("threshold must be finite and in (0, 1).")
    rng = np.random.default_rng(seed_val)
    # One shared detector: its EMA statistics persist across episodes.
    detector = HybridAnomalyDetector(threshold=alarm_threshold, ema=0.12)
    positives = 0
    negatives = 0
    true_pos = 0
    false_pos = 0
    latencies: list[int] = []
    for _ in range(episode_count):
        trace = _synthetic_control_signal(rng, window_len)
        disruptive = bool(float(rng.random()) < 0.3)
        if disruptive:
            # Superimpose a linear drift to emulate a disrupting episode.
            trace = trace + np.linspace(0.0, 0.35, window_len)
            positives += 1
        else:
            negatives += 1
        fired_at = -1
        # Score growing prefixes (warm-up of 8 samples); stop at first alarm.
        for prefix in range(8, window_len + 1):
            result = detector.score(trace[:prefix])
            if bool(result["alarm"]):
                fired_at = prefix
                break
        fired = fired_at >= 0
        if disruptive and fired:
            true_pos += 1
            latencies.append(max(fired_at - 8, 0))
        elif fired and not disruptive:
            false_pos += 1
    tpr = float(true_pos / positives) if positives > 0 else 0.0
    fpr = float(false_pos / negatives) if negatives > 0 else 0.0
    latency_p95 = int(np.percentile(latencies, 95)) if latencies else -1
    return {
        "seed": seed_val,
        "episodes": episode_count,
        "window": window_len,
        "threshold": alarm_threshold,
        "true_positive_rate": tpr,
        "false_positive_rate": fpr,
        "p95_alarm_latency_steps": latency_p95,
        "passes_thresholds": bool(tpr >= 0.90 and fpr <= 0.10),
    }