Source code for scpn_fusion.diagnostics.run_diagnostics

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Run Diagnostics
from __future__ import annotations

import argparse
import inspect
from pathlib import Path
from typing import Any, Callable, Dict, Optional

import matplotlib.pyplot as plt
import numpy as np

try:
    from scpn_fusion.core._rust_compat import FusionKernel
except ImportError:
    from scpn_fusion.core.fusion_kernel import FusionKernel
from scpn_fusion.diagnostics.synthetic_sensors import SensorSuite
from scpn_fusion.diagnostics.tomography import PlasmaTomography

ROOT = Path(__file__).resolve().parents[3]
DEFAULT_CONFIG_PATH = ROOT / "validation" / "iter_validated_config.json"
DEFAULT_OUTPUT_DIR = ROOT / "artifacts" / "diagnostics_demo"


def _build_sensor_suite(
    sensor_factory: Callable[[Any], Any],
    kernel: Any,
    *,
    seed: int,
    rng: np.random.Generator,
) -> Any:
    """Instantiate sensor suite, passing scoped RNG/seed when supported."""
    try:
        params = inspect.signature(sensor_factory).parameters
    except (TypeError, ValueError):
        params = {}

    if "rng" in params:
        return sensor_factory(kernel, rng=rng)
    if "seed" in params:
        return sensor_factory(kernel, seed=seed)

    try:
        return sensor_factory(kernel, rng=rng)
    except TypeError:
        try:
            return sensor_factory(kernel, seed=seed)
        except TypeError:
            return sensor_factory(kernel)


[docs] def run_diag_demo( config_path: Path | str = DEFAULT_CONFIG_PATH, output_dir: Path | str = DEFAULT_OUTPUT_DIR, *, seed: int = 42, save_figures: bool = True, verbose: bool = True, kernel_factory: Callable[[str], Any] = FusionKernel, sensor_factory: Callable[[Any], Any] = SensorSuite, tomography_factory: Callable[[Any], Any] = PlasmaTomography, ) -> Dict[str, Any]: """ Run synthetic diagnostics + tomography and return deterministic summary. """ cfg = Path(config_path) out_dir = Path(output_dir) seed_int = int(seed) rng = np.random.default_rng(seed_int) if verbose: print("--- SCPN SYNTHETIC DIAGNOSTICS & TOMOGRAPHY ---") kernel = kernel_factory(str(cfg)) if hasattr(kernel, "solve_equilibrium"): kernel.solve_equilibrium() psi = np.asarray(kernel.Psi, dtype=np.float64) if psi.size == 0: raise ValueError("Kernel Psi grid is empty.") idx_max = int(np.argmax(psi)) iz_ax, ir_ax = np.unravel_index(idx_max, psi.shape) psi_ax = float(psi[iz_ax, ir_ax]) if abs(psi_ax) < 1e-12: psi_ax = 1.0 phantom = np.clip((psi / psi_ax) ** 2, 0.0, None) hot_iz = int(np.clip(int(0.65 * (phantom.shape[0] - 1)), 0, phantom.shape[0] - 1)) hot_ir = int(np.clip(int(0.35 * (phantom.shape[1] - 1)), 0, phantom.shape[1] - 1)) phantom[hot_iz, hot_ir] += 0.5 sensors = _build_sensor_suite(sensor_factory, kernel, seed=seed_int, rng=rng) if verbose: print("Measuring Signals...") mag_signals = np.asarray(sensors.measure_magnetics(), dtype=np.float64) bolo_signals = np.asarray(sensors.measure_bolometer(phantom), dtype=np.float64) if verbose: print(f" Magnetic Probes: {int(mag_signals.size)} channels") print(f" Bolometer Cameras: {int(bolo_signals.size)} channels") print("Running Tomographic Inversion...") tomo = tomography_factory(sensors) reconstruction = np.asarray(tomo.reconstruct(bolo_signals), dtype=np.float64) flat_phantom = phantom.reshape(-1) flat_recon = reconstruction.reshape(-1) n = min(flat_phantom.size, flat_recon.size) rmse = float(np.sqrt(np.mean((flat_phantom[:n] - flat_recon[:n]) ** 2))) if n > 0 else 0.0 plot_saved = False plot_error: Optional[str] = None tomo_path: Optional[str] = None geom_path: Optional[str] = None if save_figures: try: out_dir.mkdir(parents=True, exist_ok=True) fig = tomo.plot_reconstruction(phantom, reconstruction) tomo_out = out_dir / "Tomography_Result.png" fig.savefig(str(tomo_out)) plt.close(fig) fig2 = sensors.visualize_setup() geom_out = out_dir / "Sensor_Geometry.png" fig2.savefig(str(geom_out)) plt.close(fig2) tomo_path = str(tomo_out) geom_path = str(geom_out) plot_saved = bool(tomo_out.exists() and geom_out.exists()) if verbose: print(f"Saved: {tomo_out}") print(f"Saved: {geom_out}") except Exception as exc: plot_error = f"{exc.__class__.__name__}: {exc}" return { "seed": seed_int, "config_path": str(cfg), "mag_channels": int(mag_signals.size), "bolo_channels": int(bolo_signals.size), "phantom_sum": float(np.sum(phantom)), "reconstruction_sum": float(np.sum(reconstruction)), "reconstruction_rmse": rmse, "plot_saved": bool(plot_saved), "plot_error": plot_error, "tomography_path": tomo_path, "sensor_geometry_path": geom_path, }
if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run synthetic diagnostics and tomography demo.") parser.add_argument( "--config", type=Path, default=DEFAULT_CONFIG_PATH, help="Path to reactor configuration JSON.", ) parser.add_argument( "--output-dir", type=Path, default=DEFAULT_OUTPUT_DIR, help="Directory for generated diagnostic figures.", ) parser.add_argument( "--seed", type=int, default=42, help="Random seed for deterministic sensor noise/reconstruction runs.", ) parser.add_argument( "--no-figures", action="store_true", help="Run diagnostics without writing PNG outputs.", ) args = parser.parse_args() run_diag_demo( config_path=args.config, output_dir=args.output_dir, seed=int(args.seed), save_figures=not bool(args.no_figures), )