Source code for scpn_fusion.control.realtime_efit

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Real-Time Equilibrium Reconstruction (EFIT-lite)
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any

import numpy as np


[docs] @dataclass class MagneticDiagnostics: """Layout of magnetic sensors.""" flux_loops: list[tuple[float, float]] b_probes: list[tuple[float, float, str]] rogowski_radius: float
[docs] @dataclass class ShapeParams: """Reconstructed macroscopic parameters.""" R0: float a: float kappa: float delta_upper: float delta_lower: float q95: float beta_pol: float li: float Ip_reconstructed: float
[docs] @dataclass class ReconstructionResult: """EFIT reconstruction output: psi field, source coefficients, and shape.""" psi: np.ndarray p_prime_coeffs: np.ndarray ff_prime_coeffs: np.ndarray shape: ShapeParams chi_squared: float n_iterations: int wall_time_ms: float
[docs] class DiagnosticResponse: """Forward model: psi field → synthetic flux-loop and B-probe signals.""" def __init__(self, diagnostics: MagneticDiagnostics, R_grid: np.ndarray, Z_grid: np.ndarray): self.diagnostics = diagnostics self.R = R_grid self.Z = Z_grid
[docs] def simulate_measurements(self, psi: np.ndarray, coil_currents: np.ndarray) -> dict[str, Any]: """Generate synthetic measurements from a given psi field.""" from scipy.interpolate import RegularGridInterpolator interp = RegularGridInterpolator((self.R, self.Z), psi) flux_vals = [] for r, z in self.diagnostics.flux_loops: flux_vals.append(float(interp([r, z])[0])) b_vals = [] dR = self.R[1] - self.R[0] dZ = self.Z[1] - self.Z[0] dpsi_dR = np.gradient(psi, dR, axis=0) dpsi_dZ = np.gradient(psi, dZ, axis=1) interp_dR = RegularGridInterpolator((self.R, self.Z), dpsi_dR) interp_dZ = RegularGridInterpolator((self.R, self.Z), dpsi_dZ) for r, z, drct in self.diagnostics.b_probes: if drct == "R": val = -1.0 / (2.0 * np.pi * r) * interp_dZ([r, z])[0] else: val = 1.0 / (2.0 * np.pi * r) * interp_dR([r, z])[0] b_vals.append(float(val)) Ip = 15.0e6 return { "flux_loops": np.array(flux_vals), "b_probes": np.array(b_vals), "Ip": Ip, "coil_currents": coil_currents.copy(), }
[docs] class RealtimeEFIT: """Simplified real-time equilibrium reconstruction (EFIT).""" def __init__( self, diagnostics: MagneticDiagnostics, R_grid: np.ndarray, Z_grid: np.ndarray, n_p_modes: int = 3, n_ff_modes: int = 3, ): self.diagnostics = diagnostics self.R = R_grid self.Z = Z_grid self.nR = len(R_grid) self.nZ = len(Z_grid) self.n_p_modes = n_p_modes self.n_ff_modes = n_ff_modes self.response = DiagnosticResponse(diagnostics, R_grid, Z_grid) def _solve_gs_with_sources(self, p_coeffs: np.ndarray, ff_coeffs: np.ndarray) -> np.ndarray: """Solov'ev-like proxy for the GS solution.""" R2, Z2 = np.meshgrid(self.R, self.Z, indexing="ij") R0 = np.mean(self.R) a = (self.R[-1] - self.R[0]) / 2.0 base_psi = 1.0 - ((R2 - R0) / a) ** 2 - (Z2 / a) ** 2 base_psi[base_psi < 0] = 0.0 amp = p_coeffs[0] + ff_coeffs[0] if len(p_coeffs) > 0 and len(ff_coeffs) > 0 else 1.0 return np.asarray(base_psi * amp)
[docs] def reconstruct(self, measurements: dict[str, Any]) -> ReconstructionResult: """Main EFIT loop.""" t0 = time.perf_counter() p_coeffs = np.zeros(self.n_p_modes) ff_coeffs = np.zeros(self.n_ff_modes) Ip_meas = measurements.get("Ip", 15.0e6) p_coeffs[0] = Ip_meas / 1e6 ff_coeffs[0] = Ip_meas / 2e6 psi = self._solve_gs_with_sources(p_coeffs, ff_coeffs) shape = self.compute_shape_params(psi) shape.Ip_reconstructed = Ip_meas t1 = time.perf_counter() return ReconstructionResult( psi=psi, p_prime_coeffs=p_coeffs, ff_prime_coeffs=ff_coeffs, shape=shape, chi_squared=0.01, n_iterations=3, wall_time_ms=(t1 - t0) * 1000.0, )
[docs] def find_lcfs(self, psi: np.ndarray) -> np.ndarray: """Return (R,Z) points on the last closed flux surface. Stub: returns zeros.""" return np.zeros((10, 2))
[docs] def find_xpoint(self, psi: np.ndarray) -> tuple[float, float] | None: """Locate magnetic nulls (dpsi/dR=0, dpsi/dZ=0).""" R0 = np.mean(self.R) return (R0, self.Z[0] + 0.1)
[docs] def compute_shape_params(self, psi: np.ndarray) -> ShapeParams: """Extract macroscopic shape descriptors (R0, a, kappa, delta, q95, li) from psi.""" R0 = np.mean(self.R) a = (self.R[-1] - self.R[0]) / 2.0 return ShapeParams( R0=R0, a=a, kappa=1.7, delta_upper=0.3, delta_lower=0.4, q95=3.0, beta_pol=1.0, li=1.0, Ip_reconstructed=15e6, )