Source code for scpn_fusion.control.analytic_solver

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Analytic Solver
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any, Callable, Dict, Optional

import numpy as np
from scpn_fusion.fallback_telemetry import record_fallback_event

logger = logging.getLogger(__name__)

try:
    from scpn_fusion.core._rust_compat import FusionKernel
except ImportError:
    try:
        from scpn_fusion.core.fusion_kernel import FusionKernel
    except ImportError as exc:  # pragma: no cover - import-guard path
        raise ImportError(
            "Unable to import FusionKernel. Run with PYTHONPATH=src "
            "or use `python -m scpn_fusion.control.analytic_solver`."
        ) from exc


[docs] class AnalyticEquilibriumSolver: """ Analytic vertical-field target and least-norm coil-current solve. """ def __init__( self, config_path: str, *, kernel_factory: Callable[[str], Any] = FusionKernel, verbose: bool = True, ) -> None: self.kernel = kernel_factory(str(config_path)) self.config_path = str(config_path) self.verbose = bool(verbose) def _log(self, message: str) -> None: if self.verbose: logger.info(message)
[docs] def calculate_required_Bv( self, R_geo: float, a_min: float, Ip_MA: float, *, beta_p: float = 0.5, li: float = 0.8, ) -> float: """ Shafranov radial-force balance vertical field estimate. """ R_geo = float(R_geo) a_min = float(a_min) Ip_MA = float(Ip_MA) beta_p = float(beta_p) li = float(li) if R_geo <= 0.0 or a_min <= 0.0 or Ip_MA <= 0.0: raise ValueError("R_geo, a_min and Ip_MA must be > 0.") mu0 = 4.0 * np.pi * 1e-7 Ip = Ip_MA * 1e6 term_log = np.log(8.0 * R_geo / a_min) term_physics = beta_p + (li / 2.0) - 1.5 Bv = -((mu0 * Ip) / (4.0 * np.pi * R_geo)) * (term_log + term_physics) self._log("--- SHAFRANOV EQUILIBRIUM CHECK ---") self._log(f"Target Radius: {R_geo:.3f} m") self._log(f"Plasma Current: {Ip_MA:.3f} MA") self._log(f"Required Vertical Field (Bv): {Bv:.6f} Tesla") return float(Bv)
[docs] def compute_coil_efficiencies( self, target_R: float, *, target_Z: float = 0.0, ) -> np.ndarray: """ Compute dBz/dI per coil at target location using kernel vacuum-field map. """ coils = self.kernel.cfg.get("coils", []) n_coils = len(coils) if n_coils == 0: raise ValueError("Kernel config has no coils.") target_R = float(target_R) target_Z = float(target_Z) if target_R <= 0.0: raise ValueError("target_R must be > 0.") original_currents = [float(c.get("current", 0.0)) for c in coils] eff = np.zeros(n_coils, dtype=np.float64) idx_r = int(np.argmin(np.abs(np.asarray(self.kernel.R, dtype=np.float64) - target_R))) idx_z = int(np.argmin(np.abs(np.asarray(self.kernel.Z, dtype=np.float64) - target_Z))) idx_r = int(np.clip(idx_r, 1, len(self.kernel.R) - 2)) dR = float(getattr(self.kernel, "dR", float(self.kernel.R[1] - self.kernel.R[0]))) if dR <= 0.0: raise ValueError("Kernel grid spacing dR must be > 0.") self._log("\nCalculating Coil Influence Matrix (Green's Functions)...") try: for i in range(n_coils): for c in coils: c["current"] = 0.0 coils[i]["current"] = 1.0 psi_vac = np.asarray(self.kernel.calculate_vacuum_field(), dtype=np.float64) dpsi = (psi_vac[idx_z, idx_r + 1] - psi_vac[idx_z, idx_r - 1]) / (2.0 * dR) bz_unit = float((1.0 / target_R) * dpsi) eff[i] = bz_unit name = str(coils[i].get("name", f"coil_{i}")) self._log(f" Coil {name} Efficiency: {bz_unit:.6f} T/MA") finally: for c, current in zip(coils, original_currents): c["current"] = float(current) return eff
[docs] def solve_coil_currents( self, target_Bv: float, target_R: float, *, target_Z: float = 0.0, ridge_lambda: float = 0.0, ) -> np.ndarray: """ Solve least-norm coil currents for desired vertical field target. """ eff = self.compute_coil_efficiencies(target_R, target_Z=target_Z) target_Bv = float(target_Bv) ridge_lambda = max(float(ridge_lambda), 0.0) g = eff.reshape(1, -1) if ridge_lambda > 0.0: gg = float(np.dot(eff, eff)) denom = max(gg + ridge_lambda, 1e-12) currents = (eff * target_Bv) / denom else: currents = np.linalg.pinv(g).dot(np.array([target_Bv], dtype=np.float64)).reshape(-1) self._log("\n--- ANALYTIC SOLUTION (Least Norm) ---") for i, val in enumerate(currents): name = str(self.kernel.cfg["coils"][i].get("name", f"coil_{i}")) self._log(f" {name}: {float(val):.6f} MA") return np.asarray(currents, dtype=np.float64)
[docs] def apply_currents(self, currents: np.ndarray) -> None: arr = np.asarray(currents, dtype=np.float64).reshape(-1) coils = self.kernel.cfg.get("coils", []) if arr.size != len(coils): raise ValueError("Current vector length mismatch with kernel coils.") for i, val in enumerate(arr): coils[i]["current"] = float(val)
[docs] def apply_and_save( self, currents: np.ndarray, output_path: Optional[str] = None, ) -> str: self.apply_currents(currents) if output_path is None: repo_root = Path(__file__).resolve().parents[3] out_path = repo_root / "validation" / "iter_analytic_config.json" else: out_path = Path(output_path) out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", encoding="utf-8") as f: json.dump(self.kernel.cfg, f, indent=4) self._log(f"Saved analytic configuration: {out_path}") return str(out_path)
def _resolve_default_config_path( repo_root: Path, *, allow_validation_fallback: bool = True, ) -> tuple[str, str, bool]: """Resolve the default analytic solver config with explicit fallback policy.""" preferred = repo_root / "calibration" / "iter_genetic_temp.json" fallback = repo_root / "validation" / "iter_validated_config.json" if preferred.exists(): return str(preferred), "preferred_default", False if fallback.exists(): if not allow_validation_fallback: raise FileNotFoundError( "Preferred default config is missing and validation fallback is disabled: " f"{preferred}" ) record_fallback_event( "analytic_solver", "default_config_validation_fallback", context={ "preferred_config": str(preferred), "fallback_config": str(fallback), }, ) logger.warning( "Preferred analytic config missing; using validation fallback: %s", fallback, ) return str(fallback), "validation_fallback_default", True raise FileNotFoundError( f"No default analytic config found. Checked:\n- {preferred}\n- {fallback}" )
[docs] def run_analytic_solver( config_path: Optional[str] = None, *, target_r: float = 6.2, target_z: float = 0.0, a_minor: float = 2.0, ip_target_ma: float = 15.0, beta_p: float = 0.5, li: float = 0.8, ridge_lambda: float = 0.0, save_config: bool = True, output_config_path: Optional[str] = None, allow_validation_fallback: bool = True, verbose: bool = True, kernel_factory: Callable[[str], Any] = FusionKernel, ) -> Dict[str, Any]: """ Run analytic equilibrium solve and return deterministic summary. """ repo_root = Path(__file__).resolve().parents[3] config_source = "explicit" fallback_used = False if config_path is None: config_path, config_source, fallback_used = _resolve_default_config_path( repo_root, allow_validation_fallback=allow_validation_fallback, ) solver = AnalyticEquilibriumSolver( str(config_path), kernel_factory=kernel_factory, verbose=verbose, ) required_bv = solver.calculate_required_Bv( target_r, a_minor, ip_target_ma, beta_p=beta_p, li=li, ) currents = solver.solve_coil_currents( required_bv, target_r, target_Z=target_z, ridge_lambda=ridge_lambda, ) written_path: Optional[str] = None if save_config: written_path = solver.apply_and_save(currents, output_path=output_config_path) else: solver.apply_currents(currents) names = [str(c.get("name", f"coil_{i}")) for i, c in enumerate(solver.kernel.cfg["coils"])] summary_currents = {name: float(currents[i]) for i, name in enumerate(names)} return { "config_path": str(config_path), "config_source": str(config_source), "fallback_used": bool(fallback_used), "output_config_path": written_path, "target_r_m": float(target_r), "target_z_m": float(target_z), "a_minor_m": float(a_minor), "ip_target_ma": float(ip_target_ma), "required_bv_t": float(required_bv), "coil_currents_ma": summary_currents, "coil_current_l2_norm": float(np.linalg.norm(currents)), "max_abs_coil_current_ma": float(np.max(np.abs(currents))) if currents.size else 0.0, }
if __name__ == "__main__": run_analytic_solver()