# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Neural Equilibrium Accelerator
"""
PCA + MLP surrogate for Grad-Shafranov equilibrium reconstruction.
Maps coil currents (or profile parameters) → PCA coefficients → ψ(R,Z)
with ~1000× speedup over the full Picard iteration.
**Training modes:**
1. **From FusionKernel** — Generate training data by perturbing coil
currents and running the GS solver. Requires a valid config JSON.
2. **From SPARC GEQDSKs** — Train on real equilibrium data from CFS.
Uses the GEQDSK's profile parameters (p', FF', I_p) as input features
and ψ(R,Z) as targets. No coil model needed.
**Status:** Reduced-order surrogate. Not on the critical control path.
Use for rapid design-space exploration and batch equilibrium sweeps.
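
Illustrative usage (a sketch; the GEQDSK locations are placeholders and are
not shipped with this module):

    from pathlib import Path
    acc = NeuralEquilibriumAccelerator()
    paths = sorted(Path("data/sparc").glob("*.geqdsk"))
    result = acc.train_from_geqdsk(paths, n_perturbations=25)
    acc.save_weights()           # -> weights/neural_equilibrium_sparc.npz
    psi = acc.predict(features)  # features: 12-dim vector; see train_from_geqdsk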
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from pathlib import Path
import numpy as np
from numpy.typing import NDArray
logger = logging.getLogger(__name__)
REPO_ROOT = Path(__file__).resolve().parents[3]
DEFAULT_WEIGHTS_PATH = REPO_ROOT / "weights" / "neural_equilibrium_sparc.npz"
# ── Data containers ──────────────────────────────────────────────────
@dataclass
class NeuralEqConfig:
"""Configuration for the neural equilibrium model."""
    n_components: int = 20                        # retained PCA modes
    hidden_sizes: tuple[int, ...] = (128, 64, 32)
    n_input_features: int = 12                    # see train_from_geqdsk for layout
    grid_shape: tuple[int, int] = (129, 129)      # (nh, nw)
    lambda_gs: float = 0.1                        # weight of the GS-residual penalty
@dataclass
class TrainingResult:
"""Training summary."""
n_samples: int
n_components: int
explained_variance: float
final_loss: float
train_time_s: float
weights_path: str
val_loss: float = float("nan")
test_mse: float = float("nan")
test_max_error: float = float("nan")
# ── Simple MLP (pure NumPy) ──────────────────────────────────────────
class SimpleMLP:
"""Feedforward MLP with ReLU hidden layers and linear output."""
def __init__(self, layer_sizes: list[int], seed: int = 42) -> None:
self.rng = np.random.default_rng(seed)
self.weights: list[NDArray] = []
self.biases: list[NDArray] = []
for i in range(len(layer_sizes) - 1):
fan_in = layer_sizes[i]
# He initialisation
scale = np.sqrt(2.0 / fan_in)
self.weights.append(self.rng.normal(0, scale, (layer_sizes[i], layer_sizes[i + 1])))
self.biases.append(np.zeros(layer_sizes[i + 1]))
def forward(self, x: NDArray) -> NDArray:
h = x
for i, (W, b) in enumerate(zip(self.weights, self.biases)):
h = h @ W + b
if i < len(self.weights) - 1:
h = np.maximum(0, h) # ReLU
return h
def predict(self, x: NDArray) -> NDArray:
return self.forward(x)
# ── PCA (minimal, no sklearn dependency) ─────────────────────────────
class MinimalPCA:
    """Minimal PCA via SVD, no sklearn required."""
    def __init__(self, n_components: int = 20) -> None:
        self.n_components = n_components
        self.mean_: NDArray | None = None
        self.components_: NDArray | None = None
        self.explained_variance_ratio_: NDArray | None = None
    def fit(self, X: NDArray) -> "MinimalPCA":
        self.mean_ = X.mean(axis=0)
        X_centered = X - self.mean_
        _, S, Vt = np.linalg.svd(X_centered, full_matrices=False)
        total_var = (S**2).sum()
        self.components_ = Vt[: self.n_components]
        self.explained_variance_ratio_ = S[: self.n_components] ** 2 / max(total_var, 1e-15)
        return self
    def transform(self, X: NDArray) -> NDArray:
        """Project samples onto the retained principal components."""
        if self.mean_ is None or self.components_ is None:
            raise RuntimeError("PCA is not fitted.")
        return (X - self.mean_) @ self.components_.T
    def fit_transform(self, X: NDArray) -> NDArray:
        """Fit on X and return its projection (used by the trainer)."""
        return self.fit(X).transform(X)
    def inverse_transform(self, coeffs: NDArray) -> NDArray:
        """Reconstruct flattened fields from PCA coefficients."""
        if self.mean_ is None or self.components_ is None:
            raise RuntimeError("PCA is not fitted.")
        return coeffs @ self.components_ + self.mean_
# ── Neural Equilibrium Accelerator ───────────────────────────────────
class NeuralEquilibriumAccelerator:
"""
PCA + MLP surrogate for Grad-Shafranov equilibrium.
Can be trained from SPARC GEQDSK files (preferred) or from a
FusionKernel config with coil perturbations.
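
    A minimal construction sketch (the values are illustrative, not tuned):

        cfg = NeuralEqConfig(n_components=30, hidden_sizes=(256, 128))
        acc = NeuralEquilibriumAccelerator(cfg)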
"""
def __init__(self, config: NeuralEqConfig | None = None) -> None:
self.cfg = config or NeuralEqConfig()
self.pca = MinimalPCA(n_components=self.cfg.n_components)
self.mlp: SimpleMLP | None = None
self.is_trained = False
self._input_mean: NDArray | None = None
self._input_std: NDArray | None = None
self._psi_normalized: bool = False
# ── GS residual loss ───────────────────────────────────────────
def _gs_residual_loss(self, psi_pred_flat: NDArray, grid_shape: tuple[int, int]) -> float:
"""GS residual loss: penalizes Laplacian of predicted psi."""
nh, nw = grid_shape
psi = psi_pred_flat.reshape(nh, nw)
lap = np.zeros_like(psi)
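        # Interior 5-point stencil on a unit-spaced grid, i.e. h^2 * laplacian(psi).
        # This is a smoothness proxy: the true Grad-Shafranov operator
        # Delta*psi = psi_RR - psi_R / R + psi_ZZ also carries a 1/R term,
        # which this penalty does not include.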
lap[1:-1, 1:-1] = (
psi[2:, 1:-1] + psi[:-2, 1:-1] + psi[1:-1, 2:] + psi[1:-1, :-2] - 4 * psi[1:-1, 1:-1]
)
return float(np.mean(lap[1:-1, 1:-1] ** 2))
# ── Evaluation ────────────────────────────────────────────────
def evaluate_surrogate(self, X_test: NDArray, Y_test_raw: NDArray) -> dict[str, float]:
"""Evaluate on test set. Returns dict with mse, max_error, gs_residual."""
if not self.is_trained:
raise RuntimeError("Not trained")
if self._input_mean is None or self._input_std is None:
raise RuntimeError("Input normalization statistics are unavailable.")
if self.mlp is None:
raise RuntimeError("MLP weights are unavailable.")
x_norm = (X_test - self._input_mean) / self._input_std
coeffs = self.mlp.predict(x_norm)
psi_pred = self.pca.inverse_transform(coeffs)
mse = float(np.mean((psi_pred - Y_test_raw) ** 2))
max_err = float(np.max(np.abs(psi_pred - Y_test_raw)))
gs_res = 0.0
for row in range(len(psi_pred)):
gs_res += self._gs_residual_loss(psi_pred[row], self.cfg.grid_shape)
gs_res /= max(len(psi_pred), 1)
return {"mse": mse, "max_error": max_err, "gs_residual": gs_res}
# ── Training from SPARC GEQDSKs ─────────────────────────────────
def train_from_geqdsk(
self,
geqdsk_paths: list[Path],
n_perturbations: int = 25,
seed: int = 42,
) -> TrainingResult:
"""
Train on real SPARC GEQDSK equilibria with perturbations.
        For each GEQDSK file, keeps the unperturbed equilibrium and generates
        n_perturbations additional samples by scaling the p'/FF' profiles,
        yielding n_files × (n_perturbations + 1) training pairs.
Input features (12-dim):
[I_p, B_t, R_axis, Z_axis, pprime_scale, ffprime_scale,
simag, sibry, kappa, delta_upper, delta_lower, q95]
Output: flattened ψ(R,Z) → PCA coefficients
Uses 70/15/15 train/val/test split with val-loss early stopping
(patience=20) and combined MSE + lambda_gs * GS residual loss.
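
        Example (a sketch; the GEQDSK paths are placeholders):

            >>> acc = NeuralEquilibriumAccelerator()
            >>> paths = sorted(Path("data/sparc").glob("*.geqdsk"))
            >>> result = acc.train_from_geqdsk(paths)  # doctest: +SKIP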
"""
from scpn_fusion.core.eqdsk import read_geqdsk
rng = np.random.default_rng(seed)
t0 = time.perf_counter()
X_features: list[NDArray] = []
Y_psi: list[NDArray] = []
# Target grid: use the first file's grid as reference
first_eq = read_geqdsk(geqdsk_paths[0])
target_nh, target_nw = first_eq.nh, first_eq.nw
self.cfg.grid_shape = (target_nh, target_nw)
for path in geqdsk_paths:
eq = read_geqdsk(path)
# Interpolate onto target grid if needed
if eq.nh != target_nh or eq.nw != target_nw:
from scipy.interpolate import RectBivariateSpline
spline = RectBivariateSpline(eq.z, eq.r, eq.psirz, kx=3, ky=3)
target_r = np.linspace(eq.rleft, eq.rleft + eq.rdim, target_nw)
target_z = np.linspace(eq.zmid - eq.zdim / 2, eq.zmid + eq.zdim / 2, target_nh)
psi_interp = spline(target_z, target_r, grid=True)
else:
psi_interp = eq.psirz
# Extract shape parameters from boundary if available
kappa = 1.7 # default elongation
delta_upper = 0.3 # default upper triangularity
delta_lower = 0.3 # default lower triangularity
q95 = 3.0 # default safety factor at 95% flux
if hasattr(eq, "rbbbs") and eq.rbbbs is not None and len(eq.rbbbs) > 3:
r_span = eq.rbbbs.max() - eq.rbbbs.min()
kappa = (eq.zbbbs.max() - eq.zbbbs.min()) / max(r_span, 0.01)
if hasattr(eq, "qpsi") and eq.qpsi is not None and len(eq.qpsi) > 0:
idx_95 = int(0.95 * len(eq.qpsi))
q95 = eq.qpsi[min(idx_95, len(eq.qpsi) - 1)]
# Base feature vector (12-dim)
base_features = np.array(
[
eq.current / 1e6, # I_p in MA
eq.bcentr, # B_t in T
eq.rmaxis, # R_axis in m
eq.zmaxis, # Z_axis in m
1.0, # pprime scale factor
1.0, # ffprime scale factor
eq.simag, # psi at axis
eq.sibry, # psi at boundary
kappa, # elongation
delta_upper, # upper triangularity
delta_lower, # lower triangularity
q95, # safety factor at 95% flux
]
)
# Unperturbed sample
X_features.append(base_features)
Y_psi.append(psi_interp.ravel())
# Perturbed samples: scale p'/FF' and blend psi
for _ in range(n_perturbations):
pp_scale = rng.uniform(0.7, 1.3)
ff_scale = rng.uniform(0.7, 1.3)
# Perturbed features (shape params stay at base values)
feat = base_features.copy()
feat[4] = pp_scale
feat[5] = ff_scale
# kappa/delta/q95 at indices 8-11 are inherited from base
# Linearly blend psi with a scale-dependent offset
# This simulates the effect of profile scaling on equilibrium
denom = eq.sibry - eq.simag
if abs(denom) < 1e-12:
denom = 1.0
psi_n = (psi_interp - eq.simag) / denom
# Profile perturbation modifies the normalised psi shape
mix = 0.5 * (pp_scale + ff_scale) - 1.0 # deviation from 1.0
# Perturb interior normalised psi
plasma_mask = (psi_n >= 0) & (psi_n < 1.0)
psi_perturbed = psi_interp.copy()
psi_perturbed[plasma_mask] += mix * 0.1 * denom * (1.0 - psi_n[plasma_mask])
X_features.append(feat)
Y_psi.append(psi_perturbed.ravel())
X = np.array(X_features)
Y = np.array(Y_psi)
n_samples = len(X)
logger.info(
"Training data: %d samples, %d features → %d outputs", n_samples, X.shape[1], Y.shape[1]
)
# Normalise inputs
self._input_mean = X.mean(axis=0)
self._input_std = X.std(axis=0)
self._input_std[self._input_std < 1e-10] = 1.0
X_norm = (X - self._input_mean) / self._input_std
# PCA on flattened psi
Y_compressed = self.pca.fit_transform(Y)
explained = float(np.sum(self.pca.explained_variance_ratio_))
logger.info(
"PCA: %d → %d components, %.2f%% variance retained",
Y.shape[1],
self.cfg.n_components,
explained * 100,
)
# ── Train/val/test split (70/15/15) ────────────────────────
indices = rng.permutation(n_samples)
n_train = int(0.70 * n_samples)
n_val = int(0.15 * n_samples)
train_idx = indices[:n_train]
val_idx = indices[n_train : n_train + n_val]
test_idx = indices[n_train + n_val :]
X_train, Y_train = X_norm[train_idx], Y_compressed[train_idx]
X_val, Y_val = X_norm[val_idx], Y_compressed[val_idx]
# Keep uncompressed targets for GS residual evaluation
Y_test_raw = Y[test_idx]
logger.info(
"Split: %d train / %d val / %d test",
len(train_idx),
len(val_idx),
len(test_idx),
)
# ── Train MLP: X_train → Y_train ─────────────────────────
self.cfg.n_input_features = X.shape[1]
layer_sizes = [
self.cfg.n_input_features,
*self.cfg.hidden_sizes,
self.cfg.n_components,
]
self.mlp = SimpleMLP(layer_sizes, seed=seed)
# Mini-batch SGD with momentum
lr = 1e-3
momentum = 0.9
n_epochs = 500
batch_size = min(32, len(X_train))
velocity = [np.zeros_like(w) for w in self.mlp.weights]
velocity_b = [np.zeros_like(b) for b in self.mlp.biases]
best_val_loss = float("inf")
best_train_loss = float("inf")
patience_counter = 0
patience = 20
for epoch in range(n_epochs):
order = rng.permutation(len(X_train))
epoch_loss = 0.0
for start in range(0, len(X_train), batch_size):
idx = order[start : start + batch_size]
x_batch = X_train[idx]
y_batch = Y_train[idx]
# Forward pass (store activations for backprop)
activations = [x_batch]
h = x_batch
for i, (W, b) in enumerate(zip(self.mlp.weights, self.mlp.biases)):
z = h @ W + b
if i < len(self.mlp.weights) - 1:
h = np.maximum(0, z)
else:
h = z
activations.append(h)
# MSE loss
error = activations[-1] - y_batch
loss = float(np.mean(error**2))
                # GS residual penalty on this batch (monitored term)
                psi_batch = self.pca.inverse_transform(activations[-1])
                gs_loss = 0.0
                for row in range(len(idx)):
                    gs_loss += self._gs_residual_loss(psi_batch[row], self.cfg.grid_shape)
                gs_loss /= len(idx)
                loss = loss + self.cfg.lambda_gs * gs_loss
                epoch_loss += loss * len(idx)
                # Backprop on the MSE term only. The GS penalty is added to the
                # reported loss for monitoring and early stopping, but its
                # gradient is NOT backpropagated; this keeps the pure-NumPy
                # backward pass tractable.
                delta = 2.0 * error / len(idx)
                for i in range(len(self.mlp.weights) - 1, -1, -1):
                    grad_w = activations[i].T @ delta
                    grad_b = delta.sum(axis=0)
                    # Propagate delta through the PRE-update weights so every
                    # layer's gradient is consistent with the same forward pass.
                    if i > 0:
                        delta_prev = delta @ self.mlp.weights[i].T
                        delta_prev *= (activations[i] > 0).astype(float)  # ReLU derivative
                    grad_norm = np.linalg.norm(grad_w)
                    if grad_norm > 1.0:  # clip gradient to unit norm
                        grad_w /= grad_norm
                        grad_b /= grad_norm
                    velocity[i] = momentum * velocity[i] - lr * grad_w
                    velocity_b[i] = momentum * velocity_b[i] - lr * grad_b
                    self.mlp.weights[i] += velocity[i]
                    self.mlp.biases[i] += velocity_b[i]
                    if i > 0:
                        delta = delta_prev
epoch_loss /= max(len(X_train), 1)
if epoch_loss < best_train_loss:
best_train_loss = epoch_loss
# ── Validation loss (MSE + lambda_gs * GS) ────────────
val_pred = self.mlp.forward(X_val)
val_mse = float(np.mean((val_pred - Y_val) ** 2))
            val_gs = 0.0
            psi_val = self.pca.inverse_transform(val_pred)  # one batched reconstruction
            for row in range(len(X_val)):
                val_gs += self._gs_residual_loss(psi_val[row], self.cfg.grid_shape)
            val_gs /= max(len(X_val), 1)
val_loss = val_mse + self.cfg.lambda_gs * val_gs
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= patience:
logger.info(
"Early stopping at epoch %d (val_loss=%.6f)",
epoch,
val_loss,
)
break
if epoch % 50 == 0:
logger.info(
"Epoch %d: train_loss=%.6f val_loss=%.6f",
epoch,
epoch_loss,
val_loss,
)
self.is_trained = True
train_time = time.perf_counter() - t0
# ── Evaluate on held-out test set ─────────────────────────
test_metrics = self.evaluate_surrogate(
X[test_idx],
Y_test_raw,
)
logger.info(
"Test set: MSE=%.6f max_error=%.6f GS_residual=%.6f",
test_metrics["mse"],
test_metrics["max_error"],
test_metrics["gs_residual"],
)
return TrainingResult(
n_samples=n_samples,
n_components=self.cfg.n_components,
explained_variance=explained,
final_loss=best_train_loss,
train_time_s=train_time,
weights_path="",
val_loss=best_val_loss,
test_mse=test_metrics["mse"],
test_max_error=test_metrics["max_error"],
)
# ── Inference ────────────────────────────────────────────────────
def predict(self, features: NDArray) -> NDArray:
"""
Predict ψ(R,Z) from input features.
Parameters
----------
features : NDArray
Shape (n_features,) or (batch, n_features).
Returns
-------
NDArray
Shape (nh, nw) or (batch, nh, nw).
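
        Example (a sketch; the values are SPARC-like placeholders, not a
        measured shot):

            >>> feat = np.array([8.7, 12.2, 1.85, 0.0, 1.0, 1.0,
            ...                  -1.0, 0.0, 1.97, 0.3, 0.3, 3.05])
            >>> psi = acc.predict(feat)           # doctest: +SKIP
            >>> psi.shape == acc.cfg.grid_shape   # doctest: +SKIP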
"""
if not self.is_trained:
raise RuntimeError(
"Model not trained. Call train_from_geqdsk() or load_weights() first."
)
if features.ndim == 1:
features = features[np.newaxis, :]
if self._input_mean is None or self._input_std is None:
raise RuntimeError("Input normalization statistics are unavailable.")
if self.mlp is None:
raise RuntimeError("MLP weights are unavailable.")
x_norm = (features - self._input_mean) / self._input_std
coeffs = self.mlp.predict(x_norm)
psi_flat = self.pca.inverse_transform(coeffs)
# Denormalise if model was trained on normalised psi.
# simag at feature index 6, sibry at index 7.
if self._psi_normalized:
simag = features[:, 6:7]
sibry = features[:, 7:8]
psi_flat = psi_flat * (sibry - simag) + simag
nh, nw = self.cfg.grid_shape
if features.shape[0] == 1:
return psi_flat.reshape(nh, nw)
return psi_flat.reshape(-1, nh, nw)
# ── Save / Load ──────────────────────────────────────────────────
    def save_weights(self, path: str | Path = DEFAULT_WEIGHTS_PATH) -> None:
        """Save model to .npz (no pickle dependency)."""
        if not self.is_trained or self.mlp is None or self.pca.mean_ is None:
            raise RuntimeError("Nothing to save: model is not trained.")
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        payload: dict[str, NDArray] = {
            "n_components": np.array([self.cfg.n_components]),
            "grid_nh": np.array([self.cfg.grid_shape[0]]),
            "grid_nw": np.array([self.cfg.grid_shape[1]]),
            "n_input_features": np.array([self.cfg.n_input_features]),
            "pca_mean": self.pca.mean_,
            "pca_components": self.pca.components_,
            "pca_evr": self.pca.explained_variance_ratio_,
            "input_mean": self._input_mean,
            "input_std": self._input_std,
            "n_layers": np.array([len(self.mlp.weights)]),
            # Persist the psi-normalisation flag that load_weights() expects.
            "psi_normalized": np.array([1 if self._psi_normalized else 0]),
        }
for i, (w, b) in enumerate(zip(self.mlp.weights, self.mlp.biases)):
payload[f"w{i}"] = w
payload[f"b{i}"] = b
np.savez(path, **payload)
logger.info("Saved neural equilibrium weights to %s", path)
def load_weights(self, path: str | Path = DEFAULT_WEIGHTS_PATH) -> None:
"""Load model from .npz."""
path = Path(path)
with np.load(path, allow_pickle=False) as data:
self.cfg.n_components = int(data["n_components"][0])
self.cfg.grid_shape = (int(data["grid_nh"][0]), int(data["grid_nw"][0]))
self.cfg.n_input_features = int(data["n_input_features"][0])
self.pca = MinimalPCA(self.cfg.n_components)
self.pca.mean_ = np.array(data["pca_mean"])
self.pca.components_ = np.array(data["pca_components"])
self.pca.explained_variance_ratio_ = np.array(data["pca_evr"])
self._input_mean = np.array(data["input_mean"])
self._input_std = np.array(data["input_std"])
n_layers = int(data["n_layers"][0])
weights = [np.array(data[f"w{i}"]) for i in range(n_layers)]
biases = [np.array(data[f"b{i}"]) for i in range(n_layers)]
# Reconstruct layer sizes from weight shapes
layer_sizes = [weights[0].shape[0]]
for w in weights:
layer_sizes.append(w.shape[1])
self.mlp = SimpleMLP(layer_sizes)
self.mlp.weights = weights
self.mlp.biases = biases
self._psi_normalized = bool(
"psi_normalized" in data and int(data["psi_normalized"][0]) == 1
)
self.is_trained = True
logger.info(
"Loaded neural equilibrium weights from %s (psi_normalized=%s)",
path,
self._psi_normalized,
)
# ── Convenience ──────────────────────────────────────────────────
def benchmark(self, features: NDArray, n_runs: int = 100) -> dict[str, float]:
"""Time inference over n_runs and return stats."""
times = []
for _ in range(n_runs):
t0 = time.perf_counter()
self.predict(features)
times.append((time.perf_counter() - t0) * 1000)
return {
"mean_ms": float(np.mean(times)),
"std_ms": float(np.std(times)),
"median_ms": float(np.median(times)),
"p95_ms": float(np.percentile(times, 95)),
}
# ── Drop-in Kernel Replacement ──────────────────────────────────────
# Use relative imports only when this module is imported as part of the
# package; a relative import would fail when this file is executed directly
# as a script via `python neural_equilibrium.py`.
if __name__ != "__main__":
from .neural_equilibrium_training import train_on_sparc # noqa: F401
from .neural_equilibrium_kernel import NeuralEquilibriumKernel # noqa: F401
else:
from scpn_fusion.core.neural_equilibrium_training import run_training_cli
raise SystemExit(run_training_cli())