Source code for scpn_fusion.control.advanced_soc_fusion_learning

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Advanced SOC Fusion Learning
from __future__ import annotations

import logging
from typing import Any, Dict, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np

logger = logging.getLogger(__name__)

L = 60
TIME_STEPS = 10000
Z_CRIT_BASE = 6.0
FLOW_GENERATION = 0.2
FLOW_DAMPING = 0.05
SHEAR_EFFICIENCY = 3.0

ALPHA = 0.1
GAMMA = 0.95
EPSILON = 0.1
N_STATES_TURB = 5
N_STATES_FLOW = 5
N_ACTIONS = 3


def _normalize_bounds(bounds: Tuple[float, float], name: str) -> Tuple[float, float]:
    lo = float(bounds[0])
    hi = float(bounds[1])
    if not np.isfinite(lo) or not np.isfinite(hi) or lo >= hi:
        raise ValueError(f"{name} must be finite with lower < upper.")
    return lo, hi


class CoupledSandpileReactor:
    """
    Predator-prey sandpile approximation for turbulence/flow coupling.
    """

    def __init__(
        self,
        size: int = L,
        *,
        z_crit_base: float = Z_CRIT_BASE,
        flow_generation: float = FLOW_GENERATION,
        flow_damping: float = FLOW_DAMPING,
        shear_efficiency: float = SHEAR_EFFICIENCY,
        max_sub_steps: int = 50,
        flow_bounds: Tuple[float, float] = (0.0, 5.0),
        energy_per_topple_mj: float = 0.05,  # Physical calibration
    ) -> None:
        size = int(size)
        if size < 8:
            raise ValueError("size must be >= 8.")
        self.size = size
        self.z_crit_base = float(z_crit_base)
        self.energy_per_topple_mj = float(energy_per_topple_mj)
        flow_generation = float(flow_generation)
        if not np.isfinite(flow_generation) or flow_generation < 0.0:
            raise ValueError("flow_generation must be finite and >= 0.")
        flow_damping = float(flow_damping)
        if not np.isfinite(flow_damping) or flow_damping < 0.0 or flow_damping >= 1.0:
            raise ValueError("flow_damping must be finite and in [0, 1).")
        shear_efficiency = float(shear_efficiency)
        if not np.isfinite(shear_efficiency) or shear_efficiency < 0.0:
            raise ValueError("shear_efficiency must be finite and >= 0.")
        max_sub_steps = int(max_sub_steps)
        if max_sub_steps < 1:
            raise ValueError("max_sub_steps must be >= 1.")
        self.flow_generation = flow_generation
        self.flow_damping = flow_damping
        self.shear_efficiency = shear_efficiency
        self.max_sub_steps = max_sub_steps
        self.flow_bounds = _normalize_bounds(flow_bounds, "flow_bounds")
        self.Z = np.zeros(self.size, dtype=np.float64)
        self.h = np.zeros(self.size, dtype=np.float64)
        self.flow = 0.0

    def drive(self, amount: float = 1.0) -> None:
        self.Z[0] += max(float(amount), 0.0)

    def step_physics(self, external_shear: float) -> tuple[int, float, float]:
        eff_shear = float(self.flow + float(external_shear))
        current_z_crit = float(self.z_crit_base + self.shear_efficiency * eff_shear)
        total_topple = 0
        for _ in range(self.max_sub_steps):
            sites_active = np.where(current_z_crit <= self.Z)[0]
            if sites_active.size == 0:
                break
            for i in sites_active:
                self.Z[i] -= 2.0
                if i + 1 < self.size:
                    self.Z[i + 1] += 1.0
                if i - 1 >= 0:
                    self.Z[i - 1] += 1.0
                total_topple += 1
        self.flow += (float(total_topple) * self.flow_generation) / float(self.size)
        self.flow *= 1.0 - self.flow_damping
        lo, hi = self.flow_bounds
        self.flow = float(np.clip(self.flow, lo, hi))
        return int(total_topple), float(self.flow), eff_shear

    def get_profile_energy(self) -> float:
        self.h = np.cumsum(self.Z[::-1])[::-1]
        return float(self.h[0] if self.h.size else 0.0)

    def get_elm_energy_mj(self, topple_count: int) -> float:
        """Convert a topple count into the calibrated energy release in MJ."""
        return float(topple_count) * self.energy_per_topple_mj
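

# Minimal usage sketch for the reactor API above. The step count and the
# external shear value are illustrative assumptions, not calibrated inputs.
def _demo_reactor_step() -> None:
    reactor = CoupledSandpileReactor(size=16)
    for _ in range(100):
        reactor.drive()  # add one unit of drive at the core boundary
        av_size, flow, shear = reactor.step_physics(external_shear=0.1)
    logger.info(
        "avalanche=%d | flow=%.3f | eff_shear=%.3f | ELM=%.2f MJ | core=%.1f",
        av_size,
        flow,
        shear,
        reactor.get_elm_energy_mj(av_size),
        reactor.get_profile_energy(),
    )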


class FusionAIAgent:
    """
    Tabular Q-learning controller on discretized turbulence/flow states.
    """

    def __init__(
        self,
        *,
        alpha: float = ALPHA,
        gamma: float = GAMMA,
        epsilon: float = EPSILON,
        n_states_turb: int = N_STATES_TURB,
        n_states_flow: int = N_STATES_FLOW,
        n_actions: int = N_ACTIONS,
        entropy_beta: float = 0.05,
    ) -> None:
        alpha = float(alpha)
        if not np.isfinite(alpha) or alpha < 0.0 or alpha > 1.0:
            raise ValueError("alpha must be finite and in [0, 1].")
        gamma = float(gamma)
        if not np.isfinite(gamma) or gamma < 0.0 or gamma > 1.0:
            raise ValueError("gamma must be finite and in [0, 1].")
        epsilon = float(epsilon)
        if not np.isfinite(epsilon) or epsilon < 0.0 or epsilon > 1.0:
            raise ValueError("epsilon must be finite and in [0, 1].")
        n_states_turb = int(n_states_turb)
        if n_states_turb < 1:
            raise ValueError("n_states_turb must be >= 1.")
        n_states_flow = int(n_states_flow)
        if n_states_flow < 1:
            raise ValueError("n_states_flow must be >= 1.")
        n_actions = int(n_actions)
        if n_actions < 1:
            raise ValueError("n_actions must be >= 1.")
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.entropy_beta = float(entropy_beta)
        self.n_states_turb = n_states_turb
        self.n_states_flow = n_states_flow
        self.n_actions = n_actions
        self.q_table = np.zeros(
            (self.n_states_turb, self.n_states_flow, self.n_actions),
            dtype=np.float64,
        )
        self.total_reward = 0.0

    def discretize_state(self, turb: float, flow: float) -> tuple[int, int]:
        s_turb = min(int(np.log1p(max(float(turb), 0.0))), self.n_states_turb - 1)
        s_flow = min(int(max(float(flow), 0.0)), self.n_states_flow - 1)
        return s_turb, s_flow

    def choose_action(
        self,
        state: tuple[int, int],
        rng: np.random.Generator,
    ) -> int:
        if float(rng.random()) < self.epsilon:
            return int(rng.integers(self.n_actions))
        # Greedy (argmax) selection is used for simplicity; exploration comes
        # from epsilon above, and the Q-values themselves are
        # entropy-regularized in learn().
        return int(np.argmax(self.q_table[state]))

    def learn(
        self,
        state: tuple[int, int],
        action: int,
        new_state: tuple[int, int],
        reward: float,
    ) -> float:
        """
        Soft-Q learning update with entropy regularization.

        TD target: R + gamma * (max_a' Q(s', a') + beta * H(s')),
        where H(s') is the entropy of the softmax policy over Q(s', .);
        Q(s, a) moves toward the target at learning rate alpha.
        """
        old_q = float(self.q_table[state][int(action)])
        future_qs = self.q_table[new_state]
        max_future_q = float(np.max(future_qs))
        # Entropy regularization (soft-Q): softmax over successor Q-values.
        probs = np.exp(future_qs - max_future_q)
        probs /= np.sum(probs)
        entropy = -np.sum(probs * np.log(probs + 1e-9))
        target = float(reward) + self.gamma * (max_future_q + self.entropy_beta * entropy)
        new_q = old_q + self.alpha * (target - old_q)
        self.q_table[state][int(action)] = new_q
        self.total_reward += float(reward)
        return new_q


class FusionAI_Agent(FusionAIAgent):
    """Backward-compatible alias for older scripts."""
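

# Minimal sketch of one observe-act-learn cycle with the tabular agent. The
# turbulence/flow readings and the reward are made-up values that only
# exercise the API.
def _demo_agent_update() -> None:
    rng = np.random.default_rng(0)
    agent = FusionAIAgent()
    state = agent.discretize_state(turb=12.0, flow=1.5)  # maps to (2, 1)
    action = agent.choose_action(state, rng)
    next_state = agent.discretize_state(turb=3.0, flow=2.0)
    new_q = agent.learn(state, action, next_state, reward=-0.5)
    logger.info("action=%d | updated Q=%.4f", action, new_q)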


def _plot_learning(
    h_turb: np.ndarray,
    h_flow: np.ndarray,
    h_temp: np.ndarray,
    h_shear_ctrl: np.ndarray,
    q_table: np.ndarray,
    output_path: str,
) -> tuple[bool, Optional[str]]:
    try:
        fig = plt.figure(figsize=(14, 8))

        ax1 = fig.add_subplot(2, 3, 1)
        lookback = min(1000, int(h_turb.size))
        if lookback > 0:
            ax1.plot(h_turb[-lookback:], h_flow[-lookback:], "k-", alpha=0.3)
        lookback_recent = min(100, int(h_turb.size))
        if lookback_recent > 0:
            ax1.plot(
                h_turb[-lookback_recent:],
                h_flow[-lookback_recent:],
                "r-",
                linewidth=2,
                label="Last 100 steps",
            )
        ax1.set_title("Phase Space: L-H Transition")
        ax1.set_xlabel("Turbulence (Avalanche)")
        ax1.set_ylabel("Zonal Flow (Shear)")
        ax1.legend()

        ax2 = fig.add_subplot(2, 3, (2, 3))
        ax2.plot(h_temp, color="orange", label="Core Temperature")
        ax2_twin = ax2.twinx()
        ax2_twin.plot(h_flow, color="blue", alpha=0.3, label="Internal Flow")
        ax2.set_title("Reactor Evolution: Temperature Growth")
        ax2.legend(loc="upper left")
        ax2_twin.legend(loc="upper right")

        ax3 = fig.add_subplot(2, 3, (4, 5))
        ax3.plot(h_turb, "r-", alpha=0.3, label="Turbulence")
        ax3_twin = ax3.twinx()
        ax3_twin.plot(h_shear_ctrl, "g-", linewidth=2, label="AI Control Signal")
        ax3.set_title("AI Agent Response to Instability")
        ax3.legend(loc="upper left")
        ax3_twin.legend(loc="upper right")

        ax4 = fig.add_subplot(2, 3, 6)
        policy_map = np.argmax(q_table, axis=2)
        im = ax4.imshow(policy_map, origin="lower", cmap="viridis")
        ax4.set_title("Learned Policy (Q-Table)")
        ax4.set_xlabel("Flow State")
        ax4.set_ylabel("Turbulence State")
        plt.colorbar(im, ax=ax4, label="Action (0=Down, 2=Up)")

        plt.tight_layout()
        plt.savefig(output_path)
        plt.close(fig)
        return True, None
    except Exception as exc:
        return False, str(exc)


def run_advanced_learning_sim(
    size: int = L,
    time_steps: int = TIME_STEPS,
    seed: int = 42,
    *,
    epsilon: float = EPSILON,
    noise_probability: float = 0.01,
    shear_step: float = 0.05,
    shear_bounds: Tuple[float, float] = (0.0, 1.0),
    save_plot: bool = True,
    output_path: str = "Advanced_SOC_Learning.png",
    verbose: bool = True,
) -> Dict[str, Any]:
    """
    Run the deterministic SOC + Q-learning control simulation and return
    summary metrics.
    """
    steps = int(time_steps)
    if steps < 1:
        raise ValueError("time_steps must be >= 1.")
    lo_shear, hi_shear = _normalize_bounds(shear_bounds, "shear_bounds")
    shear_step = float(shear_step)
    if not np.isfinite(shear_step) or shear_step < 0.0:
        raise ValueError("shear_step must be finite and >= 0.")
    noise_probability = float(noise_probability)
    if not np.isfinite(noise_probability) or noise_probability < 0.0 or noise_probability > 1.0:
        raise ValueError("noise_probability must be finite and in [0, 1].")
    rng = np.random.default_rng(int(seed))
    if verbose:
        logger.info("--- SCPN MASTERPIECE: Predator-Prey Physics + Q-Learning Control ---")
    reactor = CoupledSandpileReactor(size=int(size))
    brain = FusionAIAgent(epsilon=float(epsilon))

    h_turb: list[float] = []
    h_flow: list[float] = []
    h_temp: list[float] = []
    h_shear_ctrl: list[float] = []
    h_shear_total: list[float] = []
    current_ext_shear = 0.0
    cadence = max(steps // 10, 1)

    for t in range(steps):
        if t == 0:
            turb_prev = 0.0
            flow_prev = 0.0
        else:
            turb_prev = h_turb[-1]
            flow_prev = h_flow[-1]
        state = brain.discretize_state(turb_prev, flow_prev)
        action_idx = brain.choose_action(state, rng)
        if action_idx == 0:
            current_ext_shear -= shear_step
        elif action_idx == 2:
            current_ext_shear += shear_step
        current_ext_shear = float(np.clip(current_ext_shear, lo_shear, hi_shear))

        reactor.drive()
        if float(rng.random()) < noise_probability:
            reactor.drive()
        av_size, flow_val, total_shear = reactor.step_physics(current_ext_shear)
        core_temp = reactor.get_profile_energy()

        reward = (core_temp * 0.1) - (float(av_size) * 0.5) - (current_ext_shear * 2.0)
        next_state = brain.discretize_state(float(av_size), flow_val)
        brain.learn(state, action_idx, next_state, reward)

        h_turb.append(float(av_size))
        h_flow.append(float(flow_val))
        h_temp.append(float(core_temp))
        h_shear_ctrl.append(float(current_ext_shear))
        h_shear_total.append(float(total_shear))

        if verbose and (t % cadence == 0 or t == steps - 1):
            logger.info(
                "Step %d: Temp=%.2f | Flow=%.3f | Turb=%s | AI_Shear=%.3f | Q-Avg=%.4f",
                t,
                core_temp,
                flow_val,
                av_size,
                current_ext_shear,
                float(np.mean(brain.q_table)),
            )

    turb_arr = np.asarray(h_turb, dtype=np.float64)
    flow_arr = np.asarray(h_flow, dtype=np.float64)
    temp_arr = np.asarray(h_temp, dtype=np.float64)
    shear_arr = np.asarray(h_shear_ctrl, dtype=np.float64)
    total_shear_arr = np.asarray(h_shear_total, dtype=np.float64)

    plot_saved = False
    plot_error: Optional[str] = None
    if save_plot:
        plot_saved, plot_error = _plot_learning(
            turb_arr,
            flow_arr,
            temp_arr,
            shear_arr,
            brain.q_table,
            output_path,
        )
        if verbose and plot_saved:
            logger.info("Simulation complete. Analysis saved: %s", output_path)

    return {
        "seed": int(seed),
        "steps": int(steps),
        "final_core_temp": float(temp_arr[-1]) if temp_arr.size else 0.0,
        "final_flow": float(flow_arr[-1]) if flow_arr.size else 0.0,
        "final_external_shear": float(shear_arr[-1]) if shear_arr.size else 0.0,
        "mean_turbulence": float(np.mean(turb_arr)) if turb_arr.size else 0.0,
        "mean_flow": float(np.mean(flow_arr)) if flow_arr.size else 0.0,
        "mean_core_temp": float(np.mean(temp_arr)) if temp_arr.size else 0.0,
        "max_external_shear": float(np.max(shear_arr)) if shear_arr.size else 0.0,
        "mean_total_shear": float(np.mean(total_shear_arr)) if total_shear_arr.size else 0.0,
        "total_reward": float(brain.total_reward),
        "q_table_mean": float(np.mean(brain.q_table)),
        "q_table_max_abs": float(np.max(np.abs(brain.q_table))),
        "plot_saved": bool(plot_saved),
        "plot_error": plot_error,
    }
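

# Minimal sketch of a short, plot-free run that reads a few summary metrics
# from the returned dictionary. The reduced size, step count, and seed are
# illustrative assumptions for a quick smoke test.
def _demo_summary_run() -> None:
    summary = run_advanced_learning_sim(
        size=32,
        time_steps=2000,
        seed=7,
        save_plot=False,
        verbose=False,
    )
    logger.info(
        "final temp=%.2f | mean turbulence=%.2f | total reward=%.1f",
        summary["final_core_temp"],
        summary["mean_turbulence"],
        summary["total_reward"],
    )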


if __name__ == "__main__":
    # Configure logging so the progress messages are visible when run directly.
    logging.basicConfig(level=logging.INFO)
    run_advanced_learning_sim()