Source code for scpn_fusion.core.turbulence_oracle

# SPDX-License-Identifier: AGPL-3.0-or-later
# Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Turbulence Oracle
"""Turbulence oracle for reduced-order edge-chaos forecasting."""

from __future__ import annotations

import numpy as np
from numpy.typing import NDArray
import matplotlib.pyplot as plt
from scipy.sparse import rand

FloatArray = NDArray[np.float64]
ComplexArray = NDArray[np.complex128]

GRID = 64
L = 10.0
ALPHA = 0.1  # Adiabaticity parameter
KAPPA = 0.5  # Density gradient drive
NU = 0.01  # Viscosity
DT = 0.05
HYPERVISCOSITY_ORDER = 4


[docs] class DriftWavePhysics: """2D Hasegawa-Wakatani solver for plasma edge turbulence. Variables: phi (electrostatic potential), n (density fluctuation). Pass ``seed`` for reproducible stochastic initial conditions. """ def __init__(self, N: int = GRID, seed: int | None = None) -> None: self.N = N self.k = np.fft.fftfreq(N, d=L / (2 * np.pi * N)) self.kx, self.ky = np.meshgrid(self.k, self.k) self.k2 = self.kx**2 + self.ky**2 self.k2_safe = self.k2.copy() self.k2_safe[0, 0] = 1.0 # Avoid division by zero in phi inversion only. # De-aliasing mask (2/3 rule) # Filters out high-k modes that cause spectral blocking explosion k_max = np.max(np.abs(self.k)) self.mask = np.where(self.k2 < (2.0 / 3.0 * k_max) ** 2, 1.0, 0.0) # Init State (Random Noise) if seed is None: phi_noise = np.random.randn(N, N) * 0.01 n_noise = np.random.randn(N, N) * 0.01 else: rng = np.random.default_rng(seed) phi_noise = rng.normal(size=(N, N)) * 0.01 n_noise = rng.normal(size=(N, N)) * 0.01 self.phi_k = np.fft.fft2(phi_noise) * self.mask self.n_k = np.fft.fft2(n_noise) * self.mask
[docs] def bracket(self, A_k: ComplexArray, B_k: ComplexArray) -> ComplexArray: """Compute the Poisson bracket [A, B] with 2/3-rule de-aliasing.""" # Derivatives in spectral space dxA = np.fft.ifft2(1j * self.kx * A_k) dyA = np.fft.ifft2(1j * self.ky * A_k) dxB = np.fft.ifft2(1j * self.kx * B_k) dyB = np.fft.ifft2(1j * self.ky * B_k) # Nonlinear product in real space nonlin = dxA * dyB - dyA * dxB # Back to spectral + De-aliasing return np.asarray(np.fft.fft2(nonlin) * self.mask, dtype=np.complex128)
[docs] @staticmethod def spectral_dissipation_multiplier(k2: FloatArray) -> FloatArray: """Return the Hasegawa-Wakatani fourth-order hyperviscous multiplier.""" return np.asarray( NU * np.asarray(k2, dtype=np.float64) ** (HYPERVISCOSITY_ORDER // 2), dtype=np.float64 )
[docs] def step(self) -> tuple[FloatArray, FloatArray]: """Advance one RK4 time step and return the real-space (phi, n) fields.""" p = self.phi_k n = self.n_k # Reduced time step for stability local_dt = 0.01 def rhs(p_in: ComplexArray, n_in: ComplexArray) -> tuple[ComplexArray, ComplexArray]: # Enforce mask p_in *= self.mask n_in *= self.mask # Vorticity w = -k^2 phi w_in = -self.k2 * p_in # Non-linear terms brack_phi_w = self.bracket(p_in, w_in) brack_phi_n = self.bracket(p_in, n_in) # Linear terms (Hasegawa-Wakatani) # dw/dt = -[phi,w] + alpha*(phi-n) - nu*k^4*w # dn/dt = -[phi,n] + alpha*(phi-n) - kappa*dy_phi - nu*k^4*n coupling = ALPHA * (p_in - n_in) # Fourth-order hyperviscosity: nu * k^4. dissip = self.spectral_dissipation_multiplier(self.k2) dissip_w = dissip * w_in dissip_n = dissip * n_in dw_dt = -brack_phi_w + coupling - dissip_w # Invert to get d(phi)/dt: dphi = -dw / k^2 dp_dt = -dw_dt / self.k2_safe dp_dt[0, 0] = 0.0 # Zero mean dn_dt = -brack_phi_n + coupling - KAPPA * (1j * self.ky * p_in) - dissip_n return ( np.asarray(dp_dt, dtype=np.complex128), np.asarray(dn_dt, dtype=np.complex128), ) # RK4 Integration k1_p, k1_n = rhs(p, n) k2_p, k2_n = rhs(p + 0.5 * local_dt * k1_p, n + 0.5 * local_dt * k1_n) k3_p, k3_n = rhs(p + 0.5 * local_dt * k2_p, n + 0.5 * local_dt * k2_n) k4_p, k4_n = rhs(p + local_dt * k3_p, n + local_dt * k3_n) self.phi_k += (local_dt / 6.0) * (k1_p + 2 * k2_p + 2 * k3_p + k4_p) self.n_k += (local_dt / 6.0) * (k1_n + 2 * k2_n + 2 * k3_n + k4_n) # Stability Clamp (Prevent blow-up) max_amp = np.max(np.abs(self.phi_k)) if max_amp > 100.0: # Rescale to prevent overflow scale = 100.0 / max_amp self.phi_k *= scale self.n_k *= scale return np.real(np.fft.ifft2(self.phi_k)), np.real(np.fft.ifft2(self.n_k))
[docs] class OracleESN: """Echo State Network (reservoir computing) for chaotic time-series forecasting. Specialised for predicting chaotic time series. Pass ``seed`` for reproducible reservoir construction. """ def __init__( self, input_dim: int, reservoir_size: int = 500, spectral_radius: float = 0.95, seed: int | None = None, ) -> None: random_state: int | None if seed is None: self.W_in = np.random.uniform(-1, 1, (reservoir_size, input_dim)) random_state = None def reservoir_values(size: int) -> FloatArray: return np.asarray(np.random.uniform(-1, 1, size), dtype=np.float64) else: rng = np.random.default_rng(seed) self.W_in = rng.uniform(-1, 1, (reservoir_size, input_dim)) random_state = seed def reservoir_values(size: int) -> FloatArray: return np.asarray(rng.uniform(-1, 1, size), dtype=np.float64) # Sparse Reservoir self.W_res = rand( reservoir_size, reservoir_size, density=0.1, random_state=random_state, ).toarray() nonzero = self.W_res != 0.0 self.W_res[nonzero] = reservoir_values(np.count_nonzero(nonzero)) # Scale spectral radius eigenvalues = np.linalg.eigvals(self.W_res) radius = float(np.max(np.abs(eigenvalues))) if np.isfinite(radius) and radius > 1.0e-12: self.W_res *= spectral_radius / radius self.state = np.zeros(reservoir_size) self.W_out: FloatArray | None = None # Trained later
[docs] def train(self, inputs: FloatArray, targets: FloatArray) -> None: """Train the readout weights by ridge regression on harvested states.""" print(f"[Oracle] Training on {len(inputs)} chaotic states...") states = [] # Harvest states for u in inputs: self.state = np.tanh(np.dot(self.W_in, u) + np.dot(self.W_res, self.state)) states.append(self.state) S = np.array(states) # [Time, Reservoir] # Solve W_out * (S.T S + beta I) = Targets.T * S without forming inverse. reg = 1e-4 system = np.dot(S.T, S) + reg * np.eye(S.shape[1]) rhs = np.dot(targets.T, S) self.W_out = np.linalg.solve(system.T, rhs.T).T print("[Oracle] Mental Model Formed.")
[docs] def predict(self, u_current: FloatArray, steps: int = 50) -> FloatArray: """Generate closed-loop ESN predictions from an initial state.""" if self.W_out is None: raise RuntimeError("Oracle is not trained. Call train() first.") predictions = [] curr = u_current for _ in range(steps): # Update reservoir with current feedback (Closed Loop prediction) self.state = np.tanh(np.dot(self.W_in, curr) + np.dot(self.W_res, self.state)) # Readout pred = np.dot(self.W_out, self.state) predictions.append(pred) curr = pred # Feed prediction back return np.asarray(predictions, dtype=np.float64)
[docs] def run_turbulence_oracle() -> None: """Run an end-to-end Drift-Wave + ESN prediction and save diagnostic plot.""" print("--- SCPN TURBULENCE ORACLE: PREDICTING CHAOS ---") hw = DriftWavePhysics() for _ in range(100): hw.step() # warmup print("Generating Training Data (Hasegawa-Wakatani)...") data_phi = [] # We sample the field at 16 probe locations (Sparse Sensing) # Reconstructing full field is Tomography job, here we predict probes probe_idx = np.linspace(0, GRID * GRID - 1, 16, dtype=int) for _ in range(1000): phi, _ = hw.step() probes = phi.flatten()[probe_idx] data_phi.append(probes) data = np.array(data_phi) # Split Train/Test train_len = 800 X_train = data[:train_len] Y_train = data[1 : train_len + 1] # Next step target oracle = OracleESN(input_dim=16) oracle.train(X_train, Y_train) print("Testing Prediction Horizon...") start_state = data[train_len] horizon = 150 # Physics Future (Ground Truth) truth = data[train_len : train_len + horizon] # AI Future (Hallucination) prediction = oracle.predict(start_state, steps=horizon) mse = np.mean((truth - prediction) ** 2, axis=1) # Find "Trust Horizon" (where error exceeds threshold) threshold = 0.5 * np.var(truth) try: trust_steps = np.where(mse > threshold)[0][0] except IndexError: trust_steps = horizon print(f"Prediction Horizon: {trust_steps} steps") print(f"Physics Time: {trust_steps * DT:.2f} normalized units") # Plot fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8)) # Plot one probe trace ax1.plot(truth[:, 0], "k-", label="Reality (H-W Physics)") ax1.plot(prediction[:, 0], "r--", label="Oracle Prediction (ESN)") ax1.axvline(trust_steps, color="b", linestyle=":", label="Lyapunov Horizon") ax1.set_title("Turbulence Probe Signal Prediction") ax1.legend() # Plot Divergence ax2.plot(mse, "g-") ax2.axhline(threshold, color="k", linestyle="--") ax2.set_title("Forecast Error Divergence (Chaos)") ax2.set_xlabel("Steps into Future") ax2.set_ylabel("MSE") plt.tight_layout() plt.savefig("Turbulence_Oracle.png") print("Saved: Turbulence_Oracle.png")
if __name__ == "__main__": run_turbulence_oracle()