Source code for scpn_fusion.hpc.hpc_bridge

# SPDX-License-Identifier: AGPL-3.0-or-later
# Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — HPC Bridge
"""Trust-checked Python bridge to optional native HPC Grad-Shafranov solver."""

from __future__ import annotations

import ctypes
import hashlib
import hmac
import json
import logging
import math
import numpy as np
import os
import platform
import shutil
import stat
import subprocess
from pathlib import Path
from types import TracebackType
from typing import Any, Optional

from numpy.typing import NDArray

logger = logging.getLogger(__name__)
_CPP_BUILD_TIMEOUT_SECONDS = 300.0
_CPP_ALLOWED_COMPILERS = frozenset({"g++", "g++.exe", "clang++", "clang++.exe"})
_SHA256_HEX_LEN = 64
_SOLVER_LIB_ENV = "SCPN_SOLVER_LIB"


def _as_contiguous_f64(array: NDArray[np.floating[Any]]) -> NDArray[np.float64]:
    """Return ``array`` as C-contiguous ``float64`` with minimal copying."""
    if isinstance(array, np.ndarray) and array.dtype == np.float64 and array.flags.c_contiguous:
        return array
    return np.ascontiguousarray(array, dtype=np.float64)


def _require_c_contiguous_f64(
    array: NDArray[np.floating[Any]],
    expected_shape: tuple[int, int],
    name: str,
) -> NDArray[np.float64]:
    """Validate that an output buffer can be written into without copying."""
    if not isinstance(array, np.ndarray):
        raise ValueError(f"{name} must be a numpy.ndarray")
    if array.dtype != np.float64:
        raise ValueError(f"{name} must have dtype float64")
    if not array.flags.c_contiguous:
        raise ValueError(f"{name} must be C-contiguous")
    if tuple(array.shape) != tuple(expected_shape):
        raise ValueError(
            f"{name} shape mismatch: expected {expected_shape}, received {tuple(array.shape)}"
        )
    return array


def _sanitize_convergence_params(
    max_iterations: int,
    tolerance: float,
    omega: float,
) -> tuple[int, float, float]:
    """Validate convergence parameters for native calls."""
    max_iters = int(max_iterations)
    if max_iters < 1:
        raise ValueError("max_iterations must be >= 1.")

    tol_safe = float(tolerance)
    if not math.isfinite(tol_safe) or tol_safe < 0.0:
        raise ValueError("tolerance must be finite and >= 0.")

    omega_safe = float(omega)
    if not math.isfinite(omega_safe) or omega_safe <= 0.0 or omega_safe >= 2.0:
        raise ValueError("omega must be finite and in (0, 2).")

    return max_iters, tol_safe, omega_safe


def _normalise_sha256(value: str) -> str:
    digest = value.strip().split()[0].lower()
    if len(digest) != _SHA256_HEX_LEN or any(c not in "0123456789abcdef" for c in digest):
        raise ValueError("trusted native library digest must be a SHA-256 hex string")
    return digest


def _sha256_file(path: Path) -> str:
    hasher = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def _resolve_cpp_compiler() -> Path | None:
    """Return a trusted C++ compiler path without consulting arbitrary flags."""
    compiler = shutil.which("g++", path=os.defpath)
    if compiler is None:
        logger.error("Native build requires g++ on PATH.")
        return None
    compiler_path = Path(compiler).resolve()
    if compiler_path.name not in _CPP_ALLOWED_COMPILERS:
        logger.error("Rejected unsupported C++ compiler executable: %s", compiler_path)
        return None
    if not compiler_path.is_file():
        logger.error("Resolved C++ compiler is not a regular file: %s", compiler_path)
        return None
    mode = compiler_path.stat().st_mode
    if mode & (stat.S_IWGRP | stat.S_IWOTH):
        logger.error("Rejected group/world-writable C++ compiler executable: %s", compiler_path)
        return None
    return compiler_path


def _validate_cpp_source(src: Path, script_dir: Path) -> bool:
    """Fail closed unless the build source is the bundled, regular solver file."""
    expected = script_dir / "solver.cpp"
    if src.resolve() != expected.resolve():
        logger.error("Rejected native build source outside bundled solver.cpp: %s", src)
        return False
    if src.is_symlink():
        logger.error("Rejected symlinked native build source: %s", src)
        return False
    if not src.is_file():
        logger.error("Native build source is missing or not a regular file: %s", src)
        return False
    mode = src.stat().st_mode
    if mode & (stat.S_IWGRP | stat.S_IWOTH):
        logger.error("Rejected group/world-writable native build source: %s", src)
        return False
    return True


def _cpp_build_env() -> dict[str, str]:
    """Return a minimal build environment, excluding attacker-controlled flags."""
    env: dict[str, str] = {
        "LANG": "C",
        "LC_ALL": "C",
        "PATH": os.defpath,
    }
    if platform.system() == "Windows":
        for key in ("SystemRoot", "TEMP", "TMP"):
            value = os.environ.get(key)
            if value:
                env[key] = value
    return env


def _sidecar_digest(path: Path) -> str | None:
    sidecar = path.with_suffix(path.suffix + ".sha256")
    if not sidecar.exists():
        return None
    return _normalise_sha256(sidecar.read_text(encoding="utf-8"))


def _manifest_digest(path: Path) -> str | None:
    manifest_path = os.environ.get("SCPN_SOLVER_TRUST_MANIFEST")
    if not manifest_path:
        return None
    manifest = json.loads(Path(manifest_path).read_text(encoding="utf-8"))
    if isinstance(manifest, dict) and "libraries" in manifest:
        manifest = manifest["libraries"]
    if not isinstance(manifest, dict):
        raise ValueError("native library trust manifest must be a JSON object")
    keys = (str(path), str(path.resolve()), path.name)
    for key in keys:
        if key in manifest:
            value = manifest[key]
            if not isinstance(value, str):
                raise ValueError("native library trust manifest digest must be a string")
            return _normalise_sha256(value)
    return None


def _expected_library_digest(path: Path) -> str | None:
    env_digest = os.environ.get("SCPN_SOLVER_LIB_SHA256")
    if env_digest:
        return _normalise_sha256(env_digest)
    manifest = _manifest_digest(path)
    if manifest:
        return manifest
    return _sidecar_digest(path)


def _verify_native_library_trust(path: Path) -> str:
    digest = _sha256_file(path)
    expected = _expected_library_digest(path)
    if expected is None:
        raise ValueError(
            "native solver library trust metadata is required; provide "
            "SCPN_SOLVER_LIB_SHA256, SCPN_SOLVER_TRUST_MANIFEST, or a .sha256 sidecar"
        )
    if not hmac.compare_digest(digest, expected):
        raise ValueError("native solver library SHA-256 does not match trusted metadata")
    return digest


def _require_explicit_library_path(path: str | os.PathLike[str]) -> Path:
    """Validate an explicit native-library override before ``ctypes`` loading."""
    lib_path = Path(path).expanduser()
    if not lib_path.is_absolute():
        raise ValueError(f"{_SOLVER_LIB_ENV}/lib_path must be an absolute path")
    if lib_path.is_symlink():
        raise ValueError(f"explicit native solver library must not be a symlink: {lib_path}")
    resolved = lib_path.resolve(strict=False)
    if not resolved.exists():
        raise ValueError(f"explicit native solver library does not exist: {resolved}")
    if not resolved.is_file():
        raise ValueError(f"explicit native solver library must be a regular file: {resolved}")
    return resolved


def _default_library_path(lib_name: str) -> Path:
    """Return a package-local native-library path without consulting cwd."""
    here = Path(__file__).resolve().parent
    candidates = (
        here / lib_name,
        here / "bin" / lib_name,
    )
    for candidate in candidates:
        if candidate.exists():
            return candidate.resolve()
    return (here / lib_name).resolve()


def _write_sha256_sidecar(path: Path) -> None:
    path.with_suffix(path.suffix + ".sha256").write_text(
        f"{_sha256_file(path)}  {path.name}\n",
        encoding="utf-8",
    )


[docs] class HPCBridge: """Interface between Python and the compiled C++ Grad-Shafranov solver. Loads the shared library (``libscpn_solver.so`` / ``scpn_solver.dll``) at construction time. If the library is not found the bridge gracefully degrades — :meth:`is_available` returns ``False`` and the caller falls back to Python. Parameters ---------- lib_path : str, optional Explicit path to the shared library. When *None* (default) the bridge searches trusted package-local locations only, unless ``SCPN_SOLVER_LIB`` is set explicitly. """ def __init__(self, lib_path: Optional[str] = None) -> None: self.lib: Any | None = None self.solver_ptr: int | None = None self.loaded: bool = False self.load_error: Optional[str] = None self.close_error: Optional[str] = None self.close_error_count: int = 0 self.lib_sha256: Optional[str] = None self._destroy_symbol: Optional[str] = None self._has_converged_api: bool = False self._has_boundary_api: bool = False lib_name = "scpn_solver.dll" if platform.system() == "Windows" else "libscpn_solver.so" env_path = os.environ.get(_SOLVER_LIB_ENV) explicit_override = lib_path is not None or bool(env_path) if lib_path is None and env_path: lib_path = env_path try: if explicit_override: resolved_lib_path = _require_explicit_library_path(str(lib_path)) else: resolved_lib_path = _default_library_path(lib_name) except ValueError as exc: self.lib_path = str(lib_path) self.load_error = str(exc) logger.warning("Refusing native C++ accelerator override: %s", exc) return self.lib_path = str(resolved_lib_path) try: lib_file = resolved_lib_path if lib_file.exists(): self.lib_sha256 = _verify_native_library_trust(lib_file) self.lib = ctypes.CDLL(self.lib_path) self._setup_signatures() logger.info("Loaded C++ accelerator: %s", self.lib_path) self.loaded = True except ValueError as exc: self.load_error = str(exc) logger.warning("Refusing untrusted C++ accelerator at %s: %s", self.lib_path, exc) except OSError as exc: self.load_error = str(exc) logger.debug( "C++ accelerator unavailable at %s; falling back to Python solver: %s", self.lib_path, self.load_error, )
[docs] def is_available(self) -> bool: """Return *True* if the compiled solver library was loaded.""" return self.loaded
[docs] def close(self) -> None: """Release the C++ solver instance, if one was created.""" if self.solver_ptr is not None and self.loaded: try: if self.lib is not None and self._destroy_symbol is not None: getattr(self.lib, self._destroy_symbol)(self.solver_ptr) except Exception as exc: self.close_error = str(exc) self.close_error_count = int(getattr(self, "close_error_count", 0)) + 1 logger.warning( "Failed to release C++ solver instance cleanly: %s", self.close_error ) self.solver_ptr = None
def __del__(self) -> None: """Release the native solver handle on garbage collection.""" self.close() def __enter__(self) -> "HPCBridge": """Enter the context manager and return this bridge.""" return self def __exit__( self, exc_type: type[BaseException] | None, exc: BaseException | None, traceback: TracebackType | None, ) -> None: """Release the native solver handle on context exit.""" self.close() def _setup_signatures(self) -> None: if self.lib is None: return # void* create_solver(int nr, int nz, double rmin, double rmax, double zmin, double zmax) self.lib.create_solver.argtypes = [ ctypes.c_int, ctypes.c_int, ctypes.c_double, ctypes.c_double, ctypes.c_double, ctypes.c_double, ] self.lib.create_solver.restype = ctypes.c_void_p # void run_step(void* solver, double* j, double* psi, int size, int iter) self.lib.run_step.argtypes = [ ctypes.c_void_p, np.ctypeslib.ndpointer(dtype=np.float64, flags="C_CONTIGUOUS"), np.ctypeslib.ndpointer(dtype=np.float64, flags="C_CONTIGUOUS"), ctypes.c_int, ctypes.c_int, ] # int run_step_converged(void* solver, const double* j, double* psi, # int size, int max_iter, double omega, # double tol, double* final_delta) if hasattr(self.lib, "run_step_converged"): self.lib.run_step_converged.argtypes = [ ctypes.c_void_p, np.ctypeslib.ndpointer(dtype=np.float64, flags="C_CONTIGUOUS"), np.ctypeslib.ndpointer(dtype=np.float64, flags="C_CONTIGUOUS"), ctypes.c_int, ctypes.c_int, ctypes.c_double, ctypes.c_double, ctypes.POINTER(ctypes.c_double), ] self.lib.run_step_converged.restype = ctypes.c_int self._has_converged_api = True else: self._has_converged_api = False # void set_boundary_dirichlet(void* solver, double boundary_value) if hasattr(self.lib, "set_boundary_dirichlet"): self.lib.set_boundary_dirichlet.argtypes = [ctypes.c_void_p, ctypes.c_double] self.lib.set_boundary_dirichlet.restype = None self._has_boundary_api = True else: self._has_boundary_api = False # void destroy_solver(void* solver) or void delete_solver(void* solver) if hasattr(self.lib, "destroy_solver"): self.lib.destroy_solver.argtypes = [ctypes.c_void_p] self.lib.destroy_solver.restype = None self._destroy_symbol = "destroy_solver" elif hasattr(self.lib, "delete_solver"): self.lib.delete_solver.argtypes = [ctypes.c_void_p] self.lib.delete_solver.restype = None self._destroy_symbol = "delete_solver" else: self._destroy_symbol = None
[docs] def initialize( self, nr: int, nz: int, r_range: tuple[float, float], z_range: tuple[float, float], boundary_value: float = 0.0, ) -> None: """Create the C++ solver instance for the given grid dimensions.""" if not self.loaded: return if nr < 2 or nz < 2: raise ValueError("nr and nz must be >= 2.") if r_range[0] >= r_range[1] or z_range[0] >= z_range[1]: raise ValueError("r_range/z_range must have min < max.") if self.lib is None: return self.nr = nr self.nz = nz self.solver_ptr = self.lib.create_solver( nr, nz, r_range[0], r_range[1], z_range[0], z_range[1] ) self.set_boundary_dirichlet(boundary_value)
[docs] def set_boundary_dirichlet(self, boundary_value: float = 0.0) -> None: """Set a fixed Dirichlet boundary value for psi edges, if supported.""" if ( not self.loaded or self.solver_ptr is None or self.lib is None or not self._has_boundary_api ): return self.lib.set_boundary_dirichlet(self.solver_ptr, float(boundary_value))
[docs] def solve( self, j_phi: NDArray[np.float64], iterations: int = 100, ) -> Optional[NDArray[np.float64]]: """Run the C++ solver for *iterations* sweeps. Returns *None* if the library is not loaded (caller should fall back to a Python solver). """ prepared = self._prepare_inputs(j_phi) if prepared is None: return None _, expected_shape = prepared psi_out = np.zeros(expected_shape, dtype=np.float64) solved = self.solve_into(j_phi, psi_out, iterations=iterations) if solved is None: return None return solved
[docs] def solve_into( self, j_phi: NDArray[np.float64], psi_out: NDArray[np.float64], iterations: int = 100, ) -> Optional[NDArray[np.float64]]: """Run the C++ solver and write results into ``psi_out`` in-place.""" prepared = self._prepare_inputs(j_phi) if prepared is None: return None j_input, expected_shape = prepared psi_target = _require_c_contiguous_f64(psi_out, expected_shape, "psi_out") if self.lib is None: return None self.lib.run_step( self.solver_ptr, j_input, psi_target, int(j_input.size), int(iterations), ) return psi_target
[docs] def solve_neural( self, config_path: Optional[str | Path] = None ) -> Optional[NDArray[np.float64]]: """Run the O(1) Neural Equilibrium Surrogate. Requires NeuralEquilibriumKernel (JAX/NPZ weights). """ try: from scpn_fusion.core.neural_equilibrium_kernel import NeuralEquilibriumKernel except ImportError: logger.warning("NeuralEquilibriumKernel not available (ImportError).") return None try: # Note: NeuralEquilibriumKernel needs a config for grid sizing # default to iter_config.json in root if not provided if config_path is None: config_path = Path(__file__).resolve().parents[3] / "iter_config.json" kernel = NeuralEquilibriumKernel(config_path) res = kernel.solve_equilibrium() psi = res.get("Psi") if psi is None: return None return np.asarray(psi, dtype=np.float64) except Exception as exc: logger.error("Neural surrogate inference failed: %s", exc) return None
[docs] def solve_until_converged( self, j_phi: NDArray[np.float64], max_iterations: int = 1000, tolerance: float = 1e-6, omega: float = 1.8, ) -> Optional[tuple[NDArray[np.float64], int, float]]: """Run solver until convergence, if native API is available. Returns ``(psi, iterations_used, final_delta)``. If the library is unavailable or uninitialized, returns ``None``. """ prepared = self._prepare_inputs(j_phi) if prepared is None: return None _, expected_shape = prepared psi_out = np.zeros(expected_shape, dtype=np.float64) converged = self.solve_until_converged_into( j_phi, psi_out, max_iterations=max_iterations, tolerance=tolerance, omega=omega, ) if converged is None: return None iterations_used, final_delta = converged return psi_out, iterations_used, final_delta
[docs] def solve_until_converged_into( self, j_phi: NDArray[np.float64], psi_out: NDArray[np.float64], max_iterations: int = 1000, tolerance: float = 1e-6, omega: float = 1.8, ) -> Optional[tuple[int, float]]: """Run convergence API and write results into ``psi_out`` in-place.""" prepared = self._prepare_inputs(j_phi) if prepared is None: return None j_input, expected_shape = prepared psi_target = _require_c_contiguous_f64(psi_out, expected_shape, "psi_out") if self.lib is None: return None max_iters, tol_safe, omega_safe = _sanitize_convergence_params( max_iterations, tolerance, omega ) if not self._has_converged_api: self.lib.run_step( self.solver_ptr, j_input, psi_target, int(j_input.size), int(max_iters), ) return int(max_iters), float("nan") final_delta = ctypes.c_double(0.0) iterations_used = int( self.lib.run_step_converged( self.solver_ptr, j_input, psi_target, int(j_input.size), int(max_iters), float(omega_safe), float(tol_safe), ctypes.byref(final_delta), ) ) return iterations_used, float(final_delta.value)
def _prepare_inputs( self, j_phi: NDArray[np.float64] ) -> Optional[tuple[NDArray[np.float64], tuple[int, int]]]: if not self.loaded or self.solver_ptr is None: return None j_input = _as_contiguous_f64(j_phi) if j_input.ndim != 2: raise ValueError(f"j_phi must be a 2D array, received ndim={j_input.ndim}") if j_input.size == 0: raise ValueError("j_phi must be non-empty") if not np.all(np.isfinite(j_input)): raise ValueError("j_phi must contain only finite values") expected_shape = ( getattr(self, "nz", j_input.shape[0]), getattr(self, "nr", j_input.shape[-1]), ) if tuple(j_input.shape) != tuple(expected_shape): raise ValueError( f"j_phi shape mismatch: expected {expected_shape}, received {tuple(j_input.shape)}" ) return j_input, expected_shape
[docs] def compile_cpp() -> Optional[str]: """Compile the C++ solver from source. Looks for the bundled ``solver.cpp`` in the same directory as this module, resolves an allowlisted C++ compiler, and invokes it with fixed arguments and a minimal environment. The build is intentionally opt-in because local native compilation is code execution. Returns ------- str or None Path to the compiled library, or *None* on failure. """ if os.environ.get("SCPN_ALLOW_NATIVE_BUILD") != "1": logger.warning("Native build disabled. Set SCPN_ALLOW_NATIVE_BUILD=1 to enable.") return None script_dir = Path(__file__).resolve().parent src = script_dir / "solver.cpp" if not _validate_cpp_source(src, script_dir): return None compiler = _resolve_cpp_compiler() if compiler is None: return None logger.info("Compiling C++ solver kernel from trusted bundled source.") out_dir = script_dir / "bin" out_dir.mkdir(exist_ok=True) if platform.system() == "Windows": out = out_dir / "scpn_solver.dll" cmd = [str(compiler), "-shared", "-o", str(out), str(src), "-O3", "-mavx2"] else: out = out_dir / "libscpn_solver.so" cmd = [ str(compiler), "-shared", "-fPIC", "-o", str(out), str(src), "-O3", "-march=native", ] logger.info("Executing trusted native build command: %s", " ".join(cmd)) try: subprocess.run( cmd, check=True, timeout=_CPP_BUILD_TIMEOUT_SECONDS, env=_cpp_build_env(), ) except subprocess.TimeoutExpired as exc: logger.error( "Compilation timed out after %.1fs: %s", _CPP_BUILD_TIMEOUT_SECONDS, exc, ) return None except (subprocess.CalledProcessError, FileNotFoundError, OSError) as exc: logger.error("Compilation failed: %s", exc) return None logger.info("Compilation succeeded: %s", out) _write_sha256_sidecar(out) return str(out)
if __name__ == "__main__": # Test sequence lib_file = compile_cpp() if lib_file: bridge = HPCBridge(lib_file) # Test Grid N = 100 bridge.initialize(N, N, (2.0, 10.0), (-5.0, 5.0)) # Dummy Current (Gaussian) J = np.random.rand(N, N) # Run Psi = bridge.solve(J, iterations=500) if Psi is not None: print(f"Max Flux: {np.max(Psi)}")