Source code for scpn_fusion.control.fusion_sota_mpc

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Fusion Sota MPC
from __future__ import annotations

import logging
import time
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np

logger = logging.getLogger(__name__)

try:
    from scpn_fusion.core._rust_compat import FusionKernel
except ImportError:
    from scpn_fusion.core.fusion_kernel import FusionKernel

PREDICTION_HORIZON = 10
SHOT_LENGTH = 100


def _normalize_bounds(bounds: Tuple[float, float], name: str) -> Tuple[float, float]:
    lo = float(bounds[0])
    hi = float(bounds[1])
    if not np.isfinite(lo) or not np.isfinite(hi) or lo >= hi:
        raise ValueError(f"{name} must be finite with lower < upper.")
    return lo, hi


[docs] class NeuralSurrogate: """ Linearized surrogate model around current operating point. """ def __init__(self, n_coils: int, n_state: int, verbose: bool = True) -> None: self.verbose = bool(verbose) self.A = np.eye(int(n_state), dtype=np.float64) self.B = np.zeros((int(n_state), int(n_coils)), dtype=np.float64) def _log(self, message: str) -> None: if self.verbose: logger.info(message)
[docs] def train_on_kernel(self, kernel: Any, perturbation: float = 1.0) -> None: self._log("[SOTA] Training Neural Surrogate on Physics Kernel...") kernel.solve_equilibrium() base_state = self.get_state(kernel) p = float(perturbation) if not np.isfinite(p) or p <= 0.0: raise ValueError("perturbation must be finite and > 0.") for i in range(len(kernel.cfg["coils"])): old_i = float(kernel.cfg["coils"][i].get("current", 0.0)) kernel.cfg["coils"][i]["current"] = old_i + p kernel.solve_equilibrium() new_state = self.get_state(kernel) self.B[:, i] = (new_state - base_state) / p kernel.cfg["coils"][i]["current"] = old_i kernel.solve_equilibrium() self._log("[SOTA] Surrogate Training Complete.")
[docs] def get_state(self, kernel: Any) -> np.ndarray: idx_max = int(np.argmax(kernel.Psi)) iz, ir = np.unravel_index(idx_max, kernel.Psi.shape) r_ax = float(kernel.R[ir]) z_ax = float(kernel.Z[iz]) xp_pos, _ = kernel.find_x_point(kernel.Psi) return np.array([r_ax, z_ax, float(xp_pos[0]), float(xp_pos[1])], dtype=np.float64)
[docs] def predict(self, current_state: np.ndarray, action_delta: np.ndarray) -> np.ndarray: return np.asarray(current_state, dtype=np.float64) + ( self.B @ np.asarray(action_delta, dtype=np.float64) )
[docs] class ModelPredictiveController: """ Gradient-based MPC planner over surrogate dynamics. """ def __init__( self, surrogate: NeuralSurrogate, target_state: np.ndarray, *, prediction_horizon: int = PREDICTION_HORIZON, learning_rate: float = 0.5, iterations: int = 20, action_limit: float = 2.0, action_regularization: float = 0.1, ) -> None: self.model = surrogate self.target = np.asarray(target_state, dtype=np.float64).reshape(-1) horizon = int(prediction_horizon) if horizon < 1: raise ValueError("prediction_horizon must be >= 1.") learning_rate = float(learning_rate) if not np.isfinite(learning_rate) or learning_rate <= 0.0: raise ValueError("learning_rate must be finite and > 0.") iterations = int(iterations) if iterations < 1: raise ValueError("iterations must be >= 1.") action_limit = float(action_limit) if not np.isfinite(action_limit) or action_limit <= 0.0: raise ValueError("action_limit must be finite and > 0.") action_regularization = float(action_regularization) if not np.isfinite(action_regularization) or action_regularization < 0.0: raise ValueError("action_regularization must be finite and >= 0.") self.horizon = horizon self.learning_rate = learning_rate self.iterations = iterations self.action_limit = action_limit self.action_regularization = action_regularization
[docs] def plan_trajectory(self, current_state: np.ndarray) -> np.ndarray: n_coils = int(self.model.B.shape[1]) planned_actions = np.zeros((self.horizon, n_coils), dtype=np.float64) state0 = np.asarray(current_state, dtype=np.float64).reshape(-1) for _ in range(self.iterations): temp_state = state0.copy() grads = np.zeros_like(planned_actions) for t in range(self.horizon): next_state = self.model.predict(temp_state, planned_actions[t]) error = next_state - self.target grad_step = self.model.B.T @ error grad_step += self.action_regularization * planned_actions[t] grads[t] = grad_step temp_state = next_state planned_actions -= self.learning_rate * grads planned_actions = np.clip(planned_actions, -self.action_limit, self.action_limit) return np.asarray(planned_actions[0], dtype=np.float64)
def _plot_telemetry( h_r: np.ndarray, h_z: np.ndarray, h_xr: np.ndarray, h_xz: np.ndarray, target_vec: np.ndarray, output_path: str, ) -> Tuple[bool, Optional[str]]: try: fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6)) ax1.set_title("MPC Axis Tracking") ax1.plot(h_r, label="R Axis") ax1.plot(h_z, label="Z Axis") ax1.axhline(target_vec[0], color="blue", linestyle="--", alpha=0.5, label="Target R") ax1.axhline(target_vec[1], color="orange", linestyle="--", alpha=0.5, label="Target Z") ax1.legend() ax1.grid(True) ax2.set_title("MPC Divertor (X-Point) Stabilization") ax2.plot(h_xr, label="X-Point R") ax2.plot(h_xz, label="X-Point Z") ax2.axhline(target_vec[2], color="blue", linestyle="--", alpha=0.5, label="Target XR") ax2.axhline(target_vec[3], color="orange", linestyle="--", alpha=0.5, label="Target XZ") ax2.legend() ax2.grid(True) plt.tight_layout() plt.savefig(output_path) plt.close(fig) return True, None except Exception as exc: return False, str(exc)
[docs] def run_sota_simulation( config_file: Optional[str] = None, shot_length: int = SHOT_LENGTH, prediction_horizon: int = PREDICTION_HORIZON, target_vector: Optional[np.ndarray] = None, disturbance_start_step: int = 20, disturbance_per_step_ma: float = 0.1, current_target_bounds: Tuple[float, float] = (5.0, 16.0), action_limit: float = 2.0, coil_current_limits: Tuple[float, float] = (-40.0, 40.0), save_plot: bool = True, output_path: str = "SOTA_MPC_Results.png", verbose: bool = True, kernel_factory: Callable[[str], Any] = FusionKernel, ) -> Dict[str, Any]: if config_file is None: repo_root = Path(__file__).resolve().parents[3] config_file = str(repo_root / "iter_config.json") lo_ip, hi_ip = _normalize_bounds(current_target_bounds, "current_target_bounds") lo_i, hi_i = _normalize_bounds(coil_current_limits, "coil_current_limits") steps = int(shot_length) if steps < 1: raise ValueError("shot_length must be >= 1.") drift_start = int(disturbance_start_step) if drift_start < 0: raise ValueError("disturbance_start_step must be >= 0.") drift_per = float(disturbance_per_step_ma) if target_vector is None: target_vec = np.array([6.0, 0.0, 5.0, -3.5], dtype=np.float64) else: target_vec = np.asarray(target_vector, dtype=np.float64).reshape(4) if verbose: logger.info("--- SCPN FUSION SOTA: Neural-MPC Hybrid Control ---") kernel = kernel_factory(str(config_file)) surrogate = NeuralSurrogate( n_coils=len(kernel.cfg["coils"]), n_state=4, verbose=verbose, ) surrogate.train_on_kernel(kernel) mpc = ModelPredictiveController( surrogate, target_vec, prediction_horizon=prediction_horizon, action_limit=action_limit, ) h_r: list[float] = [] h_z: list[float] = [] h_xr: list[float] = [] h_xz: list[float] = [] h_error: list[float] = [] h_action: list[float] = [] h_coil_abs: list[float] = [] physics_cfg = kernel.cfg.setdefault("physics", {}) target_ip_ma = float(np.clip(physics_cfg.get("plasma_current_target", lo_ip), lo_ip, hi_ip)) physics_cfg["plasma_current_target"] = target_ip_ma if verbose: logger.info("Starting %d step simulation with MPC Horizon=%d...", steps, prediction_horizon) start_time = time.time() for t in range(steps): curr_state = surrogate.get_state(kernel) best_action = mpc.plan_trajectory(curr_state) h_action.append(float(np.max(np.abs(best_action))) if best_action.size else 0.0) for i, delta in enumerate(best_action): old_i = float(kernel.cfg["coils"][i].get("current", 0.0)) kernel.cfg["coils"][i]["current"] = float(np.clip(old_i + float(delta), lo_i, hi_i)) if t >= drift_start: target_ip_ma = float(np.clip(target_ip_ma + drift_per, lo_ip, hi_ip)) physics_cfg["plasma_current_target"] = target_ip_ma kernel.solve_equilibrium() h_coil_abs.append( float( np.max( np.abs( np.asarray( [float(c.get("current", 0.0)) for c in kernel.cfg["coils"]], dtype=np.float64, ) ) ) ) ) h_r.append(float(curr_state[0])) h_z.append(float(curr_state[1])) h_xr.append(float(curr_state[2])) h_xz.append(float(curr_state[3])) err = float(np.linalg.norm(curr_state - target_vec)) h_error.append(err) if verbose and t % 10 == 0: logger.info( "Step %d: R=%.2f, Z=%.2f | X-Point=(%.2f,%.2f) | Err=%.3f", t, curr_state[0], curr_state[1], curr_state[2], curr_state[3], err, ) runtime_s = float(time.time() - start_time) if verbose: logger.info("Simulation finished in %.2fs", runtime_s) plot_saved = False plot_error: Optional[str] = None if save_plot: plot_saved, plot_error = _plot_telemetry( np.asarray(h_r, dtype=np.float64), np.asarray(h_z, dtype=np.float64), np.asarray(h_xr, dtype=np.float64), np.asarray(h_xz, dtype=np.float64), target_vec, output_path, ) if verbose and plot_saved: logger.info("SOTA Analysis saved: %s", output_path) return { "config_path": str(config_file), "steps": int(steps), "prediction_horizon": int(prediction_horizon), "runtime_seconds": runtime_s, "final_target_ip_ma": float(target_ip_ma), "final_r_axis": float(h_r[-1]) if h_r else 0.0, "final_z_axis": float(h_z[-1]) if h_z else 0.0, "final_xpoint_r": float(h_xr[-1]) if h_xr else 0.0, "final_xpoint_z": float(h_xz[-1]) if h_xz else 0.0, "mean_tracking_error": ( float(np.mean(np.asarray(h_error, dtype=np.float64))) if h_error else 0.0 ), "max_abs_action": ( float(np.max(np.asarray(h_action, dtype=np.float64))) if h_action else 0.0 ), "max_abs_coil_current": ( float(np.max(np.asarray(h_coil_abs, dtype=np.float64))) if h_coil_abs else 0.0 ), "plot_saved": bool(plot_saved), "plot_error": plot_error, }
if __name__ == "__main__": run_sota_simulation()