Source code for scpn_fusion.control.fusion_control_room

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Fusion Control Room
from __future__ import annotations

import logging
from pathlib import Path
from typing import Any, Callable, Optional

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation, PillowWriter

logger = logging.getLogger(__name__)

try:
    from scpn_fusion.core._rust_compat import FusionKernel
except ImportError:
    try:
        from scpn_fusion.core.fusion_kernel import FusionKernel
    except (ImportError, OSError):  # pragma: no cover - optional kernel path
        FusionKernel = None

RESOLUTION = 60
SIM_DURATION = 200
FPS = 10
_RENDER_OUTPUT_EXCEPTIONS = (OSError, RuntimeError, ValueError, TypeError)
_KERNEL_INIT_EXCEPTIONS = (OSError, RuntimeError, ValueError, TypeError, AttributeError)
_KERNEL_COIL_UPDATE_EXCEPTIONS = (
    AttributeError,
    KeyError,
    TypeError,
    ValueError,
    IndexError,
)
_KERNEL_SOLVE_EXCEPTIONS = (RuntimeError, ValueError, TypeError, FloatingPointError)


[docs] class TokamakPhysicsEngine: """ Reduced Grad-Shafranov geometry model with optional kernel-Psi ingestion. """ def __init__( self, size: int = RESOLUTION, *, seed: int = 42, kernel: Optional[Any] = None, ) -> None: size = int(size) if size < 16: raise ValueError("size must be >= 16.") self.size = size self.rng = np.random.default_rng(int(seed)) self.kernel = kernel self.R = np.linspace(1.0, 5.0, self.size) self.Z = np.linspace(-3.0, 3.0, self.size) self.RR, self.ZZ = np.meshgrid(self.R, self.Z) self.density = np.zeros((self.size, self.size), dtype=np.float64) # Plasma Parameters (ITER-like) self.R0 = 3.0 self.a = 1.0 self.kappa = 1.7 self.delta = 0.33 # State self.z_pos = 0.0 self.v_drift = 0.0 def _kernel_psi(self) -> Optional[np.ndarray]: if self.kernel is None or not hasattr(self.kernel, "Psi"): return None psi = np.asarray(self.kernel.Psi, dtype=np.float64) if psi.ndim != 2 or psi.shape[0] < 8 or psi.shape[1] < 8: return None return psi
[docs] def solve_flux_surfaces(self) -> tuple[np.ndarray, np.ndarray]: """ Return `(density, psi)` from kernel state when available, otherwise from analytic Miller-parameterized geometry. """ psi_kernel = self._kernel_psi() if psi_kernel is not None: psi_min = float(np.min(psi_kernel)) psi_max = float(np.max(psi_kernel)) psi_span = max(psi_max - psi_min, 1.0e-9) psi = (psi_kernel - psi_min) / psi_span else: rho_sq = ( (self.RR - self.R0) ** 2 + ((self.ZZ - self.z_pos) / self.kappa) ** 2 - 2.0 * self.delta * (self.RR - self.R0) * ((self.RR - self.R0) ** 2) ) psi = rho_sq / (self.a**2) psi = np.clip(psi, 0.0, None) core_term = np.maximum(1.0 - psi, 0.0) self.density = np.where(psi < 1.0, core_term**1.5, 0.0) noise = self.rng.normal(0.0, 0.05, size=self.density.shape) * self.density self.density = np.clip(self.density + noise, 0.0, None) return self.density, psi
[docs] def step_dynamics(self, coil_action_top: float, coil_action_bottom: float) -> float: """ Reduced vertical-displacement dynamics. """ instability_growth = 0.1 control_force = (float(coil_action_bottom) - float(coil_action_top)) * 0.2 disturbance = float(self.rng.normal(0.0, 0.01)) if float(self.rng.random()) < 0.05: disturbance += 0.2 accel = (instability_growth * self.z_pos) + control_force + disturbance self.v_drift += accel self.z_pos += self.v_drift self.v_drift *= 0.9 return float(self.z_pos)
[docs] class DiagnosticSystem: """Noisy vertical-position probe.""" def __init__(self, rng: np.random.Generator) -> None: self._rng = rng
[docs] def measure_position(self, true_z: float) -> float: return float(true_z + self._rng.normal(0.0, 0.05))
[docs] class KalmanObserver: """ Linear Kalman Filter for robust plasma position state estimation. Harden state estimation against sensor noise and dropout. """ def __init__(self, dt: float = 0.1): # State: [z, v_z] self.x = np.array([0.0, 0.0]) self.P = np.eye(2) * 0.1 # Transition Matrix (Linear drift model) self.A = np.array([[1.0, dt], [0.0, 0.9]]) # 0.9 matches drift damping in engine # Measurement Matrix (We only measure position z) self.H = np.array([[1.0, 0.0]]) # Covariance Matrices self.Q = np.eye(2) * 0.01 # Process Noise self.R = np.array([[0.05]]) # Measurement Noise
[docs] def update(self, measured_z: float, dropout: bool = False) -> float: """Predict-correct cycle. Returns filtered Z-position.""" self.x = self.A @ self.x self.P = self.A @ self.P @ self.A.T + self.Q if not dropout and np.isfinite(measured_z): y = measured_z - (self.H @ self.x) # innovation S = self.H @ self.P @ self.H.T + self.R K = self.P @ self.H.T @ np.linalg.inv(S) self.x = self.x + K @ y self.P = (np.eye(2) - K @ self.H) @ self.P else: self.P *= 1.2 # covariance inflation on dropout return float(self.x[0])
[docs] class NeuralController: """Hardened PID control policy for vertical stabilization.""" def __init__(self, dt: float = 0.1) -> None: self.dt = dt self.integral_error = 0.0 self.prev_error = 0.0 self.filtered_derivative = 0.0 # PID Gains (Hardened defaults) self.kp = 5.0 self.ki = 0.5 self.kd = 2.0 # Filter constant for derivative (tau_d) # Prevents noise spikes from being amplified self.tau_d = 0.05 # Anti-windup limit self.integral_limit = 2.0
[docs] def compute_action(self, measured_z: float) -> tuple[float, float]: error = -float(measured_z) p_term = self.kp * error self.integral_error += error * self.dt self.integral_error = np.clip( self.integral_error, -self.integral_limit, self.integral_limit ) i_term = self.ki * self.integral_error # d/dt low-pass: y(t) = α·x(t) + (1-α)·y(t-dt) raw_derivative = (error - self.prev_error) / self.dt alpha = self.dt / (self.tau_d + self.dt) self.filtered_derivative = alpha * raw_derivative + (1.0 - alpha) * self.filtered_derivative d_term = self.kd * self.filtered_derivative self.prev_error = error pid_out = p_term + i_term + d_term # Actuator Saturation (tanh) force = float(np.tanh(pid_out)) if force > 0.0: return 0.0, abs(force) return abs(force), 0.0
def _render_outputs( frames: list[dict[str, Any]], history_z: list[float], history_top: list[float], history_bot: list[float], *, save_animation: bool, save_report: bool, output_gif: str, output_report: str, ) -> tuple[bool, Optional[str], bool, Optional[str]]: fig = plt.figure(figsize=(12, 8), facecolor="#1e1e1e") gs = fig.add_gridspec(2, 2) ax_plasma = fig.add_subplot(gs[:, 0]) ax_plasma.set_facecolor("black") ax_plasma.set_title("Tokamak Cross-Section (Live)", color="white") extent = [ float(frames[0]["r_min"]), float(frames[0]["r_max"]), float(frames[0]["z_min"]), float(frames[0]["z_max"]), ] im = ax_plasma.imshow( np.asarray(frames[0]["density"]), extent=extent, origin="lower", cmap="plasma", vmin=0.0, vmax=1.0, animated=True, ) ax_trace = fig.add_subplot(gs[0, 1]) ax_trace.set_facecolor("#2e2e2e") ax_trace.set_title("Vertical Displacement (Z-Pos)", color="white") ax_trace.set_ylim(-1.5, 1.5) ax_trace.grid(True, color="#444") (line_z,) = ax_trace.plot([], [], "cyan", lw=2, animated=True) (line_setpoint,) = ax_trace.plot([], [], "r--", alpha=0.5, animated=True) ax_trace.set_xlim(0, max(50, len(frames))) ax_coils = fig.add_subplot(gs[1, 1]) ax_coils.set_facecolor("#2e2e2e") ax_coils.set_title("Poloidal Field Coil Currents", color="white") ax_coils.set_ylim(0, 1.1) bar_top = ax_coils.bar([0], [history_top[0]], color="red", label="Top Coil") bar_bot = ax_coils.bar([1], [history_bot[0]], color="blue", label="Bottom Coil") ax_coils.set_xticks([0, 1]) ax_coils.set_xticklabels(["Top", "Bottom"], color="white") ax_coils.legend() (top_marker,) = ax_plasma.plot(3.0, 2.9, "s", color="red", markersize=20, alpha=0.3) (bot_marker,) = ax_plasma.plot(3.0, -2.9, "s", color="blue", markersize=20, alpha=0.3) wall = plt.Rectangle( (extent[0], extent[2] + 0.2), extent[1] - extent[0], (extent[3] - extent[2]) - 0.4, linewidth=2, edgecolor="gray", facecolor="none", ) ax_plasma.add_patch(wall) def update(frame_idx: int): rec = frames[frame_idx] im.set_data(np.asarray(rec["density"])) line_z.set_data(range(frame_idx + 1), history_z[: frame_idx + 1]) line_setpoint.set_data(range(frame_idx + 1), [0.0] * (frame_idx + 1)) top_val = float(history_top[frame_idx]) bot_val = float(history_bot[frame_idx]) bar_top[0].set_height(top_val) bar_bot[0].set_height(bot_val) top_marker.set_alpha(0.3 + (0.7 * top_val)) bot_marker.set_alpha(0.3 + (0.7 * bot_val)) ax_plasma.set_title(f"Plasma Shape (t={frame_idx})", color="white") return ( im, line_z, line_setpoint, bar_top[0], bar_bot[0], top_marker, bot_marker, ) animation_saved = False animation_error: Optional[str] = None if save_animation: try: ani = FuncAnimation( fig, update, frames=len(frames), interval=100, blit=True, ) ani.save(output_gif, writer=PillowWriter(fps=FPS)) animation_saved = True except _RENDER_OUTPUT_EXCEPTIONS as exc: animation_error = str(exc) report_saved = False report_error: Optional[str] = None if save_report: try: update(len(frames) - 1) plt.tight_layout() fig.savefig(output_report) report_saved = True except _RENDER_OUTPUT_EXCEPTIONS as exc: report_error = str(exc) plt.close(fig) return animation_saved, animation_error, report_saved, report_error
[docs] def run_control_room( sim_duration: int = SIM_DURATION, *, seed: int = 42, save_animation: bool = True, save_report: bool = True, output_gif: str = "SCPN_Fusion_Control_Room.gif", output_report: str = "SCPN_Fusion_Status_Report.png", verbose: bool = True, kernel_factory: Optional[Callable[[str], Any]] = None, config_file: Optional[str] = None, allow_kernel_fallback: bool = True, ) -> dict[str, Any]: """ Run the control-room loop and return deterministic summary metrics. """ steps = int(sim_duration) if steps < 1: raise ValueError("sim_duration must be >= 1.") allow_kernel_fallback = bool(allow_kernel_fallback) rng = np.random.default_rng(int(seed)) kernel = None kernel_error = None psi_source = "analytic" if kernel_factory is not None or (config_file is not None and FusionKernel is not None): if kernel_factory is None and FusionKernel is not None: kernel_factory = FusionKernel cfg = ( str(config_file) if config_file is not None else str(Path(__file__).resolve().parents[3] / "iter_config.json") ) try: kernel = kernel_factory(cfg) if kernel_factory is not None else None if kernel is not None: psi_source = "kernel" except _KERNEL_INIT_EXCEPTIONS as exc: if not allow_kernel_fallback: raise RuntimeError( "Kernel initialization failed and allow_kernel_fallback=False." ) from exc kernel = None kernel_error = str(exc) psi_source = "analytic" if verbose: logger.info("--- SCPN FUSION CONTROL ROOM: Grad-Shafranov VDE Simulation ---") if psi_source == "kernel": logger.info("Using kernel-backed Psi source.") elif kernel_error: logger.warning( "Kernel init failed; continuing with analytic Psi source: %s", kernel_error, ) reactor = TokamakPhysicsEngine(seed=seed, kernel=kernel) sensors = DiagnosticSystem(rng=rng) observer = KalmanObserver() ai = NeuralController() history_z: list[float] = [] history_z_measured: list[float] = [] history_top: list[float] = [] history_bot: list[float] = [] frames: list[dict[str, Any]] = [] kernel_coil_update_failures = 0 kernel_coil_update_error: Optional[str] = None kernel_solve_failures = 0 kernel_solve_error: Optional[str] = None top_action = 0.0 bot_action = 0.0 for frame in range(steps): if kernel is not None and hasattr(kernel, "cfg"): try: coils = kernel.cfg.get("coils", []) if len(coils) >= 5: coils[0]["current"] = float(coils[0].get("current", 0.0)) + top_action coils[4]["current"] = float(coils[4].get("current", 0.0)) + bot_action except _KERNEL_COIL_UPDATE_EXCEPTIONS as exc: kernel_coil_update_failures += 1 if kernel_coil_update_error is None: kernel_coil_update_error = str(exc) if not allow_kernel_fallback: raise RuntimeError( "Kernel coil update failed and allow_kernel_fallback=False." ) from exc logger.warning( "Kernel coil update failed; continuing with analytic control actions: %s", kernel_coil_update_error, ) if kernel is not None and hasattr(kernel, "solve_equilibrium"): try: kernel.solve_equilibrium() except _KERNEL_SOLVE_EXCEPTIONS as exc: kernel_solve_failures += 1 if kernel_solve_error is None: kernel_solve_error = str(exc) if not allow_kernel_fallback: raise RuntimeError( "Kernel solve failed and allow_kernel_fallback=False." ) from exc logger.warning( "Kernel equilibrium solve failed; continuing with last known state: %s", kernel_solve_error, ) true_z = reactor.step_dynamics(top_action, bot_action) density, psi = reactor.solve_flux_surfaces() # Sensor measurement with potential dropout raw_measured_z = sensors.measure_position(true_z) dropout = frame % 20 == 0 # Simulate periodic glitch # State estimation filtered_z = observer.update(raw_measured_z, dropout=dropout) # AI uses filtered state top_action, bot_action = ai.compute_action(filtered_z) history_z.append(float(true_z)) history_z_measured.append(float(raw_measured_z)) history_top.append(float(top_action)) history_bot.append(float(bot_action)) frames.append( { "density": np.asarray(density, dtype=np.float64).copy(), "psi": np.asarray(psi, dtype=np.float64).copy(), "r_min": float(np.min(reactor.R)), "r_max": float(np.max(reactor.R)), "z_min": float(np.min(reactor.Z)), "z_max": float(np.max(reactor.Z)), } ) animation_saved = False animation_error: Optional[str] = None report_saved = False report_error: Optional[str] = None if save_animation or save_report: ( animation_saved, animation_error, report_saved, report_error, ) = _render_outputs( frames, history_z, history_top, history_bot, save_animation=save_animation, save_report=save_report, output_gif=output_gif, output_report=output_report, ) z_arr = np.asarray(history_z, dtype=np.float64) top_arr = np.asarray(history_top, dtype=np.float64) bot_arr = np.asarray(history_bot, dtype=np.float64) return { "seed": int(seed), "steps": int(steps), "psi_source": psi_source, "kernel_fallback_allowed": bool(allow_kernel_fallback), "kernel_fallback_used": bool( kernel_error is not None or kernel_coil_update_failures > 0 or kernel_solve_failures > 0 ), "kernel_error": kernel_error, "final_z": float(z_arr[-1]), "mean_abs_z": float(np.mean(np.abs(z_arr))), "max_abs_z": float(np.max(np.abs(z_arr))), "mean_top_action": float(np.mean(top_arr)), "mean_bottom_action": float(np.mean(bot_arr)), "kernel_coil_update_failures": int(kernel_coil_update_failures), "kernel_coil_update_error": kernel_coil_update_error, "kernel_solve_failures": int(kernel_solve_failures), "kernel_solve_error": kernel_solve_error, "animation_saved": bool(animation_saved), "animation_error": animation_error, "report_saved": bool(report_saved), "report_error": report_error, }
if __name__ == "__main__": summary = run_control_room() print( f"Control-room run complete (psi_source={summary['psi_source']}, steps={summary['steps']})." )