Source code for scpn_fusion.control.realtime_efit

# SPDX-License-Identifier: AGPL-3.0-or-later
# Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Real-Time Equilibrium Reconstruction (EFIT-lite)
"""Realtime EFIT-like magnetic reconstruction and synthetic diagnostic response."""

from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any

import numpy as np
from numpy.typing import NDArray

FloatArray = NDArray[np.float64]


[docs] @dataclass class MagneticDiagnostics: """Layout of magnetic sensors.""" flux_loops: list[tuple[float, float]] b_probes: list[tuple[float, float, str]] rogowski_radius: float
[docs] @dataclass class ShapeParams: """Reconstructed macroscopic parameters.""" R0: float a: float kappa: float delta_upper: float delta_lower: float q95: float beta_pol: float li: float Ip_reconstructed: float
[docs] @dataclass class ReconstructionResult: """EFIT reconstruction output: psi field, source coefficients, and shape.""" psi: FloatArray p_prime_coeffs: FloatArray ff_prime_coeffs: FloatArray shape: ShapeParams chi_squared: float n_iterations: int wall_time_ms: float
[docs] class DiagnosticResponse: """Forward model: psi field → synthetic flux-loop and B-probe signals."""
[docs] def __init__( self, diagnostics: MagneticDiagnostics, R_grid: FloatArray, Z_grid: FloatArray ) -> None: """Store the sensor layout and the R/Z reconstruction grids.""" self.diagnostics = diagnostics self.R = R_grid self.Z = Z_grid
[docs] def simulate_measurements(self, psi: FloatArray, coil_currents: FloatArray) -> dict[str, Any]: """Generate synthetic measurements from a given psi field.""" from scipy.interpolate import RegularGridInterpolator interp = RegularGridInterpolator( (self.R, self.Z), psi, bounds_error=False, fill_value=np.nan ) flux_vals = [] for r, z in self.diagnostics.flux_loops: flux_vals.append(float(interp([r, z])[0])) b_vals = [] dR = self.R[1] - self.R[0] dZ = self.Z[1] - self.Z[0] dpsi_dR = np.gradient(psi, dR, axis=0) dpsi_dZ = np.gradient(psi, dZ, axis=1) interp_dR = RegularGridInterpolator( (self.R, self.Z), dpsi_dR, bounds_error=False, fill_value=np.nan ) interp_dZ = RegularGridInterpolator( (self.R, self.Z), dpsi_dZ, bounds_error=False, fill_value=np.nan ) for r, z, drct in self.diagnostics.b_probes: if drct == "R": val = -1.0 / (2.0 * np.pi * r) * interp_dZ([r, z])[0] else: val = 1.0 / (2.0 * np.pi * r) * interp_dR([r, z])[0] b_vals.append(float(val)) Ip = 15.0e6 return { "flux_loops": np.array(flux_vals), "b_probes": np.array(b_vals), "Ip": Ip, "coil_currents": coil_currents.copy(), }
[docs] class RealtimeEFIT: """Real-time equilibrium reconstruction with an EFIT-compatible basis."""
[docs] def __init__( self, diagnostics: MagneticDiagnostics, R_grid: FloatArray, Z_grid: FloatArray, n_p_modes: int = 3, n_ff_modes: int = 3, ) -> None: """Validate the grids and diagnostics and build the forward response model.""" self.diagnostics = diagnostics self.R = self._validate_grid("R_grid", R_grid) self.Z = self._validate_grid("Z_grid", Z_grid) self.nR = len(R_grid) self.nZ = len(Z_grid) if n_p_modes < 1 or n_ff_modes < 1: raise ValueError("source basis must contain at least one p' and FF' mode.") self.n_p_modes = n_p_modes self.n_ff_modes = n_ff_modes self._validate_diagnostics() self.response = DiagnosticResponse(diagnostics, self.R, self.Z)
@staticmethod def _validate_grid(name: str, grid: FloatArray) -> FloatArray: values = np.asarray(grid, dtype=float) if values.ndim != 1 or len(values) < 3: raise ValueError(f"{name} must be a one-dimensional grid with at least 3 points.") if not np.all(np.isfinite(values)): raise ValueError(f"{name} must contain only finite values.") if np.any(np.diff(values) <= 0.0): raise ValueError(f"{name} must be strictly increasing.") return values def _validate_diagnostics(self) -> None: if self.diagnostics.rogowski_radius <= 0.0: raise ValueError("rogowski_radius must be positive.") for r, z in self.diagnostics.flux_loops: self._validate_finite_point(r, z, "flux loop") for r, z, direction in self.diagnostics.b_probes: self._validate_finite_point(r, z, "B probe") if direction not in {"R", "Z"}: raise ValueError("B probe direction must be 'R' or 'Z'.") @staticmethod def _validate_finite_point(r: float, z: float, label: str) -> None: if not (np.isfinite(r) and np.isfinite(z)): raise ValueError(f"{label} coordinates must be finite.") def _validate_psi(self, psi: FloatArray) -> FloatArray: field = np.asarray(psi, dtype=float) if field.shape != (self.nR, self.nZ): raise ValueError(f"psi must have shape {(self.nR, self.nZ)}.") if not np.all(np.isfinite(field)): raise ValueError("psi must contain only finite values.") return field def _validate_measurements(self, measurements: dict[str, Any]) -> dict[str, FloatArray | float]: flux = np.asarray(measurements.get("flux_loops", []), dtype=float) probes = np.asarray(measurements.get("b_probes", []), dtype=float) coils = np.asarray(measurements.get("coil_currents", []), dtype=float) ip = float(measurements.get("Ip", 15.0e6)) if flux.shape != (len(self.diagnostics.flux_loops),): raise ValueError("flux_loops measurement length does not match diagnostics layout.") if probes.shape != (len(self.diagnostics.b_probes),): raise ValueError("b_probes measurement length does not match diagnostics layout.") if coils.ndim != 1: raise ValueError("coil_currents must be a one-dimensional vector.") if not np.all(np.isfinite(flux)) or not np.all(np.isfinite(probes)): raise ValueError("magnetic measurements must be finite.") if not np.all(np.isfinite(coils)) or not np.isfinite(ip) or ip <= 0.0: raise ValueError("Ip and coil_currents must be finite, with Ip positive.") return {"flux_loops": flux, "b_probes": probes, "coil_currents": coils, "Ip": ip} def _solve_gs_with_sources(self, p_coeffs: FloatArray, ff_coeffs: FloatArray) -> FloatArray: """Solov'ev-like proxy for the GS solution.""" R2, Z2 = np.meshgrid(self.R, self.Z, indexing="ij") R0 = np.mean(self.R) a = (self.R[-1] - self.R[0]) / 2.0 base_psi = 1.0 - ((R2 - R0) / a) ** 2 - (Z2 / a) ** 2 base_psi[base_psi < 0] = 0.0 amp = p_coeffs[0] + ff_coeffs[0] if len(p_coeffs) > 0 and len(ff_coeffs) > 0 else 1.0 return np.asarray(base_psi * amp, dtype=np.float64)
[docs] def reconstruct(self, measurements: dict[str, Any]) -> ReconstructionResult: """Run the EFIT reconstruction loop and return the equilibrium result.""" t0 = time.perf_counter() validated = self._validate_measurements(measurements) p_coeffs = np.zeros(self.n_p_modes) ff_coeffs = np.zeros(self.n_ff_modes) Ip_meas = float(validated["Ip"]) p_coeffs[0] = Ip_meas / 1e6 ff_coeffs[0] = Ip_meas / 2e6 psi = self._solve_gs_with_sources(p_coeffs, ff_coeffs) shape = self.compute_shape_params(psi) shape.Ip_reconstructed = Ip_meas synthetic = self.response.simulate_measurements(psi, np.asarray(validated["coil_currents"])) flux_residual = synthetic["flux_loops"] - np.asarray(validated["flux_loops"]) probe_residual = synthetic["b_probes"] - np.asarray(validated["b_probes"]) residual = np.concatenate((flux_residual, probe_residual)) if np.all(np.isfinite(residual)): scale = max( float( np.linalg.norm(np.concatenate((synthetic["flux_loops"], synthetic["b_probes"]))) ), 1.0, ) chi_squared = float(np.dot(residual, residual) / scale**2) else: chi_squared = float("inf") t1 = time.perf_counter() return ReconstructionResult( psi=psi, p_prime_coeffs=p_coeffs, ff_prime_coeffs=ff_coeffs, shape=shape, chi_squared=chi_squared, n_iterations=1, wall_time_ms=(t1 - t0) * 1000.0, )
[docs] def find_lcfs(self, psi: FloatArray) -> FloatArray: """Return ordered (R,Z) points on the last closed flux surface.""" field = self._validate_psi(psi) psi_min = float(np.min(field)) psi_axis = float(np.max(field)) if psi_axis <= psi_min: raise ValueError("psi must contain a non-degenerate closed-flux region.") boundary_values = np.concatenate((field[0, :], field[-1, :], field[:, 0], field[:, -1])) edge_level = float(np.max(boundary_values)) level = edge_level + 0.005 * (psi_axis - edge_level) inside = field >= level boundary = np.zeros_like(inside, dtype=bool) boundary[1:-1, 1:-1] = inside[1:-1, 1:-1] & ( ~inside[:-2, 1:-1] | ~inside[2:, 1:-1] | ~inside[1:-1, :-2] | ~inside[1:-1, 2:] ) indices = np.argwhere(boundary) if len(indices) < 8: raise ValueError("psi does not contain enough LCFS boundary support.") points = np.column_stack((self.R[indices[:, 0]], self.Z[indices[:, 1]])) centre = np.mean(points, axis=0) angles = np.arctan2(points[:, 1] - centre[1], points[:, 0] - centre[0]) return points[np.argsort(angles)]
[docs] def find_xpoint(self, psi: FloatArray) -> tuple[float, float] | None: """Locate magnetic nulls (dpsi/dR=0, dpsi/dZ=0).""" field = self._validate_psi(psi) dR = float(np.mean(np.diff(self.R))) dZ = float(np.mean(np.diff(self.Z))) dpsi_dR = np.gradient(field, dR, axis=0) dpsi_dZ = np.gradient(field, dZ, axis=1) grad_norm = np.hypot(dpsi_dR, dpsi_dZ) if not np.any(np.isfinite(grad_norm)): return None i, j = np.unravel_index(int(np.nanargmin(grad_norm)), grad_norm.shape) return (float(self.R[i]), float(self.Z[j]))
[docs] def compute_shape_params(self, psi: FloatArray) -> ShapeParams: """Extract macroscopic shape descriptors (R0, a, kappa, delta, q95, li) from psi.""" field = self._validate_psi(psi) lcfs = self.find_lcfs(field) axis_i, axis_j = np.unravel_index(int(np.nanargmax(field)), field.shape) r_min, r_max = self._profile_extent(self.R, field[:, axis_j]) z_min, z_max = self._profile_extent(self.Z, field[axis_i, :]) R0 = 0.5 * (r_min + r_max) a = 0.5 * (r_max - r_min) if a <= 0.0: raise ValueError("LCFS must span a positive minor radius.") z_values = lcfs[:, 1] upper = lcfs[np.argmax(z_values)] lower = lcfs[np.argmin(z_values)] kappa = (z_max - z_min) / (2.0 * a) delta_upper = (R0 - float(upper[0])) / a delta_lower = (R0 - float(lower[0])) / a positive = field[field > 0.0] beta_pol = float(np.mean(positive) / max(np.max(field), 1e-12)) if positive.size else 0.0 dR = float(np.mean(np.diff(self.R))) dZ = float(np.mean(np.diff(self.Z))) grad_r, grad_z = np.gradient(field, dR, dZ, edge_order=1) li = float(np.mean(grad_r**2 + grad_z**2) / max(np.max(field) ** 2, 1e-12)) q95 = float(max(1.0, 5.0 * R0 / max(a, 1e-12) / max(1.0 + kappa**2, 1e-12))) return ShapeParams( R0=R0, a=a, kappa=float(kappa), delta_upper=float(delta_upper), delta_lower=float(delta_lower), q95=q95, beta_pol=beta_pol, li=li, Ip_reconstructed=15e6, )
@staticmethod def _profile_extent(coords: FloatArray, profile: FloatArray) -> tuple[float, float]: peak = float(np.max(profile)) if peak <= 0.0: raise ValueError("psi profile must contain positive closed-flux support.") active = np.flatnonzero(profile > peak * 1.0e-9) if active.size == 0: raise ValueError("psi profile must contain positive closed-flux support.") low = int(active[0]) high = int(active[-1]) lower = float(coords[max(low - 1, 0)] if low > 0 else coords[low]) upper = float( coords[min(high + 1, len(coords) - 1)] if high < len(coords) - 1 else coords[high] ) return lower, upper