# SPDX-License-Identifier: AGPL-3.0-or-later
# Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Fusion Control Room
"""Deterministic control-room simulation with diagnostics, estimation, and rendering."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any, Callable, Optional
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation, PillowWriter
from matplotlib.patches import Rectangle
from numpy.typing import NDArray
logger = logging.getLogger(__name__)
FloatArray = NDArray[np.float64]
FusionKernel: type[Any] | None
try:
from scpn_fusion.core._rust_compat import FusionKernel
except ImportError:
try:
from scpn_fusion.core.fusion_kernel import FusionKernel
except (ImportError, OSError): # pragma: no cover - optional kernel path
FusionKernel = None
RESOLUTION = 60
SIM_DURATION = 200
FPS = 10
_RENDER_OUTPUT_EXCEPTIONS = (OSError, RuntimeError, ValueError, TypeError)
_KERNEL_INIT_EXCEPTIONS = (OSError, RuntimeError, ValueError, TypeError, AttributeError)
_KERNEL_COIL_UPDATE_EXCEPTIONS = (
AttributeError,
KeyError,
TypeError,
ValueError,
IndexError,
)
_KERNEL_SOLVE_EXCEPTIONS = (RuntimeError, ValueError, TypeError, FloatingPointError)
[docs]
class TokamakPhysicsEngine:
"""Reduced Grad-Shafranov geometry model with optional kernel-Psi ingestion."""
[docs]
def __init__(
self,
size: int = RESOLUTION,
*,
seed: int = 42,
kernel: Optional[Any] = None,
) -> None:
"""Initialise the geometry grid, RNG, and optional kernel handle."""
size = int(size)
if size < 16:
raise ValueError("size must be >= 16.")
self.size = size
self.rng = np.random.default_rng(int(seed))
self.kernel = kernel
self.R = np.linspace(1.0, 5.0, self.size)
self.Z = np.linspace(-3.0, 3.0, self.size)
self.RR, self.ZZ = np.meshgrid(self.R, self.Z)
self.density: FloatArray = np.zeros((self.size, self.size), dtype=np.float64)
# Plasma Parameters (ITER-like)
self.R0 = 3.0
self.a = 1.0
self.kappa = 1.7
self.delta = 0.33
# State
self.z_pos = 0.0
self.v_drift = 0.0
def _kernel_psi(self) -> Optional[FloatArray]:
if self.kernel is None or not hasattr(self.kernel, "Psi"):
return None
psi = np.asarray(self.kernel.Psi, dtype=np.float64)
if psi.ndim != 2 or psi.shape[0] < 8 or psi.shape[1] < 8:
return None
return psi
[docs]
def solve_flux_surfaces(self) -> tuple[FloatArray, FloatArray]:
"""Return ``(density, psi)`` from kernel state or analytic geometry.
Uses the kernel ``Psi`` field when available, otherwise an analytic
Miller-parameterized geometry.
"""
psi_kernel = self._kernel_psi()
if psi_kernel is not None:
psi_min = float(np.min(psi_kernel))
psi_max = float(np.max(psi_kernel))
psi_span = max(psi_max - psi_min, 1.0e-9)
psi = (psi_kernel - psi_min) / psi_span
else:
rho_sq = (
(self.RR - self.R0) ** 2
+ ((self.ZZ - self.z_pos) / self.kappa) ** 2
- 2.0 * self.delta * (self.RR - self.R0) * ((self.RR - self.R0) ** 2)
)
psi = rho_sq / (self.a**2)
psi = np.clip(psi, 0.0, None)
core_term = np.maximum(1.0 - psi, 0.0)
self.density = np.asarray(np.where(psi < 1.0, core_term**1.5, 0.0), dtype=np.float64)
noise = self.rng.normal(0.0, 0.05, size=self.density.shape) * self.density
self.density = np.asarray(np.clip(self.density + noise, 0.0, None), dtype=np.float64)
return self.density, np.asarray(psi, dtype=np.float64)
[docs]
def step_dynamics(self, coil_action_top: float, coil_action_bottom: float) -> float:
"""Advance the reduced vertical-displacement dynamics by one step."""
instability_growth = 0.1
control_force = (float(coil_action_bottom) - float(coil_action_top)) * 0.2
disturbance = float(self.rng.normal(0.0, 0.01))
if float(self.rng.random()) < 0.05:
disturbance += 0.2
accel = (instability_growth * self.z_pos) + control_force + disturbance
self.v_drift += accel
self.z_pos += self.v_drift
self.v_drift *= 0.9
return float(self.z_pos)
[docs]
class DiagnosticSystem:
"""Noisy vertical-position probe."""
[docs]
def __init__(self, rng: np.random.Generator) -> None:
"""Store the random generator used to inject measurement noise."""
self._rng = rng
[docs]
def measure_position(self, true_z: float) -> float:
"""Return a noisy vertical-position measurement from the supplied true state."""
return float(true_z + self._rng.normal(0.0, 0.05))
[docs]
class KalmanObserver:
"""Linear Kalman filter for plasma vertical-position estimation.
Hardens the state estimate against sensor noise and measurement dropout.
"""
[docs]
def __init__(self, dt: float = 0.1):
"""Initialise the state vector, covariances, and linear drift model."""
# State: [z, v_z]
self.x = np.array([0.0, 0.0])
self.P = np.eye(2) * 0.1
# Transition Matrix (Linear drift model)
self.A = np.array([[1.0, dt], [0.0, 0.9]]) # 0.9 matches drift damping in engine
# Measurement Matrix (We only measure position z)
self.H = np.array([[1.0, 0.0]])
# Covariance Matrices
self.Q = np.eye(2) * 0.01 # Process Noise
self.R = np.array([[0.05]]) # Measurement Noise
[docs]
def update(self, measured_z: float, dropout: bool = False) -> float:
"""Predict-correct cycle. Returns filtered Z-position."""
self.x = self.A @ self.x
self.P = self.A @ self.P @ self.A.T + self.Q
if not dropout and np.isfinite(measured_z):
y = measured_z - (self.H @ self.x) # innovation
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(2) - K @ self.H) @ self.P
else:
self.P *= 1.2 # covariance inflation on dropout
return float(self.x[0])
[docs]
class NeuralController:
"""Hardened PID control policy for vertical stabilization."""
[docs]
def __init__(self, dt: float = 0.1) -> None:
"""Initialise PID gains, derivative filter, and anti-windup limit."""
self.dt = dt
self.integral_error = 0.0
self.prev_error = 0.0
self.filtered_derivative = 0.0
# PID Gains (Hardened defaults)
self.kp = 5.0
self.ki = 0.5
self.kd = 2.0
# Filter constant for derivative (tau_d)
# Prevents noise spikes from being amplified
self.tau_d = 0.05
# Anti-windup limit
self.integral_limit = 2.0
[docs]
def compute_action(self, measured_z: float) -> tuple[float, float]:
"""Map filtered vertical displacement to bounded top and bottom coil actions."""
error = -float(measured_z)
p_term = self.kp * error
self.integral_error += error * self.dt
self.integral_error = np.clip(
self.integral_error, -self.integral_limit, self.integral_limit
)
i_term = self.ki * self.integral_error
# d/dt low-pass: y(t) = α·x(t) + (1-α)·y(t-dt)
raw_derivative = (error - self.prev_error) / self.dt
alpha = self.dt / (self.tau_d + self.dt)
self.filtered_derivative = alpha * raw_derivative + (1.0 - alpha) * self.filtered_derivative
d_term = self.kd * self.filtered_derivative
self.prev_error = error
pid_out = p_term + i_term + d_term
# Actuator Saturation (tanh)
force = float(np.tanh(pid_out))
if force > 0.0:
return 0.0, abs(force)
return abs(force), 0.0
def _render_outputs(
frames: list[dict[str, Any]],
history_z: list[float],
history_top: list[float],
history_bot: list[float],
*,
save_animation: bool,
save_report: bool,
output_gif: str,
output_report: str,
) -> tuple[bool, Optional[str], bool, Optional[str]]:
fig = plt.figure(figsize=(12, 8), facecolor="#1e1e1e")
gs = fig.add_gridspec(2, 2)
ax_plasma = fig.add_subplot(gs[:, 0])
ax_plasma.set_facecolor("black")
ax_plasma.set_title("Tokamak Cross-Section (Live)", color="white")
extent: tuple[float, float, float, float] = (
float(frames[0]["r_min"]),
float(frames[0]["r_max"]),
float(frames[0]["z_min"]),
float(frames[0]["z_max"]),
)
im = ax_plasma.imshow(
np.asarray(frames[0]["density"]),
extent=extent,
origin="lower",
cmap="plasma",
vmin=0.0,
vmax=1.0,
animated=True,
)
ax_trace = fig.add_subplot(gs[0, 1])
ax_trace.set_facecolor("#2e2e2e")
ax_trace.set_title("Vertical Displacement (Z-Pos)", color="white")
ax_trace.set_ylim(-1.5, 1.5)
ax_trace.grid(True, color="#444")
(line_z,) = ax_trace.plot([], [], "cyan", lw=2, animated=True)
(line_setpoint,) = ax_trace.plot([], [], "r--", alpha=0.5, animated=True)
ax_trace.set_xlim(0, max(50, len(frames)))
ax_coils = fig.add_subplot(gs[1, 1])
ax_coils.set_facecolor("#2e2e2e")
ax_coils.set_title("Poloidal Field Coil Currents", color="white")
ax_coils.set_ylim(0, 1.1)
bar_top = ax_coils.bar([0], [history_top[0]], color="red", label="Top Coil")
bar_bot = ax_coils.bar([1], [history_bot[0]], color="blue", label="Bottom Coil")
ax_coils.set_xticks([0, 1])
ax_coils.set_xticklabels(["Top", "Bottom"], color="white")
ax_coils.legend()
(top_marker,) = ax_plasma.plot(3.0, 2.9, "s", color="red", markersize=20, alpha=0.3)
(bot_marker,) = ax_plasma.plot(3.0, -2.9, "s", color="blue", markersize=20, alpha=0.3)
wall = Rectangle(
(extent[0], extent[2] + 0.2),
extent[1] - extent[0],
(extent[3] - extent[2]) - 0.4,
linewidth=2,
edgecolor="gray",
facecolor="none",
)
ax_plasma.add_patch(wall)
def update(frame_idx: int) -> tuple[Any, ...]:
rec = frames[frame_idx]
im.set_data(np.asarray(rec["density"]))
line_z.set_data(range(frame_idx + 1), history_z[: frame_idx + 1])
line_setpoint.set_data(range(frame_idx + 1), [0.0] * (frame_idx + 1))
top_val = float(history_top[frame_idx])
bot_val = float(history_bot[frame_idx])
bar_top[0].set_height(top_val)
bar_bot[0].set_height(bot_val)
top_marker.set_alpha(0.3 + (0.7 * top_val))
bot_marker.set_alpha(0.3 + (0.7 * bot_val))
ax_plasma.set_title(f"Plasma Shape (t={frame_idx})", color="white")
return (
im,
line_z,
line_setpoint,
bar_top[0],
bar_bot[0],
top_marker,
bot_marker,
)
animation_saved = False
animation_error: Optional[str] = None
if save_animation:
try:
ani = FuncAnimation(
fig,
update,
frames=len(frames),
interval=100,
blit=True,
)
ani.save(output_gif, writer=PillowWriter(fps=FPS))
animation_saved = True
except _RENDER_OUTPUT_EXCEPTIONS as exc:
animation_error = str(exc)
report_saved = False
report_error: Optional[str] = None
if save_report:
try:
update(len(frames) - 1)
plt.tight_layout()
fig.savefig(output_report)
report_saved = True
except _RENDER_OUTPUT_EXCEPTIONS as exc:
report_error = str(exc)
plt.close(fig)
return animation_saved, animation_error, report_saved, report_error
[docs]
def run_control_room(
sim_duration: int = SIM_DURATION,
*,
seed: int = 42,
save_animation: bool = True,
save_report: bool = True,
output_gif: str = "SCPN_Fusion_Control_Room.gif",
output_report: str = "SCPN_Fusion_Status_Report.png",
verbose: bool = True,
kernel_factory: Optional[Callable[[str], Any]] = None,
config_file: Optional[str] = None,
allow_kernel_fallback: bool = True,
) -> dict[str, Any]:
"""Run the control-room loop and return deterministic summary metrics."""
steps = int(sim_duration)
if steps < 1:
raise ValueError("sim_duration must be >= 1.")
allow_kernel_fallback = bool(allow_kernel_fallback)
rng = np.random.default_rng(int(seed))
kernel = None
kernel_error = None
psi_source = "analytic"
if kernel_factory is not None or (config_file is not None and FusionKernel is not None):
if kernel_factory is None and FusionKernel is not None:
kernel_factory = FusionKernel
cfg = (
str(config_file)
if config_file is not None
else str(Path(__file__).resolve().parents[3] / "iter_config.json")
)
try:
kernel = kernel_factory(cfg) if kernel_factory is not None else None
if kernel is not None:
psi_source = "kernel"
except _KERNEL_INIT_EXCEPTIONS as exc:
if not allow_kernel_fallback:
raise RuntimeError(
"Kernel initialization failed and allow_kernel_fallback=False."
) from exc
kernel = None
kernel_error = str(exc)
psi_source = "analytic"
if verbose:
logger.info("--- SCPN FUSION CONTROL ROOM: Grad-Shafranov VDE Simulation ---")
if psi_source == "kernel":
logger.info("Using kernel-backed Psi source.")
elif kernel_error:
logger.warning(
"Kernel init failed; continuing with analytic Psi source: %s",
kernel_error,
)
reactor = TokamakPhysicsEngine(seed=seed, kernel=kernel)
sensors = DiagnosticSystem(rng=rng)
observer = KalmanObserver()
ai = NeuralController()
history_z: list[float] = []
history_z_measured: list[float] = []
history_top: list[float] = []
history_bot: list[float] = []
frames: list[dict[str, Any]] = []
kernel_coil_update_failures = 0
kernel_coil_update_error: Optional[str] = None
kernel_solve_failures = 0
kernel_solve_error: Optional[str] = None
top_action = 0.0
bot_action = 0.0
for frame in range(steps):
if kernel is not None and hasattr(kernel, "cfg"):
try:
coils = kernel.cfg.get("coils", [])
if len(coils) >= 5:
coils[0]["current"] = float(coils[0].get("current", 0.0)) + top_action
coils[4]["current"] = float(coils[4].get("current", 0.0)) + bot_action
except _KERNEL_COIL_UPDATE_EXCEPTIONS as exc:
kernel_coil_update_failures += 1
if kernel_coil_update_error is None:
kernel_coil_update_error = str(exc)
if not allow_kernel_fallback:
raise RuntimeError(
"Kernel coil update failed and allow_kernel_fallback=False."
) from exc
logger.warning(
"Kernel coil update failed; continuing with analytic control actions: %s",
kernel_coil_update_error,
)
if kernel is not None and hasattr(kernel, "solve_equilibrium"):
try:
kernel.solve_equilibrium()
except _KERNEL_SOLVE_EXCEPTIONS as exc:
kernel_solve_failures += 1
if kernel_solve_error is None:
kernel_solve_error = str(exc)
if not allow_kernel_fallback:
raise RuntimeError(
"Kernel solve failed and allow_kernel_fallback=False."
) from exc
logger.warning(
"Kernel equilibrium solve failed; continuing with last known state: %s",
kernel_solve_error,
)
true_z = reactor.step_dynamics(top_action, bot_action)
density, psi = reactor.solve_flux_surfaces()
# Sensor measurement with potential dropout
raw_measured_z = sensors.measure_position(true_z)
dropout = frame % 20 == 0 # Simulate periodic glitch
# State estimation
filtered_z = observer.update(raw_measured_z, dropout=dropout)
# AI uses filtered state
top_action, bot_action = ai.compute_action(filtered_z)
history_z.append(float(true_z))
history_z_measured.append(float(raw_measured_z))
history_top.append(float(top_action))
history_bot.append(float(bot_action))
frames.append(
{
"density": np.asarray(density, dtype=np.float64).copy(),
"psi": np.asarray(psi, dtype=np.float64).copy(),
"r_min": float(np.min(reactor.R)),
"r_max": float(np.max(reactor.R)),
"z_min": float(np.min(reactor.Z)),
"z_max": float(np.max(reactor.Z)),
}
)
animation_saved = False
animation_error: Optional[str] = None
report_saved = False
report_error: Optional[str] = None
if save_animation or save_report:
(
animation_saved,
animation_error,
report_saved,
report_error,
) = _render_outputs(
frames,
history_z,
history_top,
history_bot,
save_animation=save_animation,
save_report=save_report,
output_gif=output_gif,
output_report=output_report,
)
z_arr = np.asarray(history_z, dtype=np.float64)
top_arr = np.asarray(history_top, dtype=np.float64)
bot_arr = np.asarray(history_bot, dtype=np.float64)
return {
"seed": int(seed),
"steps": int(steps),
"psi_source": psi_source,
"kernel_fallback_allowed": bool(allow_kernel_fallback),
"kernel_fallback_used": bool(
kernel_error is not None or kernel_coil_update_failures > 0 or kernel_solve_failures > 0
),
"kernel_error": kernel_error,
"final_z": float(z_arr[-1]),
"mean_abs_z": float(np.mean(np.abs(z_arr))),
"max_abs_z": float(np.max(np.abs(z_arr))),
"mean_top_action": float(np.mean(top_arr)),
"mean_bottom_action": float(np.mean(bot_arr)),
"kernel_coil_update_failures": int(kernel_coil_update_failures),
"kernel_coil_update_error": kernel_coil_update_error,
"kernel_solve_failures": int(kernel_solve_failures),
"kernel_solve_error": kernel_solve_error,
"animation_saved": bool(animation_saved),
"animation_error": animation_error,
"report_saved": bool(report_saved),
"report_error": report_error,
}
if __name__ == "__main__":
summary = run_control_room()
print(
f"Control-room run complete (psi_source={summary['psi_source']}, steps={summary['steps']})."
)