Source code for scpn_fusion.control.neuro_cybernetic_controller

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Neuro Cybernetic Controller
from __future__ import annotations

import logging
import math
import sys
from collections import deque
from pathlib import Path
from typing import Any, Callable, Dict, Optional

import matplotlib.pyplot as plt
import numpy as np
from scpn_fusion.scpn.safety_interlocks import SafetyInterlockRuntime
from scpn_fusion.neurocore_compat import (
    SC_NEUROCORE_AVAILABLE,
    QuantumEntropySource,
    StochasticLIFNeuron,
)

logger = logging.getLogger(__name__)

SHOT_DURATION = 100
TARGET_R = 6.2
TARGET_Z = 0.0


def _resolve_fusion_kernel() -> Any:
    """Resolve FusionKernel lazily to keep pool-only paths dependency-light."""
    try:
        from scpn_fusion.core._rust_compat import FusionKernel as _FusionKernel

        return _FusionKernel
    except Exception:
        try:
            from scpn_fusion.core.fusion_kernel import FusionKernel as _FusionKernel

            return _FusionKernel
        except Exception as exc:  # pragma: no cover - import-guard path
            raise ImportError(
                "Unable to import FusionKernel. Run with PYTHONPATH=src "
                "or use `python -m scpn_fusion.control.neuro_cybernetic_controller`."
            ) from exc


[docs] class SpikingControllerPool: """ Push-pull spiking control population with deterministic compatibility path. Preferred backend is ``sc-neurocore``. If unavailable, a reduced NumPy LIF population is used so controller workflows remain executable in CI. """ def __init__( self, n_neurons: int = 20, gain: float = 1.0, tau_window: int = 10, use_quantum: bool = False, *, seed: int = 42, allow_numpy_fallback: bool = True, dt_s: float = 1.0e-3, tau_mem_s: float = 15.0e-3, noise_std: float = 0.02, ) -> None: n_neurons = int(n_neurons) if n_neurons < 1: raise ValueError("n_neurons must be >= 1.") gain = float(gain) if not math.isfinite(gain): raise ValueError("gain must be finite.") tau_window = int(tau_window) if tau_window < 1: raise ValueError("tau_window must be >= 1.") dt_s = float(dt_s) if not math.isfinite(dt_s) or dt_s <= 0.0: raise ValueError("dt_s must be finite and > 0.") tau_mem_s = float(tau_mem_s) if not math.isfinite(tau_mem_s) or tau_mem_s <= 0.0: raise ValueError("tau_mem_s must be finite and > 0.") noise_std = float(noise_std) if not math.isfinite(noise_std) or noise_std < 0.0: raise ValueError("noise_std must be finite and >= 0.") self.n_neurons = n_neurons self.gain = gain self.window_size = tau_window self.use_quantum = bool(use_quantum) self._i_scale = 5.0 self._i_bias = 0.1 self.last_rate_pos = 0.0 self.last_rate_neg = 0.0 self.history_pos: deque[int] = deque(maxlen=self.window_size) self.history_neg: deque[int] = deque(maxlen=self.window_size) for _ in range(self.window_size): self.history_pos.append(0) self.history_neg.append(0) if SC_NEUROCORE_AVAILABLE: self.backend = "sc_neurocore" self.q_source = ( QuantumEntropySource(n_qubits=4) if self.use_quantum and QuantumEntropySource is not None else None ) self.pop_pos = [ StochasticLIFNeuron(seed=i, entropy_source=self.q_source) for i in range(self.n_neurons) ] self.pop_neg = [ StochasticLIFNeuron(seed=i + 1000, entropy_source=self.q_source) for i in range(self.n_neurons) ] self._v_pos = None self._v_neg = None self._rng_pos = None self._rng_neg = None self._alpha = 0.0 self._noise_std = 0.0 self._v_threshold = 1.0 self._v_reset = 0.0 return if not allow_numpy_fallback: raise RuntimeError("sc-neurocore is unavailable and allow_numpy_fallback=False.") self.backend = "numpy_lif" self.q_source = None self.pop_pos = [] self.pop_neg = [] self._rng_pos = np.random.default_rng(int(seed)) self._rng_neg = np.random.default_rng(int(seed) + 100003) self._v_pos = np.zeros(self.n_neurons, dtype=np.float64) self._v_neg = np.zeros(self.n_neurons, dtype=np.float64) self._alpha = dt_s / tau_mem_s self._noise_std = noise_std # Reduced threshold keeps compatibility lane responsive in low-current control # regimes while preserving deterministic push-pull polarity. self._v_threshold = 0.35 self._v_reset = 0.0 def _step_numpy_population( self, v: np.ndarray, rng: np.random.Generator, input_current: float, ) -> int: noise = rng.normal(0.0, self._noise_std, size=v.shape) v += self._alpha * (-v + float(input_current) + noise) fired = v >= self._v_threshold n_fired = int(np.count_nonzero(fired)) if n_fired > 0: v[fired] = self._v_reset return n_fired
[docs] def step(self, error_signal: float) -> float: input_pos = max(0.0, float(error_signal)) * self._i_scale input_neg = max(0.0, -float(error_signal)) * self._i_scale if self.backend == "sc_neurocore": spikes_pos = 0 for neuron in self.pop_pos: if neuron.step(self._i_bias + input_pos): spikes_pos += 1 spikes_neg = 0 for neuron in self.pop_neg: if neuron.step(self._i_bias + input_neg): spikes_neg += 1 else: spikes_pos = self._step_numpy_population( self._v_pos, self._rng_pos, self._i_bias + input_pos ) spikes_neg = self._step_numpy_population( self._v_neg, self._rng_neg, self._i_bias + input_neg ) self.history_pos.append(spikes_pos) self.history_neg.append(spikes_neg) self.last_rate_pos = float(sum(self.history_pos) / (self.window_size * self.n_neurons)) self.last_rate_neg = float(sum(self.history_neg) / (self.window_size * self.n_neurons)) return float((self.last_rate_pos - self.last_rate_neg) * self.gain)
[docs] class NeuroCyberneticController: """ Replaces PID loops with push-pull spiking populations. """ def __init__( self, config_file: str, seed: int = 42, *, shot_duration: int = SHOT_DURATION, kernel_factory: Optional[Callable[[str], Any]] = None, ) -> None: if int(shot_duration) <= 0: raise ValueError("shot_duration must be > 0") fusion_kernel_cls = ( kernel_factory if kernel_factory is not None else _resolve_fusion_kernel() ) self.kernel = fusion_kernel_cls(config_file) self.seed = int(seed) self.shot_duration = int(shot_duration) self.history: Dict[str, list[float]] = {} self.brain_R: Optional[SpikingControllerPool] = None self.brain_Z: Optional[SpikingControllerPool] = None self.safety_runtime = SafetyInterlockRuntime() self._reset_history() def _reset_history(self) -> None: self.history = { "t": [], "Ip": [], "R_axis": [], "Z_axis": [], "Err_R": [], "Err_Z": [], "Control_R": [], "Control_Z": [], "Spike_Rates": [], "Safety_Position_Allowed": [], "Safety_Contract_Violations": [], } def _check_safety(self, state: Dict[str, float]) -> Dict[str, bool]: allowed = self.safety_runtime.update_from_state(state) return allowed
[docs] def initialize_brains(self, use_quantum: bool = False) -> None: self.brain_R = SpikingControllerPool( n_neurons=50, gain=10.0, tau_window=20, use_quantum=use_quantum, seed=self.seed + 1, ) self.brain_Z = SpikingControllerPool( n_neurons=50, gain=20.0, tau_window=20, use_quantum=use_quantum, seed=self.seed + 2, )
[docs] def run_shot( self, *, save_plot: bool = True, verbose: bool = True, output_path: Optional[str] = None, ) -> Dict[str, Any]: self.initialize_brains(use_quantum=False) return self._execute_simulation( "Neuro-Cybernetic (Classical SNN)", mode="classical", save_plot=save_plot, verbose=verbose, output_path=output_path, )
[docs] def run_quantum_shot( self, *, save_plot: bool = True, verbose: bool = True, output_path: Optional[str] = None, ) -> Dict[str, Any]: self.initialize_brains(use_quantum=True) return self._execute_simulation( "Quantum-Neuro Hybrid (QNN)", mode="quantum", save_plot=save_plot, verbose=verbose, output_path=output_path, )
def _execute_simulation( self, title: str, *, mode: str, save_plot: bool, verbose: bool, output_path: Optional[str], ) -> Dict[str, Any]: if self.brain_R is None or self.brain_Z is None: raise RuntimeError("Controller brains are not initialized.") if verbose: logger.info("--- %s PLASMA INTERFACE ---", title.upper()) logger.info("Initializing Stochastic Neural Network (SNN)...") logger.info("Neurons: %d (Push-Pull Configuration)", self.brain_R.n_neurons * 4) self._reset_history() self.kernel.solve_equilibrium() physics_cfg = self.kernel.cfg.setdefault("physics", {}) coils = self.kernel.cfg.setdefault("coils", [{} for _ in range(5)]) while len(coils) < 5: coils.append({}) for coil in coils: coil.setdefault("current", 0.0) prev_z: Optional[float] = None def _mean_profile_or_zero(value: Any) -> float: try: arr = np.asarray(value, dtype=np.float64) except Exception: return 0.0 if arr.size == 0: return 0.0 arr = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0) return float(np.mean(arr)) for t in range(self.shot_duration): target_ip = 5.0 + (10.0 * t / self.shot_duration) physics_cfg["plasma_current_target"] = target_ip idx_max = int(np.argmax(self.kernel.Psi)) iz, ir = np.unravel_index(idx_max, self.kernel.Psi.shape) curr_r = float(self.kernel.R[ir]) curr_z = float(self.kernel.Z[iz]) err_r = TARGET_R - curr_r err_z = TARGET_Z - curr_z ctrl_r = float(self.brain_R.step(err_r)) ctrl_z = float(self.brain_Z.step(err_z)) dz_dt = 0.0 if prev_z is None else float(curr_z - prev_z) prev_z = curr_z state = { "T_e": _mean_profile_or_zero(getattr(self.kernel, "Te", np.zeros(1))), "n_e": _mean_profile_or_zero(getattr(self.kernel, "ne", np.zeros(1))), "beta_N": float(physics_cfg.get("beta_N", physics_cfg.get("beta_n", 0.0))), "I_p": float(target_ip), "dZ_dt": float(dz_dt), } allowed = self._check_safety(state) if ( not allowed.get("heat_ramp", True) or not allowed.get("power_ramp", True) or not allowed.get("current_ramp", True) ): ctrl_r = 0.0 if not allowed.get("position_move", True): ctrl_z = 0.0 coils[2]["current"] = float(coils[2]["current"]) + ctrl_r coils[0]["current"] = float(coils[0]["current"]) - ctrl_z coils[4]["current"] = float(coils[4]["current"]) + ctrl_z self.kernel.solve_equilibrium() self.history["t"].append(float(t)) self.history["Ip"].append(float(target_ip)) self.history["R_axis"].append(curr_r) self.history["Z_axis"].append(curr_z) self.history["Err_R"].append(float(err_r)) self.history["Err_Z"].append(float(err_z)) self.history["Control_R"].append(ctrl_r) self.history["Control_Z"].append(ctrl_z) self.history["Spike_Rates"].append( float(self.brain_R.last_rate_pos - self.brain_R.last_rate_neg) ) self.history["Safety_Position_Allowed"].append( 1.0 if allowed.get("position_move", True) else 0.0 ) self.history["Safety_Contract_Violations"].append( float(len(self.safety_runtime.last_contract_violations)) ) if verbose: logger.info( "T=%d: Pos=(%.2f, %.2f) | Err=(%.3f, %.3f) | Brain_Out=(%.3f, %.3f)", t, curr_r, curr_z, err_r, err_z, ctrl_r, ctrl_z, ) plot_saved = False plot_error: Optional[str] = None if save_plot: try: self.visualize(title, output_path=output_path, verbose=verbose) plot_saved = True except Exception as exc: plot_error = str(exc) if verbose: logger.warning("Plot export skipped due to error: %s", exc) err_r = np.asarray(self.history["Err_R"], dtype=np.float64) err_z = np.asarray(self.history["Err_Z"], dtype=np.float64) ctrl_r = np.asarray(self.history["Control_R"], dtype=np.float64) ctrl_z = np.asarray(self.history["Control_Z"], dtype=np.float64) safety_position_allowed = np.asarray( self.history["Safety_Position_Allowed"], dtype=np.float64 ) safety_contract_violations = np.asarray( self.history["Safety_Contract_Violations"], dtype=np.float64 ) summary: Dict[str, Any] = { "seed": self.seed, "steps": int(self.shot_duration), "mode": str(mode), "backend_r": self.brain_R.backend, "backend_z": self.brain_Z.backend, "final_r": float(self.history["R_axis"][-1]), "final_z": float(self.history["Z_axis"][-1]), "mean_abs_err_r": float(np.mean(np.abs(err_r))), "mean_abs_err_z": float(np.mean(np.abs(err_z))), "max_abs_control_r": float(np.max(np.abs(ctrl_r))), "max_abs_control_z": float(np.max(np.abs(ctrl_z))), "mean_spike_imbalance": float(np.mean(self.history["Spike_Rates"])), "safety_position_allow_rate": float( np.mean(safety_position_allowed) if safety_position_allowed.size else 1.0 ), "safety_interlock_trips": int(np.count_nonzero(safety_position_allowed < 0.5)), "safety_contract_violations": int(np.sum(safety_contract_violations)), "plot_saved": bool(plot_saved), "plot_error": plot_error, } return summary
[docs] def visualize( self, title: str, *, output_path: Optional[str] = None, verbose: bool = True, ) -> str: fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8)) ax1.set_title(f"{title} Control") ax1.plot(self.history["t"], self.history["R_axis"], "b-", label="R (Radial)") ax1.plot(self.history["t"], self.history["Z_axis"], "r-", label="Z (Vertical)") ax1.axhline(TARGET_R, color="b", linestyle="--", alpha=0.3) ax1.axhline(TARGET_Z, color="r", linestyle="--", alpha=0.3) ax1.set_ylabel("Position (m)") ax1.legend() ax1.grid(True) ax2.set_title("Neural Control Activity") ax2.plot(self.history["t"], self.history["Control_R"], "k-", label="Radial Command") ax2.set_ylabel("Current Delta (A)") ax2.set_xlabel("Time Step") ax2.legend() filename = ( output_path if output_path is not None else f"{title.replace(' ', '_')}_Result.png" ) plt.tight_layout() plt.savefig(filename) plt.close(fig) if verbose: logger.info("Analysis saved: %s", filename) return filename
[docs] def run_neuro_cybernetic_control( *, config_file: str, shot_duration: int = SHOT_DURATION, seed: int = 42, quantum: bool = False, save_plot: bool = False, verbose: bool = False, output_path: Optional[str] = None, kernel_factory: Optional[Callable[[str], Any]] = None, ) -> Dict[str, Any]: """Run neuro-cybernetic control in deterministic non-interactive mode.""" controller = NeuroCyberneticController( config_file, seed=seed, shot_duration=shot_duration, kernel_factory=kernel_factory, ) if quantum: return controller.run_quantum_shot( save_plot=save_plot, verbose=verbose, output_path=output_path ) return controller.run_shot(save_plot=save_plot, verbose=verbose, output_path=output_path)
if __name__ == "__main__": repo_root = Path(__file__).resolve().parents[3] cfg = repo_root / "iter_config.json" nc = NeuroCyberneticController(str(cfg)) if len(sys.argv) > 1 and sys.argv[1] == "quantum": nc.run_quantum_shot() else: nc.run_shot()