# SPDX-License-Identifier: AGPL-3.0-or-later
# Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Fusion Kernel
"""
Non-linear free-boundary Grad-Shafranov equilibrium solver.
Solves the Grad-Shafranov equation for toroidal plasma equilibrium using
Picard iteration with under-relaxation. Supports both L-mode (linear)
and H-mode (mtanh pedestal) pressure/current profiles.
The solver can optionally offload the inner elliptic solve to a compiled
C++ library via :class:`~scpn_fusion.hpc.hpc_bridge.HPCBridge`, or to
the Rust multigrid backend when available.
"""
from __future__ import annotations
from typing import Any
import logging
from dataclasses import dataclass, field
from pathlib import Path
import numpy as np
from numpy.typing import NDArray
from scipy.special import ellipe, ellipk
from scpn_fusion.core.config_schema import validate_config
from scpn_fusion.core.fusion_kernel_free_boundary_mixin import FusionKernelFreeBoundaryMixin
from scpn_fusion.core.fusion_kernel_iterative_solver import FusionKernelIterativeSolverMixin
from scpn_fusion.core.fusion_kernel_newton_solver import FusionKernelNewtonSolverMixin
from scpn_fusion.core.fusion_kernel_numerics import (
NUMERIC_SANITIZE_CAP as _NUMERIC_SANITIZE_CAP,
FloatArray,
sanitize_numeric_array as _sanitize_numeric_array_impl,
stable_rms as _stable_rms_impl,
)
from scpn_fusion.hpc.hpc_bridge import HPCBridge
from scpn_fusion.io.safe_loaders import checked_json_load
logger = logging.getLogger(__name__)
MAX_CONFIG_BYTES = 10 * 1024 * 1024
def _sanitize_numeric_array(arr: FloatArray, *, cap: float = _NUMERIC_SANITIZE_CAP) -> FloatArray:
"""Return finite array with values clipped to ``[-cap, cap]``."""
return _sanitize_numeric_array_impl(arr, cap=cap)
def _stable_rms(arr: FloatArray) -> float:
"""Compute RMS without overflow from squaring very large magnitudes."""
return _stable_rms_impl(arr)
[docs]
@dataclass
class CoilSet:
"""External coil set for free-boundary solve.
Attributes
----------
positions : list of (R, Z) tuples
Coil centre coordinates [m].
currents : NDArray
Current per coil [A].
turns : list of int
Number of turns per coil.
current_limits : NDArray or None
Per-coil maximum absolute current [A]. Shape ``(n_coils,)``.
When set, ``optimize_coil_currents`` enforces these bounds.
target_flux_points : NDArray or None
Points ``(R, Z)`` on the desired separatrix for shape optimisation.
Shape ``(n_pts, 2)``.
target_flux_values : NDArray or None
Optional target flux values [Wb] at ``target_flux_points``.
Shape ``(n_pts,)``. When omitted, ``solve_free_boundary`` uses an
isoflux target inferred from current interpolation.
x_point_target : NDArray or None
Target X-point position ``(R, Z)`` [m]. Shape ``(2,)``.
x_point_flux_target : float or None
Target flux value [Wb] at the X-point.
divertor_strike_points : NDArray or None
Divertor strike-point positions ``(R, Z)`` [m]. Shape ``(n, 2)``.
divertor_flux_values : NDArray or None
Target flux values [Wb] at divertor strike points. Shape ``(n,)``.
"""
positions: list[tuple[float, float]] = field(default_factory=list)
currents: NDArray[np.float64] = field(default_factory=lambda: np.array([]))
turns: list[int] = field(default_factory=list)
current_limits: NDArray[np.float64] | None = None
target_flux_points: NDArray[np.float64] | None = None
target_flux_values: NDArray[np.float64] | None = None
x_point_target: NDArray[np.float64] | None = None
x_point_flux_target: float | None = None
divertor_strike_points: NDArray[np.float64] | None = None
divertor_flux_values: NDArray[np.float64] | None = None
[docs]
class FusionKernel(
FusionKernelFreeBoundaryMixin,
FusionKernelNewtonSolverMixin,
FusionKernelIterativeSolverMixin,
):
"""Non-linear free-boundary Grad-Shafranov equilibrium solver.
Parameters
----------
config_path : str | Path
Path to a JSON configuration file describing the reactor geometry,
coil set, physics parameters and solver settings.
Attributes
----------
Psi : FloatArray
Poloidal flux on the (Z, R) grid.
J_phi : FloatArray
Toroidal current density on the (Z, R) grid.
B_R, B_Z : FloatArray
Radial and vertical magnetic field components (set after solve).
"""
# ── construction ──────────────────────────────────────────────────
def __init__(self, config_path: str | Path) -> None:
self._config_path = str(config_path)
self.load_config(config_path)
self.initialize_grid()
self.setup_accelerator()
[docs]
def load_config(self, path: str | Path) -> None:
"""Load reactor configuration from a JSON file.
Parameters
----------
path : str | Path
Filesystem path to the configuration JSON.
"""
config_path = Path(path)
size = config_path.stat().st_size
if size > MAX_CONFIG_BYTES:
raise ValueError(
f"configuration file exceeds {MAX_CONFIG_BYTES} byte limit: {config_path}"
)
raw_cfg = checked_json_load(config_path, max_bytes=MAX_CONFIG_BYTES)
# Hardening: Strict schema validation at the entry point
validated_cfg = validate_config(raw_cfg)
self.cfg = validated_cfg.model_dump()
logger.info("Loaded configuration for: %s", self.cfg["reactor_name"])
[docs]
def initialize_grid(self) -> None:
"""Build the computational (R, Z) mesh from the loaded config."""
dims = self.cfg["dimensions"]
res = self.cfg["grid_resolution"]
self.NR: int = res[0]
self.NZ: int = res[1]
self.R: FloatArray = np.linspace(dims["R_min"], dims["R_max"], self.NR)
self.Z: FloatArray = np.linspace(dims["Z_min"], dims["Z_max"], self.NZ)
self.dR: float = float(self.R[1] - self.R[0])
self.dZ: float = float(self.Z[1] - self.Z[0])
self.RR: FloatArray
self.ZZ: FloatArray
self.RR, self.ZZ = np.meshgrid(self.R, self.Z)
self.Psi: FloatArray = np.zeros((self.NZ, self.NR))
self.J_phi: FloatArray = np.zeros((self.NZ, self.NR))
self.p_prime_0: float = -1.0
self.ff_prime_0: float = -1.0
# Profile mode
self.profile_mode: str = "l-mode"
self.ped_params_p: dict[str, float] = {
"ped_top": 0.92,
"ped_width": 0.05,
"ped_height": 1.0,
"core_alpha": 0.3,
}
self.ped_params_ff: dict[str, float] = {
"ped_top": 0.92,
"ped_width": 0.05,
"ped_height": 1.0,
"core_alpha": 0.3,
}
profiles_cfg = self.cfg.get("physics", {}).get("profiles")
if profiles_cfg:
self.profile_mode = profiles_cfg.get("mode", "l-mode")
if "p_prime" in profiles_cfg:
self.ped_params_p.update(profiles_cfg["p_prime"])
if "ff_prime" in profiles_cfg:
self.ped_params_ff.update(profiles_cfg["ff_prime"])
[docs]
def setup_accelerator(self) -> None:
"""Initialise the optional C++ HPC acceleration bridge."""
self.hpc = HPCBridge()
if self.hpc.is_available():
logger.info("HPC Acceleration ENABLED.")
self.hpc.initialize(
self.NR,
self.NZ,
(self.R[0], self.R[-1]),
(self.Z[0], self.Z[-1]),
)
else:
logger.info("HPC Acceleration UNAVAILABLE (using Python compatibility backend).")
# ── vacuum field ──────────────────────────────────────────────────
[docs]
def calculate_vacuum_field(self) -> FloatArray:
"""Compute the vacuum poloidal flux from the external coil set.
Uses elliptic integrals (toroidal Green's function) for each coil.
Returns
-------
FloatArray
Vacuum flux Psi_vac on the (NZ, NR) grid.
"""
logger.debug("Computing vacuum field (toroidal exact)…")
Psi_vac = np.zeros((self.NZ, self.NR))
mu0: float = self.cfg["physics"].get("vacuum_permeability", 1.0)
for coil in self.cfg["coils"]:
Rc, Zc = coil["r"], coil["z"]
current = coil["current"]
dZ = self.ZZ - Zc
R_plus_Rc_sq = (self.RR + Rc) ** 2
k2 = (4.0 * self.RR * Rc) / (R_plus_Rc_sq + dZ**2)
k2 = np.clip(k2, 1e-12, 1.0 - 1e-12)
K = ellipk(k2)
E = ellipe(k2)
prefactor = (mu0 * current) / (2.0 * np.pi)
sqrt_term = np.sqrt(self.RR * Rc)
k = np.sqrt(k2)
term = ((2.0 - k2) * K - 2.0 * E) / k
Psi_vac += prefactor * sqrt_term * term
return Psi_vac
# ── topology analysis ─────────────────────────────────────────────
[docs]
def find_x_point(self, Psi: FloatArray) -> tuple[tuple[float, float], float]:
"""Locate the X-point (magnetic null) in the lower divertor region.
Parameters
----------
Psi : FloatArray
Poloidal flux array on the (NZ, NR) grid.
Returns
-------
tuple[tuple[float, float], float]
``((R_x, Z_x), Psi_x)`` — position and flux value at the
X-point.
"""
psi_raw = np.asarray(Psi, dtype=np.float64)
finite_mask = np.isfinite(psi_raw)
if not np.any(finite_mask):
return (0.0, 0.0), 0.0
psi_safe = np.nan_to_num(psi_raw, nan=0.0, posinf=1e300, neginf=-1e300)
# Psi is indexed (Z, R): axis-0 = Z, axis-1 = R
dPsi_dZ, dPsi_dR = np.gradient(psi_safe, self.dZ, self.dR)
B_mag = np.hypot(dPsi_dR, dPsi_dZ)
mask_divertor = (self.cfg["dimensions"]["Z_min"] * 0.5) > self.ZZ
if np.any(mask_divertor):
masked_B = np.where(mask_divertor, B_mag, np.inf)
finite_count = int(np.isfinite(masked_B).sum())
if finite_count > 0:
use_saddle_detection = bool(
self.cfg.get("solver", {}).get("xpoint_use_saddle_detection", False)
)
if not use_saddle_detection:
idx_min = int(np.argmin(masked_B))
iz, ir = np.unravel_index(idx_min, Psi.shape)
psi_x = float(psi_raw[iz, ir])
if not np.isfinite(psi_x):
psi_x = float(psi_safe[iz, ir])
return (float(self.R[ir]), float(self.Z[iz])), psi_x
# Prefer true magnetic saddles over pure |grad(Psi)| minima.
n_candidates = min(16, finite_count)
flat = masked_B.ravel()
candidate_idx = np.argpartition(flat, n_candidates - 1)[:n_candidates]
saddle_hits: list[tuple[float, int, int]] = []
nz, nr = Psi.shape
for idx in candidate_idx:
iz, ir = np.unravel_index(int(idx), Psi.shape)
if iz <= 0 or iz >= nz - 1 or ir <= 0 or ir >= nr - 1:
continue
d2R = (psi_safe[iz, ir + 1] - 2.0 * psi_safe[iz, ir] + psi_safe[iz, ir - 1]) / (
self.dR**2
)
d2Z = (psi_safe[iz + 1, ir] - 2.0 * psi_safe[iz, ir] + psi_safe[iz - 1, ir]) / (
self.dZ**2
)
dRZ = (
psi_safe[iz + 1, ir + 1]
- psi_safe[iz + 1, ir - 1]
- psi_safe[iz - 1, ir + 1]
+ psi_safe[iz - 1, ir - 1]
) / (4.0 * self.dR * self.dZ)
det_hessian = float(d2R * d2Z - dRZ * dRZ)
if np.isfinite(det_hessian) and det_hessian < 0.0:
saddle_hits.append((float(masked_B[iz, ir]), int(iz), int(ir)))
if saddle_hits:
_, iz_best, ir_best = min(saddle_hits, key=lambda x: x[0])
psi_x = float(psi_raw[iz_best, ir_best])
if not np.isfinite(psi_x):
psi_x = float(psi_safe[iz_best, ir_best])
return (
(float(self.R[ir_best]), float(self.Z[iz_best])),
psi_x,
)
# Fallback to minimum-gradient candidate when saddle test fails.
idx_min = int(np.argmin(masked_B))
iz, ir = np.unravel_index(idx_min, Psi.shape)
psi_x = float(psi_raw[iz, ir])
if not np.isfinite(psi_x):
psi_x = float(psi_safe[iz, ir])
return (float(self.R[ir]), float(self.Z[iz])), psi_x
psi_min = float(np.min(psi_raw[finite_mask]))
return (0.0, 0.0), psi_min
def _find_magnetic_axis(self) -> tuple[int, int, float]:
"""Find the O-point (magnetic axis) as the global Psi maximum.
Returns
-------
tuple[int, int, float]
``(iz, ir, Psi_axis)`` — grid indices and flux value.
"""
idx_max = int(np.argmax(self.Psi))
iz, ir = np.unravel_index(idx_max, self.Psi.shape)
psi_axis = float(self.Psi[iz, ir])
if abs(psi_axis) < 1e-6:
psi_axis = 1e-6
return int(iz), int(ir), psi_axis
# ── profile functions ─────────────────────────────────────────────
[docs]
@staticmethod
def mtanh_profile(psi_norm: FloatArray, params: dict[str, float]) -> FloatArray:
"""Evaluate a modified-tanh pedestal profile (vectorised).
Parameters
----------
psi_norm : FloatArray
Normalised poloidal flux (0 at axis, 1 at separatrix).
params : dict[str, float]
Profile shape parameters with keys ``ped_top``, ``ped_width``,
``ped_height``, ``core_alpha``.
Returns
-------
FloatArray
Profile value; zero outside the plasma region.
"""
result = np.zeros_like(psi_norm)
mask = (psi_norm >= 0) & (psi_norm < 1.0)
x = psi_norm[mask]
y = np.clip((params["ped_top"] - x) / params["ped_width"], -20, 20)
pedestal = 0.5 * params["ped_height"] * (1.0 + np.tanh(y))
core = np.where(
x < params["ped_top"],
np.maximum(0.0, 1.0 - (x / params["ped_top"]) ** 2),
0.0,
)
result[mask] = pedestal + params["core_alpha"] * core
return result
# ── source term ───────────────────────────────────────────────────
[docs]
def update_plasma_source_nonlinear(self, Psi_axis: float, Psi_boundary: float) -> FloatArray:
"""Compute the toroidal current density J_phi from the GS source.
Uses ``J_phi = R p'(psi) + FF'(psi) / (mu0 R)`` with either
L-mode (linear) or H-mode (mtanh) profiles, then renormalises to
match the target plasma current.
Parameters
----------
Psi_axis : float
Poloidal flux at the magnetic axis (O-point).
Psi_boundary : float
Poloidal flux at the separatrix (X-point or limiter).
Returns
-------
FloatArray
Updated ``J_phi`` on the (NZ, NR) grid.
"""
mu0: float = self.cfg["physics"]["vacuum_permeability"]
denom = Psi_boundary - Psi_axis
if abs(denom) < 1e-9:
denom = 1e-9
Psi_norm = (self.Psi - Psi_axis) / denom
mask_plasma = (Psi_norm >= 0) & (Psi_norm < 1.0)
if self.profile_mode in ("h-mode", "H-mode", "hmode"):
p_profile = self.mtanh_profile(Psi_norm, self.ped_params_p)
ff_profile = self.mtanh_profile(Psi_norm, self.ped_params_ff)
else:
p_profile = np.zeros_like(self.Psi)
p_profile[mask_plasma] = 1.0 - Psi_norm[mask_plasma]
ff_profile = p_profile.copy()
J_p = self.RR * p_profile
J_f = (1.0 / (mu0 * self.RR)) * ff_profile
beta_mix = 0.5
J_raw = beta_mix * J_p + (1 - beta_mix) * J_f
I_current = float(np.sum(J_raw)) * self.dR * self.dZ
I_target: float = self.cfg["physics"]["plasma_current_target"]
if abs(I_current) > 1e-9:
self.J_phi = J_raw * (I_target / I_current)
else:
self.J_phi = np.zeros_like(self.Psi)
return self.J_phi
# Iterative and Newton solver internals moved to dedicated mixins.
# ── post-processing ───────────────────────────────────────────────
[docs]
def compute_b_field(self) -> None:
"""Derive the magnetic field components from the solved Psi."""
# Psi is indexed (Z, R): axis-0 = Z, axis-1 = R
dPsi_dZ, dPsi_dR = np.gradient(self.Psi, self.dZ, self.dR)
R_safe = np.maximum(self.RR, 1e-6)
self.B_R = -(1.0 / R_safe) * dPsi_dZ
self.B_Z = (1.0 / R_safe) * dPsi_dR
[docs]
def save_results(self, filename: str = "equilibrium_nonlinear.npz") -> None:
"""Save the equilibrium state to a compressed NumPy archive.
Parameters
----------
filename : str
Output file path.
"""
np.savez(filename, R=self.R, Z=self.Z, Psi=self.Psi, J_phi=self.J_phi)
logger.info("Saved: %s", filename)
# ── phase synchronisation ─────────────────────────────────────────
[docs]
def phase_sync_step(
self,
theta: NDArray[np.float64],
omega: NDArray[np.float64],
dt: float,
psi_driver: float | None = None,
**kwargs: Any,
) -> dict[str, Any]:
"""Evolve oscillator phases using the Kuramoto-Sakaguchi engine."""
from scpn_fusion.phase.kuramoto import kuramoto_sakaguchi_step
# Merge configuration defaults with kwargs.
phase_cfg = self.cfg.get("phase_sync", {})
K = kwargs.pop("K", phase_cfg.get("K", 1.0))
zeta = kwargs.pop("zeta", phase_cfg.get("zeta", 0.0))
psi_mode = kwargs.pop("psi_mode", phase_cfg.get("psi_mode", "external"))
return kuramoto_sakaguchi_step(
theta=theta,
omega=omega,
dt=dt,
K=K,
zeta=zeta,
psi_driver=psi_driver,
psi_mode=psi_mode,
**kwargs,
)
[docs]
def phase_sync_step_lyapunov(
self,
theta: NDArray[np.float64],
omega: NDArray[np.float64],
n_steps: int,
dt: float,
psi_driver: float | None = None,
**kwargs: Any,
) -> dict[str, Any]:
"""Run a batch of phase sync steps and return Lyapunov stability diagnostics."""
from scpn_fusion.phase.kuramoto import lyapunov_exponent, lyapunov_v
th = theta.copy()
R_hist = []
V_hist = []
for _ in range(n_steps):
out = self.phase_sync_step(th, omega, dt=dt, psi_driver=psi_driver, **kwargs)
th = out["theta1"]
R_hist.append(out["R"])
V_hist.append(lyapunov_v(th, out["Psi"]))
lam = lyapunov_exponent(V_hist, dt)
return {
"theta_final": th,
"R_hist": np.array(R_hist),
"V_hist": np.array(V_hist),
"lambda": lam,
"stable": bool(lam < 0.0),
}
# ── FRC bridge ────────────────────────────────────────────────────
[docs]
def initialize_from_frc(self, frc_state: Any, kappa: float | None = None) -> None:
"""Interpolate a 1D FRC equilibrium onto the 2D Grad-Shafranov grid.
Parameters
----------
frc_state : FRCEquilibriumState
The radial equilibrium state solved by the rigid-rotor engine.
kappa : float | None
Separatrix elongation (Z/R). Falls back to config target if omitted.
"""
rho = np.asarray(frc_state.rho, dtype=float)
psi = np.asarray(frc_state.psi, dtype=float)
j_theta = np.asarray(frc_state.J_theta, dtype=float)
if rho.ndim != 1 or psi.shape != rho.shape or j_theta.shape != rho.shape:
raise ValueError("frc_state rho, psi, and J_theta must be same-length 1D arrays")
if rho.size < 2 or not np.all(np.isfinite(rho)):
raise ValueError("frc_state.rho must contain at least two finite samples")
if not np.all(np.diff(rho) > 0.0):
raise ValueError("frc_state.rho must be strictly increasing")
if not np.all(np.isfinite(psi)) or not np.all(np.isfinite(j_theta)):
raise ValueError("frc_state psi and J_theta must be finite")
a = float(frc_state.target_separatrix_radius_m)
if not np.isfinite(a) or a <= 0.0:
raise ValueError("frc_state.target_separatrix_radius_m must be finite and positive")
if kappa is None:
kappa = self.cfg.get("target", {}).get("kappa") or 2.0
kappa = float(kappa)
if not np.isfinite(kappa) or kappa <= 0.0:
raise ValueError("kappa must be finite and positive")
# Center the FRC at (R_center, 0.0)
# We use the midpoint of the computational R domain.
r_center = float(0.5 * (self.R[0] + self.R[-1]))
# Relative coordinates
dR_2d = self.RR - r_center
dZ_2d = self.ZZ
# Axial Gaussian fall-off
# Characteristic length z_scale = a * kappa
z_scale = max(a * kappa, 1e-6)
axial_envelope = np.exp(-((dZ_2d / z_scale) ** 2))
# Radial interpolation
# frc_state.rho is distance from O-point (rho=0).
r_dist = np.abs(dR_2d)
psi_radial = np.interp(r_dist, rho, psi, left=float(psi[0]), right=float(psi[-1]))
j_radial = np.interp(r_dist, rho, j_theta, left=float(j_theta[0]), right=0.0)
j_radial = np.where(r_dist >= rho[-1], 0.0, j_radial)
self.Psi = psi_radial * axial_envelope
self.J_phi = j_radial * axial_envelope
# Match target Ip to the interpolated integral
I_total = float(np.sum(self.J_phi)) * self.dR * self.dZ
self.cfg["physics"]["plasma_current_target"] = I_total
target_cfg = self.cfg.setdefault("target", {})
target_cfg["kappa"] = kappa
target_cfg["frc_major_radius_m"] = r_center
target_cfg["frc_separatrix_radius_m"] = a
target_cfg["frc_radial_support_m"] = float(rho[-1])
logger.info(
"Initialized 2D grid from 1D FRC (Ip=%.3f MA, kappa=%.2f)", I_total / 1e6, kappa
)
if __name__ == "__main__":
import sys
logging.basicConfig(level=logging.INFO, format="%(name)s %(message)s")
config_file = sys.argv[1] if len(sys.argv) > 1 else "iter_config.json"
fk = FusionKernel(config_file)
fk.solve_equilibrium()
fk.save_results("final_state_nonlinear.npz")