Source code for scpn_fusion.control.fusion_optimal_control

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Fusion Optimal Control
from __future__ import annotations

import logging
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np

logger = logging.getLogger(__name__)

try:
    from scpn_fusion.core._rust_compat import FusionKernel
except ImportError:
    from scpn_fusion.core.fusion_kernel import FusionKernel

TARGET_R = 6.0
TARGET_Z = 0.0
SHOT_STEPS = 50


def _normalize_bounds(bounds: Tuple[float, float], name: str) -> Tuple[float, float]:
    lo = float(bounds[0])
    hi = float(bounds[1])
    if not np.isfinite(lo) or not np.isfinite(hi) or lo >= hi:
        raise ValueError(f"{name} must be finite with lower < upper.")
    return lo, hi


[docs] class OptimalController: """ MIMO controller using response-matrix inversion with bounded actuation. """ def __init__( self, config_file: str, *, kernel_factory: Callable[[str], Any] = FusionKernel, verbose: bool = True, correction_limit: float = 5.0, coil_current_limits: Tuple[float, float] = (-40.0, 40.0), current_target_limits: Tuple[float, float] = (5.0, 16.0), ) -> None: self.kernel = kernel_factory(config_file) self.verbose = bool(verbose) self.n_coils = len(self.kernel.cfg["coils"]) self.coil_names = [str(c["name"]) for c in self.kernel.cfg["coils"]] self.response_matrix = np.zeros((2, self.n_coils), dtype=np.float64) correction_limit = float(correction_limit) if not np.isfinite(correction_limit) or correction_limit <= 0.0: raise ValueError("correction_limit must be finite and > 0.") self.correction_limit = correction_limit self.coil_current_limits = _normalize_bounds(coil_current_limits, "coil_current_limits") self.current_target_limits = _normalize_bounds( current_target_limits, "current_target_limits" ) self.history: Dict[str, list[float]] = { "t": [], "R_axis": [], "Z_axis": [], "Ip": [], "error_norm": [], "max_abs_delta_i": [], "max_abs_coil_current": [], } def _log(self, message: str) -> None: if self.verbose: logger.info(message)
[docs] def identify_system(self, perturbation: float = 0.5) -> None: """ Perturb each coil and measure plasma-axis response to build Jacobian. """ self._log("[OptControl] Identifying System Response Matrix...") self.kernel.solve_equilibrium() base_r, base_z = self.get_plasma_pos() self._log(f" Base Position: R={base_r:.3f}, Z={base_z:.3f}") p = float(perturbation) if not np.isfinite(p) or p <= 0.0: raise ValueError("perturbation must be finite and > 0.") for i in range(self.n_coils): orig_i = float(self.kernel.cfg["coils"][i].get("current", 0.0)) self.kernel.cfg["coils"][i]["current"] = orig_i + p self.kernel.solve_equilibrium() pos_plus = self.get_plasma_pos() self.kernel.cfg["coils"][i]["current"] = orig_i - p self.kernel.solve_equilibrium() pos_minus = self.get_plasma_pos() self.kernel.cfg["coils"][i]["current"] = orig_i self.kernel.solve_equilibrium() d_r = float((pos_plus[0] - pos_minus[0]) / (2.0 * p)) d_z = float((pos_plus[1] - pos_minus[1]) / (2.0 * p)) self.response_matrix[0, i] = d_r self.response_matrix[1, i] = d_z self._log(f" Coil {self.coil_names[i]}: dR/dI={d_r:.4f}, dZ/dI={d_z:.4f}") self._log("[OptControl] System Identification Complete.")
[docs] def get_shafranov_shift(self) -> float: """ Calculates the Shafranov Shift (Delta R) heuristic. Delta R ~ (a^2 / 2R) * (beta_p + li/2) """ dims = self.kernel.cfg.get("dimensions") if dims is None: # Fallback: infer from grid arrays when dimensions not in config r_min = float(self.kernel.R[0]) r_max = float(self.kernel.R[-1]) else: r_min = dims["R_min"] r_max = dims["R_max"] a = (r_max - r_min) / 2.0 R0 = (r_max + r_min) / 2.0 if R0 <= 0.0: return 0.0 beta_p = self.kernel.cfg["physics"].get("beta_p", 0.5) li = 0.8 # Internal inductance proxy shift = (a**2 / (2.0 * R0)) * (beta_p + li / 2.0) return float(shift)
[docs] def get_plasma_pos(self) -> np.ndarray: """ Return current magnetic-axis position [R, Z]. Harden with Shafranov Shift correction for high-beta states. """ idx_max = int(np.argmax(self.kernel.Psi)) iz, ir = np.unravel_index(idx_max, self.kernel.Psi.shape) r_geo = self.kernel.R[ir] z_geo = self.kernel.Z[iz] # Apply shift if beta_p is significant delta_r = self.get_shafranov_shift() return np.array([r_geo + delta_r, z_geo], dtype=np.float64)
[docs] def compute_optimal_correction( self, current_pos: np.ndarray, target_pos: np.ndarray, regularization_lambda: float = 0.05, *, regularization_limit: float | None = None, ) -> np.ndarray: """ Solve Error = J * Delta_I using Tikhonov-regularized (damped) SVD. Provides smoother control than hard-cutoff SVD near singularities. """ # Accept regularization_limit as alias for regularization_lambda if regularization_limit is not None: regularization_lambda = regularization_limit cur = np.asarray(current_pos, dtype=np.float64).reshape(2) tgt = np.asarray(target_pos, dtype=np.float64).reshape(2) error = tgt - cur u, s, vt = np.linalg.svd(self.response_matrix, full_matrices=False) lam = float(regularization_lambda) if not np.isfinite(lam) or lam < 0.0: raise ValueError("regularization_limit must be finite and >= 0.") # Tikhonov Damping: s_inv = s / (s^2 + lambda^2) s_inv = s / (s**2 + lam**2) j_inv = vt.T @ np.diag(s_inv) @ u.T delta_currents = np.asarray(j_inv @ error, dtype=np.float64) return np.clip(delta_currents, -self.correction_limit, self.correction_limit)
def _apply_corrections(self, delta_currents: np.ndarray, gain: float) -> None: lo, hi = self.coil_current_limits g = float(gain) for i in range(self.n_coils): old = float(self.kernel.cfg["coils"][i].get("current", 0.0)) upd = old + g * float(delta_currents[i]) self.kernel.cfg["coils"][i]["current"] = float(np.clip(upd, lo, hi))
[docs] def run_optimal_shot( self, shot_steps: int = SHOT_STEPS, target_r: float = TARGET_R, target_z: float = TARGET_Z, gain: float = 0.8, ip_start_ma: float = 10.0, ip_span_ma: float = 5.0, identify_first: bool = False, save_plot: bool = True, output_path: str = "Optimal_Control_Result.png", ) -> Dict[str, Any]: self._log("\n--- INITIATING OPTIMAL CONTROL SHOT ---") if identify_first: self.identify_system() steps = int(shot_steps) if steps < 1: raise ValueError("shot_steps must be >= 1.") target_vec = np.array([float(target_r), float(target_z)], dtype=np.float64) self.history = {k: [] for k in self.history} self.kernel.solve_equilibrium() lo_ip, hi_ip = self.current_target_limits for t in range(steps): frac = float(t) / float(max(steps, 1)) target_ip = float(np.clip(ip_start_ma + ip_span_ma * frac, lo_ip, hi_ip)) self.kernel.cfg.setdefault("physics", {})["plasma_current_target"] = target_ip curr_pos = self.get_plasma_pos() d_i = self.compute_optimal_correction(curr_pos, target_vec) self._apply_corrections(d_i, gain=float(gain)) self.kernel.solve_equilibrium() err = float(np.linalg.norm(target_vec - curr_pos)) max_abs_delta_i = float(np.max(np.abs(d_i))) if d_i.size > 0 else 0.0 max_abs_coil_current = float( np.max( np.abs( np.asarray( [ float(c.get("current", 0.0)) for c in self.kernel.cfg.get("coils", []) ], dtype=np.float64, ) ) ) ) self.history["t"].append(float(t)) self.history["R_axis"].append(float(curr_pos[0])) self.history["Z_axis"].append(float(curr_pos[1])) self.history["Ip"].append(target_ip) self.history["error_norm"].append(err) self.history["max_abs_delta_i"].append(max_abs_delta_i) self.history["max_abs_coil_current"].append(max_abs_coil_current) self._log( f"Step {t}: R={curr_pos[0]:.3f} (Tgt {target_r}), " f"Z={curr_pos[1]:.3f} (Tgt {target_z}) | Err={err:.4f} | " f"Max dI={max_abs_delta_i:.2f}" ) plot_saved = False plot_error: Optional[str] = None if save_plot: plot_saved, plot_error = self.plot_telemetry(output_path=output_path) r_arr = np.asarray(self.history["R_axis"], dtype=np.float64) z_arr = np.asarray(self.history["Z_axis"], dtype=np.float64) e_arr = np.asarray(self.history["error_norm"], dtype=np.float64) di_arr = np.asarray(self.history["max_abs_delta_i"], dtype=np.float64) coil_arr = np.asarray(self.history["max_abs_coil_current"], dtype=np.float64) return { "steps": int(steps), "final_target_ip_ma": float(self.history["Ip"][-1]) if self.history["Ip"] else 0.0, "final_axis_r": float(r_arr[-1]) if r_arr.size else 0.0, "final_axis_z": float(z_arr[-1]) if z_arr.size else 0.0, "mean_abs_r_error": ( float(np.mean(np.abs(r_arr - float(target_r)))) if r_arr.size else 0.0 ), "mean_abs_z_error": ( float(np.mean(np.abs(z_arr - float(target_z)))) if z_arr.size else 0.0 ), "mean_error_norm": float(np.mean(e_arr)) if e_arr.size else 0.0, "max_abs_delta_i": float(np.max(di_arr)) if di_arr.size else 0.0, "max_abs_coil_current": float(np.max(coil_arr)) if coil_arr.size else 0.0, "plot_saved": bool(plot_saved), "plot_error": plot_error, }
[docs] def plot_telemetry( self, output_path: str = "Optimal_Control_Result.png", ) -> Tuple[bool, Optional[str]]: try: fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5)) ax1.set_title("Optimal Position Control (SVD-MIMO)") ax1.plot(self.history["t"], self.history["R_axis"], "b-o", label="R Axis") ax1.plot(self.history["t"], self.history["Z_axis"], "r-s", label="Z Axis") ax1.axhline(TARGET_R, color="b", linestyle="--", alpha=0.5) ax1.axhline(TARGET_Z, color="r", linestyle="--", alpha=0.5) ax1.legend() ax1.grid(True) ax2.set_title("Final Plasma State") if hasattr(self.kernel, "RR") and hasattr(self.kernel, "ZZ"): ax2.contour(self.kernel.RR, self.kernel.ZZ, self.kernel.Psi, levels=20, colors="k") if hasattr(self.kernel, "J_phi"): ax2.imshow( self.kernel.J_phi, extent=[1, 9, -5, 5], origin="lower", cmap="hot", alpha=0.5, ) for c in self.kernel.cfg.get("coils", []): r = float(c.get("r", 0.0)) z = float(c.get("z", 0.0)) cur = float(c.get("current", 0.0)) ax2.plot(r, z, "rx" if cur > 0 else "bx", markersize=8) plt.tight_layout() plt.savefig(output_path) plt.close(fig) self._log(f"Analysis saved: {output_path}") return True, None except Exception as exc: return False, str(exc)
[docs] def run_optimal_control( config_file: Optional[str] = None, shot_steps: int = SHOT_STEPS, target_r: float = TARGET_R, target_z: float = TARGET_Z, seed: int = 42, save_plot: bool = True, output_path: str = "Optimal_Control_Result.png", verbose: bool = True, kernel_factory: Callable[[str], Any] = FusionKernel, coil_current_limits: Tuple[float, float] = (-40.0, 40.0), current_target_limits: Tuple[float, float] = (5.0, 16.0), ) -> Dict[str, Any]: """ Run bounded optimal-control shot and return deterministic summary. """ seed_int = int(seed) if config_file is None: repo_root = Path(__file__).resolve().parents[3] config_file = str(repo_root / "iter_config.json") pilot = OptimalController( str(config_file), kernel_factory=kernel_factory, verbose=verbose, coil_current_limits=coil_current_limits, current_target_limits=current_target_limits, ) pilot.identify_system() summary = pilot.run_optimal_shot( shot_steps=shot_steps, target_r=target_r, target_z=target_z, save_plot=save_plot, output_path=output_path, ) summary["seed"] = seed_int summary["config_path"] = str(config_file) return summary
if __name__ == "__main__": run_optimal_control()