# Source code for scpn_fusion.core.gyro_swin_surrogate

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — GyroSwin-Like Turbulence Surrogate (GAI-01)
"""Deterministic GyroSwin-like surrogate for core-turbulence benchmarking.

This module intentionally uses synthetic data and an offline, zero-dependency
training path (NumPy only). It is scoped for CI and reproducibility while
providing a measurable speed/accuracy comparison against a slower
"GENE-like proxy" baseline.
"""

from __future__ import annotations

from dataclasses import dataclass
import time

import numpy as np


@dataclass(frozen=True)
class TurbulenceDataset:
    """Synthetic core-turbulence dataset bundle.

    Immutable container pairing a feature matrix with its per-sample
    turbulence target, as produced by
    ``generate_synthetic_gyrokinetic_dataset``.
    """

    # Feature matrix, shape (N, 10); column order is rho, q, s_hat, beta_e,
    # grad_ti, grad_te, coll, kappa, delta, shear (see the generator).
    features: np.ndarray
    # Target ion heat diffusivity chi_i per sample, shape (N,).
    chi_i: np.ndarray
@dataclass(frozen=True)
class SpeedBenchmark:
    """Per-sample timing comparison against a slower proxy solver."""

    # Wall-clock seconds per sample for the slow GENE-like proxy baseline.
    gene_proxy_s_per_sample: float
    # Wall-clock seconds per sample for the fitted surrogate.
    surrogate_s_per_sample: float
    # Ratio gene_proxy_s_per_sample / surrogate_s_per_sample;
    # values > 1 mean the surrogate is faster.
    speedup: float
def _as_2d(features: np.ndarray) -> np.ndarray: x = np.asarray(features, dtype=np.float64) if x.ndim == 1: x = x.reshape(1, -1) if x.shape[1] != 10: raise ValueError("Expected feature matrix with shape (N, 10)") return x
def synthetic_core_turbulence_target(features: np.ndarray) -> np.ndarray:
    """Reference synthetic turbulence law used as the CI benchmark target.

    Combines critical-gradient drive terms, a shaping factor, exponential
    shear suppression, and a small deterministic mode-mixing term; output
    is floored at 1e-6 so downstream relative errors stay finite.
    """
    x = _as_2d(features)
    rho, q, s_hat, beta_e, grad_ti, grad_te, coll, kappa, delta, shear = x.T

    # Critical-gradient drives: zero below the threshold gradients.
    drive_i = np.maximum(0.0, grad_ti - 4.0)
    drive_e = np.maximum(0.0, grad_te - 4.7)

    # Plasma-shaping enhancement (elongation kappa, triangularity delta).
    shape_factor = 1.0 + 0.28 * (kappa - 1.3) + 0.12 * delta
    # E×B-like shear suppression of transport.
    shear_suppression = np.exp(-1.6 * np.abs(shear))
    # Deterministic mode-mixing contribution from (rho, q) and (beta_e, s_hat).
    mode_mix = (
        0.15 * np.sin(np.pi * rho * q * 0.35) ** 2
        + 0.10 * np.tanh(22.0 * beta_e * s_hat)
    )

    base = 0.25 + 0.13 * drive_i**1.18 + 0.10 * drive_e**1.10 + 0.05 * coll + mode_mix
    chi_i = base * shape_factor * shear_suppression
    return np.maximum(chi_i, 1e-6)
def generate_synthetic_gyrokinetic_dataset(seed: int, samples: int) -> TurbulenceDataset:
    """Generate deterministic synthetic "JET/ITPA-like" core turbulence samples.

    Args:
        seed: PRNG seed; identical seeds yield identical datasets.
        samples: Number of samples to draw (must be >= 8).

    Raises:
        ValueError: If ``samples`` is below 8.
    """
    if samples < 8:
        raise ValueError("samples must be >= 8")
    rng = np.random.default_rng(seed)
    # Uniform sampling bounds per feature column, in canonical order:
    # rho, q, s_hat, beta_e, grad_ti, grad_te, coll, kappa, delta, shear.
    # NOTE: drawing column-by-column in this exact order preserves the
    # PRNG stream, so results match for a given seed.
    bounds = (
        (0.05, 0.95),
        (1.0, 3.5),
        (0.2, 2.2),
        (0.003, 0.045),
        (3.0, 11.0),
        (3.0, 10.0),
        (0.05, 1.5),
        (1.2, 2.0),
        (0.08, 0.5),
        (-0.45, 0.45),
    )
    columns = [rng.uniform(lo, hi, samples) for lo, hi in bounds]
    features = np.column_stack(columns)
    return TurbulenceDataset(
        features=features,
        chi_i=synthetic_core_turbulence_target(features),
    )
class GyroSwinLikeSurrogate:
    """Lightweight random-feature surrogate emulating a transformer-style map.

    A fixed Gaussian projection produces random Fourier features (sin/cos),
    which are concatenated with hand-crafted physics covariates; the output
    weights are obtained by closed-form ridge regression.
    """

    def __init__(self, hidden_dim: int = 64, ridge: float = 5e-4, seed: int = 42) -> None:
        """Initialise the fixed, deterministic random projection.

        Args:
            hidden_dim: Width of the random projection (must be >= 8).
            ridge: Tikhonov regularisation strength; floored at 1e-10.
            seed: Seed controlling the projection and phases.

        Raises:
            ValueError: If ``hidden_dim`` is below 8.
        """
        if hidden_dim < 8:
            raise ValueError("hidden_dim must be >= 8")
        self.hidden_dim = int(hidden_dim)
        self.ridge = float(max(ridge, 1e-10))
        rng = np.random.default_rng(seed)
        # Fixed projection matrix and phases for the random Fourier features.
        self._omega = rng.normal(0.0, 0.65, size=(10, self.hidden_dim))
        self._phase = rng.uniform(-np.pi, np.pi, size=self.hidden_dim)
        # Ridge solution; stays None until fit() is called.
        self._weights: np.ndarray | None = None

    def _feature_map(self, features: np.ndarray) -> np.ndarray:
        """Build the design matrix: [bias | raw features | physics | RFF]."""
        x = _as_2d(features)
        rho, q, s_hat, beta_e, grad_ti, grad_te, coll, kappa, delta, shear = x.T
        projected = x @ self._omega + self._phase
        fourier = np.hstack((np.sin(projected), np.cos(projected)))
        # Physics-motivated covariates mirroring the synthetic target law.
        physics = np.column_stack(
            [
                np.maximum(0.0, grad_ti - 4.0),
                np.maximum(0.0, grad_te - 4.7),
                beta_e * s_hat * 22.0,
                rho * q,
                (kappa - 1.0),
                delta,
                shear * shear,
                np.exp(-np.abs(shear)),
            ]
        )
        return np.column_stack([np.ones(x.shape[0]), x, physics, fourier])

    def fit(self, features: np.ndarray, targets: np.ndarray) -> None:
        """Solve the ridge-regression normal equations for the weights.

        Raises:
            ValueError: If feature and target row counts disagree.
        """
        design = self._feature_map(features)
        y = np.asarray(targets, dtype=np.float64).reshape(-1)
        if design.shape[0] != y.shape[0]:
            raise ValueError("feature/target rows must match")
        gram = design.T @ design + self.ridge * np.eye(design.shape[1], dtype=np.float64)
        self._weights = np.linalg.solve(gram, design.T @ y)

    def predict(self, features: np.ndarray) -> np.ndarray:
        """Predict chi_i for *features*; output is floored at 1e-6.

        Raises:
            RuntimeError: If called before :meth:`fit`.
        """
        if self._weights is None:
            raise RuntimeError("Surrogate is not fit. Call fit() first.")
        return np.maximum(self._feature_map(features) @ self._weights, 1e-6)
def gene_proxy_predict(features: np.ndarray, iterations: int = 800) -> np.ndarray:
    """Slow "GENE-like" reference proxy for speed benchmarking.

    The proxy intentionally performs per-sample iterative updates to emulate
    heavier solver cost, while remaining deterministic and bounded for CI.
    """
    x = _as_2d(features)
    # Ground-truth law provides the base value; the slow loop only applies
    # a small bounded multiplicative correction.
    base = synthetic_core_turbulence_target(x)
    result = np.empty(x.shape[0], dtype=np.float64)
    for idx in range(x.shape[0]):
        state = x[idx].copy()
        accum = 0.0
        for _ in range(iterations):
            shifted = np.roll(state, 1)
            # Bounded nonlinear recurrence: tanh keeps the state in (-1, 1).
            state = np.tanh(0.84 * state + 0.14 * shifted + 0.02 * state * state)
            accum += (
                0.22 * state[4] * state[4]
                + 0.18 * state[5] * state[5]
                + 0.06 * abs(state[3])
                + 0.04 * state[2] * state[2]
            )
        # Correction stays within ±1.5% of the base value by construction.
        correction = 1.0 + 0.015 * np.tanh(accum / max(iterations, 1) - 0.5)
        result[idx] = max(base[idx] * correction, 1e-6)
    return result
def rmse_percent(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Root-mean-square error expressed as a percentage of mean |y_true|.

    Raises:
        ValueError: If inputs are empty or their flattened shapes differ.
    """
    truth = np.asarray(y_true, dtype=np.float64).reshape(-1)
    pred = np.asarray(y_pred, dtype=np.float64).reshape(-1)
    if truth.size == 0 or truth.shape != pred.shape:
        raise ValueError("y_true/y_pred must be non-empty and same shape")
    rmse = float(np.sqrt(np.mean(np.square(truth - pred))))
    # Floor the denominator so near-zero truth cannot blow up the percentage.
    denom = float(max(np.mean(np.abs(truth)), 1e-9))
    return 100.0 * rmse / denom
def benchmark_speedup(
    features: np.ndarray,
    surrogate: GyroSwinLikeSurrogate,
    min_baseline_s: float = 0.15,
    min_surrogate_s: float = 0.02,
) -> SpeedBenchmark:
    """Measure per-sample speedup of surrogate vs slow baseline proxy."""
    x = _as_2d(features)

    def _timed(fn, min_total_s: float, max_loops: int) -> tuple[float, int]:
        # Double the repetition count until the measured wall time is large
        # enough to be meaningful, or the loop cap is reached.
        loops = 1
        while True:
            start = time.perf_counter()
            for _ in range(loops):
                fn(x)
            elapsed = time.perf_counter() - start
            if elapsed >= min_total_s or loops >= max_loops:
                return elapsed, loops
            loops *= 2

    baseline_total, baseline_loops = _timed(gene_proxy_predict, min_baseline_s, 32)
    surrogate_total, surrogate_loops = _timed(surrogate.predict, min_surrogate_s, 4096)

    n_samples = x.shape[0]
    gene_s_per = baseline_total / (baseline_loops * n_samples)
    surrogate_s_per = surrogate_total / (surrogate_loops * n_samples)
    return SpeedBenchmark(
        gene_proxy_s_per_sample=gene_s_per,
        surrogate_s_per_sample=surrogate_s_per,
        # Guard the division in case the surrogate timing underflows.
        speedup=gene_s_per / max(surrogate_s_per, 1e-12),
    )