Source code for scpn_fusion.hpc.hpc_bridge

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — HPC Bridge
from __future__ import annotations

import ctypes
import logging
import math
import numpy as np
import os
import platform
import subprocess
from pathlib import Path
from typing import Optional

from numpy.typing import NDArray

logger = logging.getLogger(__name__)
_CPP_BUILD_TIMEOUT_SECONDS = 300.0


try:
    from scpn_fusion.core.neural_equilibrium import NeuralEquilibriumKernel
except ImportError:
    NeuralEquilibriumKernel = None


def _as_contiguous_f64(array: NDArray[np.floating]) -> NDArray[np.float64]:
    """Return ``array`` as C-contiguous ``float64`` with minimal copying."""
    if isinstance(array, np.ndarray) and array.dtype == np.float64 and array.flags.c_contiguous:
        return array
    return np.ascontiguousarray(array, dtype=np.float64)


def _require_c_contiguous_f64(
    array: NDArray[np.floating],
    expected_shape: tuple[int, int],
    name: str,
) -> NDArray[np.float64]:
    """Validate that an output buffer can be written into without copying."""
    if not isinstance(array, np.ndarray):
        raise ValueError(f"{name} must be a numpy.ndarray")
    if array.dtype != np.float64:
        raise ValueError(f"{name} must have dtype float64")
    if not array.flags.c_contiguous:
        raise ValueError(f"{name} must be C-contiguous")
    if tuple(array.shape) != tuple(expected_shape):
        raise ValueError(
            f"{name} shape mismatch: expected {expected_shape}, received {tuple(array.shape)}"
        )
    return array


def _sanitize_convergence_params(
    max_iterations: int,
    tolerance: float,
    omega: float,
) -> tuple[int, float, float]:
    """Validate convergence parameters for native calls."""
    max_iters = int(max_iterations)
    if max_iters < 1:
        raise ValueError("max_iterations must be >= 1.")

    tol_safe = float(tolerance)
    if not math.isfinite(tol_safe) or tol_safe < 0.0:
        raise ValueError("tolerance must be finite and >= 0.")

    omega_safe = float(omega)
    if not math.isfinite(omega_safe) or omega_safe <= 0.0 or omega_safe >= 2.0:
        raise ValueError("omega must be finite and in (0, 2).")

    return max_iters, tol_safe, omega_safe


[docs] class HPCBridge: """Interface between Python and the compiled C++ Grad-Shafranov solver. Loads the shared library (``libscpn_solver.so`` / ``scpn_solver.dll``) at construction time. If the library is not found the bridge gracefully degrades — :meth:`is_available` returns ``False`` and the caller falls back to Python. Parameters ---------- lib_path : str, optional Explicit path to the shared library. When *None* (default) the bridge searches trusted package-local locations only, unless ``SCPN_SOLVER_LIB`` is set explicitly. """ def __init__(self, lib_path: Optional[str] = None) -> None: self.lib = None self.solver_ptr = None self.loaded: bool = False self.load_error: Optional[str] = None self.close_error: Optional[str] = None self.close_error_count: int = 0 self._destroy_symbol: Optional[str] = None self._has_converged_api: bool = False self._has_boundary_api: bool = False lib_name = "scpn_solver.dll" if platform.system() == "Windows" else "libscpn_solver.so" env_path = os.environ.get("SCPN_SOLVER_LIB") if lib_path is None and env_path: lib_path = env_path if lib_path is None: here = Path(__file__).resolve().parent candidates = [ here / lib_name, here / "bin" / lib_name, ] for c in candidates: if c.exists(): lib_path = str(c) break if lib_path is None: lib_path = str(here / lib_name) self.lib_path = str(lib_path) try: self.lib = ctypes.CDLL(self.lib_path) self._setup_signatures() logger.info("Loaded C++ accelerator: %s", self.lib_path) self.loaded = True except OSError as exc: self.load_error = str(exc) logger.debug( "C++ accelerator unavailable at %s; falling back to Python solver: %s", self.lib_path, self.load_error, )
[docs] def is_available(self) -> bool: """Return *True* if the compiled solver library was loaded.""" return self.loaded
[docs] def close(self) -> None: """Release the C++ solver instance, if one was created.""" if self.solver_ptr is not None and self.loaded: try: if self.lib is not None and self._destroy_symbol is not None: getattr(self.lib, self._destroy_symbol)(self.solver_ptr) except Exception as exc: self.close_error = str(exc) self.close_error_count = int(getattr(self, "close_error_count", 0)) + 1 logger.warning( "Failed to release C++ solver instance cleanly: %s", self.close_error ) self.solver_ptr = None
def __del__(self) -> None: self.close() def __enter__(self) -> "HPCBridge": return self def __exit__(self, *exc) -> None: self.close() def _setup_signatures(self): # void* create_solver(int nr, int nz, double rmin, double rmax, double zmin, double zmax) self.lib.create_solver.argtypes = [ ctypes.c_int, ctypes.c_int, ctypes.c_double, ctypes.c_double, ctypes.c_double, ctypes.c_double, ] self.lib.create_solver.restype = ctypes.c_void_p # void run_step(void* solver, double* j, double* psi, int size, int iter) self.lib.run_step.argtypes = [ ctypes.c_void_p, np.ctypeslib.ndpointer(dtype=np.float64, flags="C_CONTIGUOUS"), np.ctypeslib.ndpointer(dtype=np.float64, flags="C_CONTIGUOUS"), ctypes.c_int, ctypes.c_int, ] # int run_step_converged(void* solver, const double* j, double* psi, # int size, int max_iter, double omega, # double tol, double* final_delta) if hasattr(self.lib, "run_step_converged"): self.lib.run_step_converged.argtypes = [ ctypes.c_void_p, np.ctypeslib.ndpointer(dtype=np.float64, flags="C_CONTIGUOUS"), np.ctypeslib.ndpointer(dtype=np.float64, flags="C_CONTIGUOUS"), ctypes.c_int, ctypes.c_int, ctypes.c_double, ctypes.c_double, ctypes.POINTER(ctypes.c_double), ] self.lib.run_step_converged.restype = ctypes.c_int self._has_converged_api = True else: self._has_converged_api = False # void set_boundary_dirichlet(void* solver, double boundary_value) if hasattr(self.lib, "set_boundary_dirichlet"): self.lib.set_boundary_dirichlet.argtypes = [ctypes.c_void_p, ctypes.c_double] self.lib.set_boundary_dirichlet.restype = None self._has_boundary_api = True else: self._has_boundary_api = False # void destroy_solver(void* solver) or void delete_solver(void* solver) if hasattr(self.lib, "destroy_solver"): self.lib.destroy_solver.argtypes = [ctypes.c_void_p] self.lib.destroy_solver.restype = None self._destroy_symbol = "destroy_solver" elif hasattr(self.lib, "delete_solver"): self.lib.delete_solver.argtypes = [ctypes.c_void_p] self.lib.delete_solver.restype = None self._destroy_symbol = "delete_solver" else: self._destroy_symbol = None
[docs] def initialize( self, nr: int, nz: int, r_range: tuple[float, float], z_range: tuple[float, float], boundary_value: float = 0.0, ) -> None: """Create the C++ solver instance for the given grid dimensions.""" if not self.loaded: return if nr < 2 or nz < 2: raise ValueError("nr and nz must be >= 2.") if r_range[0] >= r_range[1] or z_range[0] >= z_range[1]: raise ValueError("r_range/z_range must have min < max.") self.nr = nr self.nz = nz self.solver_ptr = self.lib.create_solver( nr, nz, r_range[0], r_range[1], z_range[0], z_range[1] ) self.set_boundary_dirichlet(boundary_value)
[docs] def set_boundary_dirichlet(self, boundary_value: float = 0.0) -> None: """Set a fixed Dirichlet boundary value for psi edges, if supported.""" if ( not self.loaded or self.solver_ptr is None or self.lib is None or not self._has_boundary_api ): return self.lib.set_boundary_dirichlet(self.solver_ptr, float(boundary_value))
[docs] def solve( self, j_phi: NDArray[np.float64], iterations: int = 100, ) -> Optional[NDArray[np.float64]]: """Run the C++ solver for *iterations* sweeps. Returns *None* if the library is not loaded (caller should fall back to a Python solver). """ prepared = self._prepare_inputs(j_phi) if prepared is None: return None _, expected_shape = prepared psi_out = np.zeros(expected_shape, dtype=np.float64) solved = self.solve_into(j_phi, psi_out, iterations=iterations) if solved is None: return None return solved
[docs] def solve_into( self, j_phi: NDArray[np.float64], psi_out: NDArray[np.float64], iterations: int = 100, ) -> Optional[NDArray[np.float64]]: """Run the C++ solver and write results into ``psi_out`` in-place.""" prepared = self._prepare_inputs(j_phi) if prepared is None: return None j_input, expected_shape = prepared psi_target = _require_c_contiguous_f64(psi_out, expected_shape, "psi_out") self.lib.run_step( self.solver_ptr, j_input, psi_target, int(j_input.size), int(iterations), ) return psi_target
[docs] def solve_neural( self, config_path: Optional[str | Path] = None ) -> Optional[NDArray[np.float64]]: """ Run the O(1) Neural Equilibrium Surrogate. Requires NeuralEquilibriumKernel (JAX/NPZ weights). """ if NeuralEquilibriumKernel is None: logger.warning("NeuralEquilibriumKernel not available (ImportError).") return None try: # Note: NeuralEquilibriumKernel needs a config for grid sizing # default to iter_config.json in root if not provided if config_path is None: config_path = Path(__file__).resolve().parents[3] / "iter_config.json" kernel = NeuralEquilibriumKernel(config_path) res = kernel.solve_equilibrium() return res.get("Psi") except Exception as exc: logger.error("Neural surrogate inference failed: %s", exc) return None
[docs] def solve_until_converged( self, j_phi: NDArray[np.float64], max_iterations: int = 1000, tolerance: float = 1e-6, omega: float = 1.8, ) -> Optional[tuple[NDArray[np.float64], int, float]]: """Run solver until convergence, if native API is available. Returns ``(psi, iterations_used, final_delta)``. If the library is unavailable or uninitialized, returns ``None``. """ prepared = self._prepare_inputs(j_phi) if prepared is None: return None _, expected_shape = prepared psi_out = np.zeros(expected_shape, dtype=np.float64) converged = self.solve_until_converged_into( j_phi, psi_out, max_iterations=max_iterations, tolerance=tolerance, omega=omega, ) if converged is None: return None iterations_used, final_delta = converged return psi_out, iterations_used, final_delta
[docs] def solve_until_converged_into( self, j_phi: NDArray[np.float64], psi_out: NDArray[np.float64], max_iterations: int = 1000, tolerance: float = 1e-6, omega: float = 1.8, ) -> Optional[tuple[int, float]]: """Run convergence API and write results into ``psi_out`` in-place.""" prepared = self._prepare_inputs(j_phi) if prepared is None: return None j_input, expected_shape = prepared psi_target = _require_c_contiguous_f64(psi_out, expected_shape, "psi_out") max_iters, tol_safe, omega_safe = _sanitize_convergence_params( max_iterations, tolerance, omega ) if not self._has_converged_api: self.lib.run_step( self.solver_ptr, j_input, psi_target, int(j_input.size), int(max_iters), ) return int(max_iters), float("nan") final_delta = ctypes.c_double(0.0) iterations_used = int( self.lib.run_step_converged( self.solver_ptr, j_input, psi_target, int(j_input.size), int(max_iters), float(omega_safe), float(tol_safe), ctypes.byref(final_delta), ) ) return iterations_used, float(final_delta.value)
def _prepare_inputs( self, j_phi: NDArray[np.float64] ) -> Optional[tuple[NDArray[np.float64], tuple[int, int]]]: if not self.loaded or self.solver_ptr is None: return None j_input = _as_contiguous_f64(j_phi) if j_input.ndim != 2: raise ValueError(f"j_phi must be a 2D array, received ndim={j_input.ndim}") if j_input.size == 0: raise ValueError("j_phi must be non-empty") if not np.all(np.isfinite(j_input)): raise ValueError("j_phi must contain only finite values") expected_shape = ( getattr(self, "nz", j_input.shape[0]), getattr(self, "nr", j_input.shape[-1]), ) if tuple(j_input.shape) != tuple(expected_shape): raise ValueError( f"j_phi shape mismatch: expected {expected_shape}, received {tuple(j_input.shape)}" ) return j_input, expected_shape
[docs] def compile_cpp() -> Optional[str]: """Compile the C++ solver from source. Looks for ``solver.cpp`` in the same directory as this module and invokes ``g++`` to produce a shared library. Returns ------- str or None Path to the compiled library, or *None* on failure. """ if os.environ.get("SCPN_ALLOW_NATIVE_BUILD") != "1": logger.warning("Native build disabled. Set SCPN_ALLOW_NATIVE_BUILD=1 to enable.") return None logger.info("Compiling C++ solver kernel…") script_dir = Path(__file__).resolve().parent src = script_dir / "solver.cpp" out_dir = script_dir / "bin" out_dir.mkdir(exist_ok=True) if platform.system() == "Windows": out = out_dir / "scpn_solver.dll" cmd = ["g++", "-shared", "-o", str(out), str(src), "-O3", "-mavx2"] else: out = out_dir / "libscpn_solver.so" cmd = [ "g++", "-shared", "-fPIC", "-o", str(out), str(src), "-O3", "-march=native", ] logger.info("Executing: %s", " ".join(cmd)) try: subprocess.run(cmd, check=True, timeout=_CPP_BUILD_TIMEOUT_SECONDS) except subprocess.TimeoutExpired as exc: logger.error( "Compilation timed out after %.1fs: %s", _CPP_BUILD_TIMEOUT_SECONDS, exc, ) return None except (subprocess.CalledProcessError, FileNotFoundError, OSError) as exc: logger.error("Compilation failed: %s", exc) return None logger.info("Compilation succeeded: %s", out) return str(out)
if __name__ == "__main__": # Test sequence lib_file = compile_cpp() if lib_file: bridge = HPCBridge(lib_file) # Test Grid N = 100 bridge.initialize(N, N, (2.0, 10.0), (-5.0, 5.0)) # Dummy Current (Gaussian) J = np.random.rand(N, N) # Run Psi = bridge.solve(J, iterations=500) print(f"Max Flux: {np.max(Psi)}")