Source code for scpn_fusion.control.fusion_control_room

# SPDX-License-Identifier: AGPL-3.0-or-later
# Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Fusion Control Room
"""Deterministic control-room simulation with diagnostics, estimation, and rendering."""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Any, Callable, Optional

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation, PillowWriter
from matplotlib.patches import Rectangle
from numpy.typing import NDArray

logger = logging.getLogger(__name__)

FloatArray = NDArray[np.float64]

FusionKernel: type[Any] | None
try:
    from scpn_fusion.core._rust_compat import FusionKernel
except ImportError:
    try:
        from scpn_fusion.core.fusion_kernel import FusionKernel
    except (ImportError, OSError):  # pragma: no cover - optional kernel path
        FusionKernel = None

RESOLUTION = 60
SIM_DURATION = 200
FPS = 10
_RENDER_OUTPUT_EXCEPTIONS = (OSError, RuntimeError, ValueError, TypeError)
_KERNEL_INIT_EXCEPTIONS = (OSError, RuntimeError, ValueError, TypeError, AttributeError)
_KERNEL_COIL_UPDATE_EXCEPTIONS = (
    AttributeError,
    KeyError,
    TypeError,
    ValueError,
    IndexError,
)
_KERNEL_SOLVE_EXCEPTIONS = (RuntimeError, ValueError, TypeError, FloatingPointError)


[docs] class TokamakPhysicsEngine: """Reduced Grad-Shafranov geometry model with optional kernel-Psi ingestion."""
[docs] def __init__( self, size: int = RESOLUTION, *, seed: int = 42, kernel: Optional[Any] = None, ) -> None: """Initialise the geometry grid, RNG, and optional kernel handle.""" size = int(size) if size < 16: raise ValueError("size must be >= 16.") self.size = size self.rng = np.random.default_rng(int(seed)) self.kernel = kernel self.R = np.linspace(1.0, 5.0, self.size) self.Z = np.linspace(-3.0, 3.0, self.size) self.RR, self.ZZ = np.meshgrid(self.R, self.Z) self.density: FloatArray = np.zeros((self.size, self.size), dtype=np.float64) # Plasma Parameters (ITER-like) self.R0 = 3.0 self.a = 1.0 self.kappa = 1.7 self.delta = 0.33 # State self.z_pos = 0.0 self.v_drift = 0.0
def _kernel_psi(self) -> Optional[FloatArray]: if self.kernel is None or not hasattr(self.kernel, "Psi"): return None psi = np.asarray(self.kernel.Psi, dtype=np.float64) if psi.ndim != 2 or psi.shape[0] < 8 or psi.shape[1] < 8: return None return psi
[docs] def solve_flux_surfaces(self) -> tuple[FloatArray, FloatArray]: """Return ``(density, psi)`` from kernel state or analytic geometry. Uses the kernel ``Psi`` field when available, otherwise an analytic Miller-parameterized geometry. """ psi_kernel = self._kernel_psi() if psi_kernel is not None: psi_min = float(np.min(psi_kernel)) psi_max = float(np.max(psi_kernel)) psi_span = max(psi_max - psi_min, 1.0e-9) psi = (psi_kernel - psi_min) / psi_span else: rho_sq = ( (self.RR - self.R0) ** 2 + ((self.ZZ - self.z_pos) / self.kappa) ** 2 - 2.0 * self.delta * (self.RR - self.R0) * ((self.RR - self.R0) ** 2) ) psi = rho_sq / (self.a**2) psi = np.clip(psi, 0.0, None) core_term = np.maximum(1.0 - psi, 0.0) self.density = np.asarray(np.where(psi < 1.0, core_term**1.5, 0.0), dtype=np.float64) noise = self.rng.normal(0.0, 0.05, size=self.density.shape) * self.density self.density = np.asarray(np.clip(self.density + noise, 0.0, None), dtype=np.float64) return self.density, np.asarray(psi, dtype=np.float64)
[docs] def step_dynamics(self, coil_action_top: float, coil_action_bottom: float) -> float: """Advance the reduced vertical-displacement dynamics by one step.""" instability_growth = 0.1 control_force = (float(coil_action_bottom) - float(coil_action_top)) * 0.2 disturbance = float(self.rng.normal(0.0, 0.01)) if float(self.rng.random()) < 0.05: disturbance += 0.2 accel = (instability_growth * self.z_pos) + control_force + disturbance self.v_drift += accel self.z_pos += self.v_drift self.v_drift *= 0.9 return float(self.z_pos)
[docs] class DiagnosticSystem: """Noisy vertical-position probe."""
[docs] def __init__(self, rng: np.random.Generator) -> None: """Store the random generator used to inject measurement noise.""" self._rng = rng
[docs] def measure_position(self, true_z: float) -> float: """Return a noisy vertical-position measurement from the supplied true state.""" return float(true_z + self._rng.normal(0.0, 0.05))
[docs] class KalmanObserver: """Linear Kalman filter for plasma vertical-position estimation. Hardens the state estimate against sensor noise and measurement dropout. """
[docs] def __init__(self, dt: float = 0.1): """Initialise the state vector, covariances, and linear drift model.""" # State: [z, v_z] self.x = np.array([0.0, 0.0]) self.P = np.eye(2) * 0.1 # Transition Matrix (Linear drift model) self.A = np.array([[1.0, dt], [0.0, 0.9]]) # 0.9 matches drift damping in engine # Measurement Matrix (We only measure position z) self.H = np.array([[1.0, 0.0]]) # Covariance Matrices self.Q = np.eye(2) * 0.01 # Process Noise self.R = np.array([[0.05]]) # Measurement Noise
[docs] def update(self, measured_z: float, dropout: bool = False) -> float: """Predict-correct cycle. Returns filtered Z-position.""" self.x = self.A @ self.x self.P = self.A @ self.P @ self.A.T + self.Q if not dropout and np.isfinite(measured_z): y = measured_z - (self.H @ self.x) # innovation S = self.H @ self.P @ self.H.T + self.R K = self.P @ self.H.T @ np.linalg.inv(S) self.x = self.x + K @ y self.P = (np.eye(2) - K @ self.H) @ self.P else: self.P *= 1.2 # covariance inflation on dropout return float(self.x[0])
[docs] class NeuralController: """Hardened PID control policy for vertical stabilization."""
[docs] def __init__(self, dt: float = 0.1) -> None: """Initialise PID gains, derivative filter, and anti-windup limit.""" self.dt = dt self.integral_error = 0.0 self.prev_error = 0.0 self.filtered_derivative = 0.0 # PID Gains (Hardened defaults) self.kp = 5.0 self.ki = 0.5 self.kd = 2.0 # Filter constant for derivative (tau_d) # Prevents noise spikes from being amplified self.tau_d = 0.05 # Anti-windup limit self.integral_limit = 2.0
[docs] def compute_action(self, measured_z: float) -> tuple[float, float]: """Map filtered vertical displacement to bounded top and bottom coil actions.""" error = -float(measured_z) p_term = self.kp * error self.integral_error += error * self.dt self.integral_error = np.clip( self.integral_error, -self.integral_limit, self.integral_limit ) i_term = self.ki * self.integral_error # d/dt low-pass: y(t) = α·x(t) + (1-α)·y(t-dt) raw_derivative = (error - self.prev_error) / self.dt alpha = self.dt / (self.tau_d + self.dt) self.filtered_derivative = alpha * raw_derivative + (1.0 - alpha) * self.filtered_derivative d_term = self.kd * self.filtered_derivative self.prev_error = error pid_out = p_term + i_term + d_term # Actuator Saturation (tanh) force = float(np.tanh(pid_out)) if force > 0.0: return 0.0, abs(force) return abs(force), 0.0
def _render_outputs( frames: list[dict[str, Any]], history_z: list[float], history_top: list[float], history_bot: list[float], *, save_animation: bool, save_report: bool, output_gif: str, output_report: str, ) -> tuple[bool, Optional[str], bool, Optional[str]]: fig = plt.figure(figsize=(12, 8), facecolor="#1e1e1e") gs = fig.add_gridspec(2, 2) ax_plasma = fig.add_subplot(gs[:, 0]) ax_plasma.set_facecolor("black") ax_plasma.set_title("Tokamak Cross-Section (Live)", color="white") extent: tuple[float, float, float, float] = ( float(frames[0]["r_min"]), float(frames[0]["r_max"]), float(frames[0]["z_min"]), float(frames[0]["z_max"]), ) im = ax_plasma.imshow( np.asarray(frames[0]["density"]), extent=extent, origin="lower", cmap="plasma", vmin=0.0, vmax=1.0, animated=True, ) ax_trace = fig.add_subplot(gs[0, 1]) ax_trace.set_facecolor("#2e2e2e") ax_trace.set_title("Vertical Displacement (Z-Pos)", color="white") ax_trace.set_ylim(-1.5, 1.5) ax_trace.grid(True, color="#444") (line_z,) = ax_trace.plot([], [], "cyan", lw=2, animated=True) (line_setpoint,) = ax_trace.plot([], [], "r--", alpha=0.5, animated=True) ax_trace.set_xlim(0, max(50, len(frames))) ax_coils = fig.add_subplot(gs[1, 1]) ax_coils.set_facecolor("#2e2e2e") ax_coils.set_title("Poloidal Field Coil Currents", color="white") ax_coils.set_ylim(0, 1.1) bar_top = ax_coils.bar([0], [history_top[0]], color="red", label="Top Coil") bar_bot = ax_coils.bar([1], [history_bot[0]], color="blue", label="Bottom Coil") ax_coils.set_xticks([0, 1]) ax_coils.set_xticklabels(["Top", "Bottom"], color="white") ax_coils.legend() (top_marker,) = ax_plasma.plot(3.0, 2.9, "s", color="red", markersize=20, alpha=0.3) (bot_marker,) = ax_plasma.plot(3.0, -2.9, "s", color="blue", markersize=20, alpha=0.3) wall = Rectangle( (extent[0], extent[2] + 0.2), extent[1] - extent[0], (extent[3] - extent[2]) - 0.4, linewidth=2, edgecolor="gray", facecolor="none", ) ax_plasma.add_patch(wall) def update(frame_idx: int) -> tuple[Any, ...]: rec = frames[frame_idx] im.set_data(np.asarray(rec["density"])) line_z.set_data(range(frame_idx + 1), history_z[: frame_idx + 1]) line_setpoint.set_data(range(frame_idx + 1), [0.0] * (frame_idx + 1)) top_val = float(history_top[frame_idx]) bot_val = float(history_bot[frame_idx]) bar_top[0].set_height(top_val) bar_bot[0].set_height(bot_val) top_marker.set_alpha(0.3 + (0.7 * top_val)) bot_marker.set_alpha(0.3 + (0.7 * bot_val)) ax_plasma.set_title(f"Plasma Shape (t={frame_idx})", color="white") return ( im, line_z, line_setpoint, bar_top[0], bar_bot[0], top_marker, bot_marker, ) animation_saved = False animation_error: Optional[str] = None if save_animation: try: ani = FuncAnimation( fig, update, frames=len(frames), interval=100, blit=True, ) ani.save(output_gif, writer=PillowWriter(fps=FPS)) animation_saved = True except _RENDER_OUTPUT_EXCEPTIONS as exc: animation_error = str(exc) report_saved = False report_error: Optional[str] = None if save_report: try: update(len(frames) - 1) plt.tight_layout() fig.savefig(output_report) report_saved = True except _RENDER_OUTPUT_EXCEPTIONS as exc: report_error = str(exc) plt.close(fig) return animation_saved, animation_error, report_saved, report_error
[docs] def run_control_room( sim_duration: int = SIM_DURATION, *, seed: int = 42, save_animation: bool = True, save_report: bool = True, output_gif: str = "SCPN_Fusion_Control_Room.gif", output_report: str = "SCPN_Fusion_Status_Report.png", verbose: bool = True, kernel_factory: Optional[Callable[[str], Any]] = None, config_file: Optional[str] = None, allow_kernel_fallback: bool = True, ) -> dict[str, Any]: """Run the control-room loop and return deterministic summary metrics.""" steps = int(sim_duration) if steps < 1: raise ValueError("sim_duration must be >= 1.") allow_kernel_fallback = bool(allow_kernel_fallback) rng = np.random.default_rng(int(seed)) kernel = None kernel_error = None psi_source = "analytic" if kernel_factory is not None or (config_file is not None and FusionKernel is not None): if kernel_factory is None and FusionKernel is not None: kernel_factory = FusionKernel cfg = ( str(config_file) if config_file is not None else str(Path(__file__).resolve().parents[3] / "iter_config.json") ) try: kernel = kernel_factory(cfg) if kernel_factory is not None else None if kernel is not None: psi_source = "kernel" except _KERNEL_INIT_EXCEPTIONS as exc: if not allow_kernel_fallback: raise RuntimeError( "Kernel initialization failed and allow_kernel_fallback=False." ) from exc kernel = None kernel_error = str(exc) psi_source = "analytic" if verbose: logger.info("--- SCPN FUSION CONTROL ROOM: Grad-Shafranov VDE Simulation ---") if psi_source == "kernel": logger.info("Using kernel-backed Psi source.") elif kernel_error: logger.warning( "Kernel init failed; continuing with analytic Psi source: %s", kernel_error, ) reactor = TokamakPhysicsEngine(seed=seed, kernel=kernel) sensors = DiagnosticSystem(rng=rng) observer = KalmanObserver() ai = NeuralController() history_z: list[float] = [] history_z_measured: list[float] = [] history_top: list[float] = [] history_bot: list[float] = [] frames: list[dict[str, Any]] = [] kernel_coil_update_failures = 0 kernel_coil_update_error: Optional[str] = None kernel_solve_failures = 0 kernel_solve_error: Optional[str] = None top_action = 0.0 bot_action = 0.0 for frame in range(steps): if kernel is not None and hasattr(kernel, "cfg"): try: coils = kernel.cfg.get("coils", []) if len(coils) >= 5: coils[0]["current"] = float(coils[0].get("current", 0.0)) + top_action coils[4]["current"] = float(coils[4].get("current", 0.0)) + bot_action except _KERNEL_COIL_UPDATE_EXCEPTIONS as exc: kernel_coil_update_failures += 1 if kernel_coil_update_error is None: kernel_coil_update_error = str(exc) if not allow_kernel_fallback: raise RuntimeError( "Kernel coil update failed and allow_kernel_fallback=False." ) from exc logger.warning( "Kernel coil update failed; continuing with analytic control actions: %s", kernel_coil_update_error, ) if kernel is not None and hasattr(kernel, "solve_equilibrium"): try: kernel.solve_equilibrium() except _KERNEL_SOLVE_EXCEPTIONS as exc: kernel_solve_failures += 1 if kernel_solve_error is None: kernel_solve_error = str(exc) if not allow_kernel_fallback: raise RuntimeError( "Kernel solve failed and allow_kernel_fallback=False." ) from exc logger.warning( "Kernel equilibrium solve failed; continuing with last known state: %s", kernel_solve_error, ) true_z = reactor.step_dynamics(top_action, bot_action) density, psi = reactor.solve_flux_surfaces() # Sensor measurement with potential dropout raw_measured_z = sensors.measure_position(true_z) dropout = frame % 20 == 0 # Simulate periodic glitch # State estimation filtered_z = observer.update(raw_measured_z, dropout=dropout) # AI uses filtered state top_action, bot_action = ai.compute_action(filtered_z) history_z.append(float(true_z)) history_z_measured.append(float(raw_measured_z)) history_top.append(float(top_action)) history_bot.append(float(bot_action)) frames.append( { "density": np.asarray(density, dtype=np.float64).copy(), "psi": np.asarray(psi, dtype=np.float64).copy(), "r_min": float(np.min(reactor.R)), "r_max": float(np.max(reactor.R)), "z_min": float(np.min(reactor.Z)), "z_max": float(np.max(reactor.Z)), } ) animation_saved = False animation_error: Optional[str] = None report_saved = False report_error: Optional[str] = None if save_animation or save_report: ( animation_saved, animation_error, report_saved, report_error, ) = _render_outputs( frames, history_z, history_top, history_bot, save_animation=save_animation, save_report=save_report, output_gif=output_gif, output_report=output_report, ) z_arr = np.asarray(history_z, dtype=np.float64) top_arr = np.asarray(history_top, dtype=np.float64) bot_arr = np.asarray(history_bot, dtype=np.float64) return { "seed": int(seed), "steps": int(steps), "psi_source": psi_source, "kernel_fallback_allowed": bool(allow_kernel_fallback), "kernel_fallback_used": bool( kernel_error is not None or kernel_coil_update_failures > 0 or kernel_solve_failures > 0 ), "kernel_error": kernel_error, "final_z": float(z_arr[-1]), "mean_abs_z": float(np.mean(np.abs(z_arr))), "max_abs_z": float(np.max(np.abs(z_arr))), "mean_top_action": float(np.mean(top_arr)), "mean_bottom_action": float(np.mean(bot_arr)), "kernel_coil_update_failures": int(kernel_coil_update_failures), "kernel_coil_update_error": kernel_coil_update_error, "kernel_solve_failures": int(kernel_solve_failures), "kernel_solve_error": kernel_solve_error, "animation_saved": bool(animation_saved), "animation_error": animation_error, "report_saved": bool(report_saved), "report_error": report_error, }
if __name__ == "__main__": summary = run_control_room() print( f"Control-room run complete (psi_source={summary['psi_source']}, steps={summary['steps']})." )