Source code for scpn_fusion.control.tokamak_digital_twin

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Tokamak Digital Twin
import logging
from typing import Optional, Sequence

import matplotlib.pyplot as plt
import numpy as np

logger = logging.getLogger(__name__)

from scpn_fusion.io.imas_connector import (
    digital_twin_history_to_ids,
    digital_twin_history_to_ids_pulse,
    digital_twin_summary_to_ids,
)

GRID_SIZE = 40
TIME_STEPS = 10000
LEARNING_RATE = 0.0001
HIDDEN_SIZE = 64
BATCH_SIZE = 32
MEMORY_SIZE = 1000
R_MAJ = 2.0  # major radius [m]
R_MIN = 0.8  # minor radius [m]
_GRID_SPACING = 1.0  # cell size for 2D diffusion stencil [arbitrary]


def _resolve_rng(seed: int, rng: Optional[np.random.Generator]) -> np.random.Generator:
    if rng is not None:
        if not isinstance(rng, np.random.Generator):
            raise TypeError("rng must be a numpy.random.Generator when provided")
        return rng
    return np.random.default_rng(int(seed))


[docs] class TokamakTopoloy: """Magnetic geometry: q-profile and island evolution via Modified Rutherford Equation.""" def __init__(self, size=GRID_SIZE): self.size = size y, x = np.ogrid[-size / 2 : size / 2, -size / 2 : size / 2] self.r_map = np.sqrt(x**2 + y**2) / (size / 2) self.mask = self.r_map <= 1.0 self.q0 = 1.0 self.qa = 3.0 self.update_q_profile(0.0) self.resonances = [1.5, 2.0, 2.5, 3.0] self.island_widths = {res: 0.01 for res in self.resonances} self.eta = 1e-5
[docs] def step_island_evolution(self, dt=0.1): """Evolve island widths via MRE with neoclassical bootstrap drive.""" beta_p = 0.6 w_crit = 0.05 for res in self.resonances: delta_prime = -0.2 - (5.0 * self.island_widths[res]) f_bs = beta_p * (self.island_widths[res] / (self.island_widths[res] ** 2 + w_crit**2)) dw_dt = self.eta * (delta_prime + f_bs) self.island_widths[res] = max(0.001, self.island_widths[res] + dw_dt * dt)
[docs] def update_q_profile(self, current_drive_action): """Update parabolic q(r) = q0 + (qa-q0)*r^2 with current drive modulation.""" mod_q0 = self.q0 - (0.2 * current_drive_action) mod_qa = self.qa + (0.5 * current_drive_action) self.q_map = mod_q0 + (mod_qa - mod_q0) * (self.r_map**2)
[docs] def get_rational_surfaces(self): """Boolean map of rational-surface islands from current MRE widths.""" danger_map = np.zeros((self.size, self.size), dtype=bool) for res in self.resonances: width = self.island_widths[res] mask = (np.abs(self.q_map - res) < width) & self.mask danger_map = np.logical_or(danger_map, mask) return danger_map
[docs] class Plasma2D: """2D diffusion-reaction model on a poloidal cross-section.""" def __init__(self, topology, gyro_surrogate=None): self.topo = topology self.T = np.zeros((GRID_SIZE, GRID_SIZE)) self.T_core_hist = [] self._gyro_surrogate = gyro_surrogate
[docs] def step(self, action): """Evolve plasma one timestep with current drive action in [-1, 1].""" self.topo.update_q_profile(action) self.topo.step_island_evolution() danger_zones = self.topo.get_rational_surfaces() center = GRID_SIZE // 2 self.T[center, center] += 5.0 D_base = 0.01 D_turb = 0.5 diffusivity = np.ones_like(self.T) * D_base diffusivity[danger_zones] = D_turb if self._gyro_surrogate is not None: correction = np.asarray( self._gyro_surrogate(self.T, self.topo.q_map, danger_zones), dtype=float, ) if correction.shape != self.T.shape: raise ValueError( f"gyro_surrogate shape must be {self.T.shape}, got {correction.shape}" ) correction = np.nan_to_num(correction, nan=1.0, posinf=2.0, neginf=0.5) np.clip(correction, 0.2, 5.0, out=correction) diffusivity *= correction # CFL stability: dt_cfl = dx^2 / (4 * D_max) for 2D explicit Euler D_max = float(np.max(diffusivity)) dt_cfl = _GRID_SPACING**2 / (4.0 * max(D_max, 1e-12)) dt_phys = 1.0 n_sub = max(1, int(np.ceil(dt_phys / dt_cfl))) dt_sub = dt_phys / n_sub for _ in range(n_sub): T_up = np.roll(self.T, -1, axis=0) T_down = np.roll(self.T, 1, axis=0) T_left = np.roll(self.T, -1, axis=1) T_right = np.roll(self.T, 1, axis=1) laplacian = T_up + T_down + T_left + T_right - 4 * self.T radiation = 0.002 * np.sqrt(np.maximum(self.T, 0.0) + 1e-6) tungsten_rad = 0.05 * np.exp(-((self.T - 2.0) ** 2) / 0.5) self.T += dt_sub * (diffusivity * laplacian - radiation - tungsten_rad) np.maximum(self.T, 0.0, out=self.T) self.T[~self.topo.mask] = 0.0 self.T = np.clip(self.T, 0, 100.0) core_temp = self.T[center, center] avg_temp = np.mean(self.T[self.topo.mask]) self.T_core_hist.append(core_temp) return self.T.flatten(), avg_temp
[docs] class SimpleNeuralNet: """NumPy MLP policy network for continuous control.""" def __init__( self, input_size: int, hidden_size: int, output_size: int, *, rng: np.random.Generator, ) -> None: self.W1 = rng.standard_normal((input_size, hidden_size)) * np.sqrt(1 / input_size) self.b1 = np.zeros((1, hidden_size)) self.W2 = rng.standard_normal((hidden_size, output_size)) * np.sqrt(1 / hidden_size) self.b2 = np.zeros((1, output_size))
[docs] def forward(self, x): self.z1 = np.dot(x, self.W1) + self.b1 self.a1 = np.tanh(self.z1) self.z2 = np.dot(self.a1, self.W2) + self.b2 self.out = np.tanh(self.z2) return self.out
[docs] def train_step(self, x, target_action, advantage): """REINFORCE-like policy gradient step.""" pred = self.forward(x) grad_out = -(advantage) d_z2 = grad_out * (1 - self.out**2) d_W2 = np.dot(self.a1.T, d_z2) d_b2 = np.sum(d_z2, axis=0, keepdims=True) d_a1 = np.dot(d_z2, self.W2.T) d_z1 = d_a1 * (1 - self.a1**2) d_W1 = np.dot(x.T, d_z1) d_b1 = np.sum(d_z1, axis=0, keepdims=True) self.W1 -= LEARNING_RATE * d_W1 self.b1 -= LEARNING_RATE * d_b1 self.W2 -= LEARNING_RATE * d_W2 self.b2 -= LEARNING_RATE * d_b2 return np.mean(np.abs(grad_out))
[docs] def run_digital_twin( time_steps=TIME_STEPS, seed=42, save_plot=True, output_path="Tokamak_Digital_Twin.png", verbose=True, gyro_surrogate=None, chaos_monkey: bool = False, sensor_dropout_prob: float = 0.0, sensor_noise_std: float = 0.0, rng: Optional[np.random.Generator] = None, ): """Run digital-twin control simulation, return summary dict.""" steps = int(time_steps) if steps < 1: raise ValueError("time_steps must be >= 1.") chaos_monkey = bool(chaos_monkey) sensor_dropout_prob = float(sensor_dropout_prob) sensor_noise_std = float(sensor_noise_std) if ( not np.isfinite(sensor_dropout_prob) or sensor_dropout_prob < 0.0 or sensor_dropout_prob > 1.0 ): raise ValueError("sensor_dropout_prob must be finite and in [0, 1].") if not np.isfinite(sensor_noise_std) or sensor_noise_std < 0.0: raise ValueError("sensor_noise_std must be finite and >= 0.") local_rng = _resolve_rng(seed=int(seed), rng=rng) if verbose: logger.info("--- SCPN 2D TOKAMAK DIGITAL TWIN + NEURAL CONTROL ---") topo = TokamakTopoloy() plasma = Plasma2D(topo, gyro_surrogate=gyro_surrogate) state_dim = GRID_SIZE brain = SimpleNeuralNet(state_dim, HIDDEN_SIZE, 1, rng=local_rng) history_rewards = [] history_actions = [] sensor_dropouts_total = 0 if verbose: logger.info("Training Neural Network for %d steps...", steps) for t in range(steps): midplane_idx = GRID_SIZE // 2 state_vector = np.asarray(plasma.T[midplane_idx, :], dtype=float).reshape(1, -1).copy() if chaos_monkey: if sensor_noise_std > 0.0: state_vector += local_rng.normal(0.0, sensor_noise_std, size=state_vector.shape) if sensor_dropout_prob > 0.0: dropout_mask = local_rng.random(state_vector.shape[1]) < sensor_dropout_prob dropped = int(np.sum(dropout_mask, dtype=np.int64)) if dropped > 0: state_vector[0, dropout_mask] = 0.0 sensor_dropouts_total += dropped state_vector = np.nan_to_num(state_vector, nan=0.0, posinf=100.0, neginf=0.0) noise = float(local_rng.normal(0.0, 0.2)) raw_action = brain.forward(state_vector) action = float(np.clip(raw_action + noise, -1.0, 1.0)[0, 0]) _, avg_temp = plasma.step(action) islands_area = np.sum(topo.get_rational_surfaces()) reward = avg_temp - (islands_area * 0.05) baseline = np.mean(history_rewards[-50:]) if len(history_rewards) > 50 else 0 # Derivative-free policy gradient estimator advantage = (reward - baseline) * noise loss = brain.train_step(state_vector, None, advantage) history_rewards.append(reward) history_actions.append(action) if verbose and t % 500 == 0: logger.info( "Digital twin simulation progress", extra={ "physics_context": { "step": t, "avg_temp": float(avg_temp), "action": float(action), "loss": float(loss), "islands_px": int(np.sum(topo.get_rational_surfaces())), } }, ) plot_saved = False plot_error = None if save_plot: try: fig = plt.figure(figsize=(15, 6)) ax1 = fig.add_subplot(1, 3, 1) im = ax1.imshow(plasma.T, cmap="inferno", origin="lower") ax1.set_title("Final Plasma Cross-Section (2D)") plt.colorbar(im, ax=ax1, label="Temperature (keV)") islands = topo.get_rational_surfaces() ax1.contour(islands, colors="cyan", levels=[0.5], linewidths=1, alpha=0.5) ax1.text(2, 2, "Cyan = q-Resonance (Islands)", color="cyan", fontsize=8) ax2 = fig.add_subplot(1, 3, 2) ax2.plot(history_rewards, color="orange", alpha=0.6) # Moving average if len(history_rewards) > 50: mov_avg = np.convolve(history_rewards, np.ones(50) / 50, mode="valid") ax2.plot(range(len(mov_avg)), mov_avg, "r-", linewidth=2, label="Moving Avg") ax2.set_title("Learning Curve") ax2.set_xlabel("Steps") ax2.set_ylabel("Reward (Confinement)") ax2.legend() ax3 = fig.add_subplot(1, 3, 3) r_axis = np.linspace(0, 1, GRID_SIZE // 2) q_axis = topo.q_map[GRID_SIZE // 2, GRID_SIZE // 2 :] ax3.plot(r_axis, q_axis, "b-", linewidth=2, label="Safety Factor q(r)") for q_res in [1.5, 2.0, 2.5, 3.0]: ax3.axhline(q_res, color="red", linestyle="--", alpha=0.3) ax3.text(0.1, q_res, f"q={q_res}", color="red", fontsize=8) ax3.set_title("Final Safety Factor Profile") ax3.set_xlabel("Normalized Radius r/a") ax3.set_ylabel("q") ax3.legend() plt.tight_layout() plt.savefig(output_path) plt.close(fig) plot_saved = True if verbose: logger.info("Digital Twin Simulation Complete. Snapshot saved: %s", output_path) except Exception as exc: plot_error = str(exc) if verbose: logger.warning("Digital Twin completed without plot artifact: %s", exc) islands_final = int(np.sum(topo.get_rational_surfaces())) final_avg_temp = float(history_rewards[-1] + islands_final * 0.05) if history_rewards else 0.0 summary = { "seed": int(seed), "steps": int(steps), "final_avg_temp": float(final_avg_temp), "final_reward": float(history_rewards[-1]) if history_rewards else 0.0, "final_action": float(history_actions[-1]) if history_actions else 0.0, "final_islands_px": islands_final, "reward_mean_last_50": float(np.mean(history_rewards[-50:])) if history_rewards else 0.0, "chaos_monkey": chaos_monkey, "sensor_dropout_prob": sensor_dropout_prob, "sensor_noise_std": sensor_noise_std, "sensor_dropouts_total": int(sensor_dropouts_total), "sensor_dropout_rate": float(sensor_dropouts_total / (steps * GRID_SIZE)), "plot_saved": bool(plot_saved), "plot_error": plot_error, } return summary
[docs] def run_digital_twin_ids( *, machine: str = "ITER", shot: int = 0, run: int = 0, **kwargs, ): """ Run digital twin and return IDS-like equilibrium payload. """ summary = run_digital_twin(**kwargs) return digital_twin_summary_to_ids( summary, machine=machine, shot=shot, run=run, )
[docs] def run_digital_twin_ids_history( history_steps: Sequence[int], *, machine: str = "ITER", shot: int = 0, run: int = 0, seed: int = 42, **kwargs, ): """ Run digital twin at multiple horizons and return IDS-like payload sequence. """ if "time_steps" in kwargs: raise ValueError("time_steps is controlled by history_steps in history mode.") if "seed" in kwargs: raise ValueError("seed is controlled by the seed argument in history mode.") if isinstance(history_steps, (str, bytes, bytearray)) or not isinstance( history_steps, Sequence ): raise ValueError("history_steps must be a sequence of positive integers.") if len(history_steps) == 0: raise ValueError("history_steps must contain at least one step count.") snapshots = _run_digital_twin_history_snapshots( history_steps=history_steps, seed=seed, **kwargs, ) return digital_twin_history_to_ids( snapshots, machine=machine, shot=shot, run=run, )
[docs] def run_digital_twin_ids_pulse( history_steps: Sequence[int], *, machine: str = "ITER", shot: int = 0, run: int = 0, seed: int = 42, **kwargs, ): """ Run digital twin at multiple horizons and return pulse-style IDS container. """ if "time_steps" in kwargs: raise ValueError("time_steps is controlled by history_steps in pulse mode.") if "seed" in kwargs: raise ValueError("seed is controlled by the seed argument in pulse mode.") snapshots = _run_digital_twin_history_snapshots( history_steps=history_steps, seed=seed, **kwargs, ) return digital_twin_history_to_ids_pulse( snapshots, machine=machine, shot=shot, run=run, )
def _run_digital_twin_history_snapshots( *, history_steps: Sequence[int], seed: int, **kwargs, ) -> list[dict[str, object]]: if isinstance(history_steps, (str, bytes, bytearray)) or not isinstance( history_steps, Sequence ): raise ValueError("history_steps must be a sequence of positive integers.") if len(history_steps) == 0: raise ValueError("history_steps must contain at least one step count.") snapshots: list[dict[str, object]] = [] base_seed = int(seed) for idx, step in enumerate(history_steps): if isinstance(step, bool) or not isinstance(step, int): raise ValueError(f"history_steps[{idx}] must be a positive integer.") if int(step) < 1: raise ValueError(f"history_steps[{idx}] must be >= 1.") summary = run_digital_twin( time_steps=int(step), seed=base_seed, **kwargs, ) snapshots.append(summary) return snapshots if __name__ == "__main__": run_digital_twin()