# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Tokamak Flight Sim
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple
import numpy as np
logger = logging.getLogger(__name__)
from scpn_fusion.core.fusion_kernel import FusionKernel
SHOT_DURATION = 50
DEFAULT_TARGET_R = 6.2
DEFAULT_TARGET_Z = 0.0
TARGET_ELONGATION = 1.7
[docs]
class FirstOrderActuator:
"""Discrete first-order actuator with rate limits, noise, and delay.
Models a realistic coil power supply for tokamak control:
- First-order lag: u_applied(s) = 1/(tau*s+1) * u_cmd
- Coil current rate limit: abs(du/dt) <= rate_limit [A/s]
- Sensor noise: additive Gaussian on measurement
- Measurement delay: pure transport delay on feedback signal
Parameters
----------
tau_s : float
Actuator time constant [s].
dt_s : float
Simulation timestep [s].
u_min, u_max : float
Saturation limits.
rate_limit : float
Maximum current rate of change [A/s]. Default 1e6 (1 MA/s, ITER PF spec).
sensor_noise_std : float
Standard deviation of additive sensor noise. Default 0.0 (disabled).
delay_steps : int
Number of timesteps of measurement delay. Default 0.
rng_seed : int or None
Random seed for reproducible noise (None = random).
"""
def __init__(
self,
*,
tau_s: float,
dt_s: float,
u_min: float = -1.0e9,
u_max: float = 1.0e9,
rate_limit: float = 1.0e6,
sensor_noise_std: float = 0.0,
delay_steps: int = 0,
rng_seed: Optional[int] = None,
) -> None:
tau_s = float(tau_s)
dt_s = float(dt_s)
if not np.isfinite(tau_s) or tau_s <= 0.0:
raise ValueError("tau_s must be finite and > 0.")
if not np.isfinite(dt_s) or dt_s <= 0.0:
raise ValueError("dt_s must be finite and > 0.")
self.tau_s = tau_s
self.dt_s = dt_s
self.u_min = float(u_min)
self.u_max = float(u_max)
self.rate_limit = float(rate_limit)
self.sensor_noise_std = float(sensor_noise_std)
self.delay_steps = int(delay_steps)
self._rng = np.random.default_rng(rng_seed)
self.state = 0.0
self._delay_buffer: list[float] = [0.0] * max(self.delay_steps, 1)
[docs]
def step(self, command: float) -> float:
"""Apply command through actuator dynamics with rate limiting."""
u_cmd = float(np.clip(command, self.u_min, self.u_max))
alpha = self.dt_s / (self.tau_s + self.dt_s)
u_new = self.state + alpha * (u_cmd - self.state)
# Rate limiting (coil current slew rate)
du = u_new - self.state
max_du = self.rate_limit * self.dt_s
if abs(du) > max_du:
du = np.sign(du) * max_du
u_new = self.state + du
self.state = float(np.clip(u_new, self.u_min, self.u_max))
# Update delay buffer
self._delay_buffer.append(self.state)
return self.state
[docs]
def get_measurement(self) -> float:
"""Return delayed, noisy measurement of actuator output."""
idx = max(0, len(self._delay_buffer) - 1 - self.delay_steps)
delayed = self._delay_buffer[idx]
if self.sensor_noise_std > 0:
noise = float(self._rng.normal(0.0, self.sensor_noise_std))
return delayed + noise
return delayed
[docs]
class IsoFluxController:
"""
Simulates the Plasma Control System (PCS).
Uses PID loops to adjust Coil Currents to maintain plasma shape.
"""
def __init__(
self,
config_file: str,
kernel_factory: Callable[[str], Any] = FusionKernel,
verbose: bool = True,
actuator_tau_s: float = 0.06,
heating_actuator_tau_s: Optional[float] = None,
actuator_current_delta_limit: float = 1.0e9,
heating_beta_max: float = 5.0,
control_dt_s: float = 0.05,
) -> None:
self.kernel = kernel_factory(config_file)
self.verbose = bool(verbose)
self.history = {
"t": [],
"Ip": [],
"R_axis": [],
"Z_axis": [],
"X_point": [],
"ctrl_R_cmd": [],
"ctrl_R_applied": [],
"ctrl_Z_cmd": [],
"ctrl_Z_applied": [],
"beta_cmd": [],
"beta_applied": [],
}
control_dt_s = float(control_dt_s)
if not np.isfinite(control_dt_s) or control_dt_s <= 0.0:
raise ValueError("control_dt_s must be finite and > 0.")
self.control_dt_s = control_dt_s
actuator_current_delta_limit = float(actuator_current_delta_limit)
if not np.isfinite(actuator_current_delta_limit) or actuator_current_delta_limit <= 0.0:
raise ValueError("actuator_current_delta_limit must be finite and > 0.")
heating_beta_max = float(heating_beta_max)
if not np.isfinite(heating_beta_max) or heating_beta_max <= 1.0:
raise ValueError("heating_beta_max must be finite and > 1.0.")
if heating_actuator_tau_s is None:
heating_actuator_tau_s = float(actuator_tau_s)
heating_actuator_tau_s = float(heating_actuator_tau_s)
if not np.isfinite(heating_actuator_tau_s) or heating_actuator_tau_s <= 0.0:
raise ValueError("heating_actuator_tau_s must be finite and > 0.")
# PID Gains for Position Control
# Radial Control (Horizontal) -> Controlled by Outer Coils (PF2, PF3, PF4)
self.pid_R = {"Kp": 2.0, "Ki": 0.1, "Kd": 0.5, "err_sum": 0, "last_err": 0}
# Vertical Control (Z-pos) -> Controlled by Top/Bottom diff (PF1 vs PF5)
self.pid_Z = {"Kp": 5.0, "Ki": 0.2, "Kd": 2.0, "err_sum": 0, "last_err": 0}
self._act_radial = FirstOrderActuator(
tau_s=actuator_tau_s,
dt_s=self.control_dt_s,
u_min=-actuator_current_delta_limit,
u_max=actuator_current_delta_limit,
)
self._act_top = FirstOrderActuator(
tau_s=actuator_tau_s,
dt_s=self.control_dt_s,
u_min=-actuator_current_delta_limit,
u_max=actuator_current_delta_limit,
)
self._act_bottom = FirstOrderActuator(
tau_s=actuator_tau_s,
dt_s=self.control_dt_s,
u_min=-actuator_current_delta_limit,
u_max=actuator_current_delta_limit,
)
self._act_heating = FirstOrderActuator(
tau_s=heating_actuator_tau_s,
dt_s=self.control_dt_s,
u_min=1.0,
u_max=heating_beta_max,
)
target = self.kernel.cfg.get("target", {})
self.target_R = float(target.get("R_axis", DEFAULT_TARGET_R))
self.target_Z = float(target.get("Z_axis", DEFAULT_TARGET_Z))
def _log(self, message: str) -> None:
if self.verbose:
logger.info(message)
[docs]
def pid_step(self, pid: Dict[str, float], error: float) -> float:
pid["err_sum"] += error
d_err = error - pid["last_err"]
pid["last_err"] = error
return (pid["Kp"] * error) + (pid["Ki"] * pid["err_sum"]) + (pid["Kd"] * d_err)
def _add_coil_current(self, coil_idx: int, delta: float) -> None:
coils = self.kernel.cfg.get("coils", [])
if 0 <= coil_idx < len(coils):
current = float(coils[coil_idx].get("current", 0.0))
coils[coil_idx]["current"] = current + float(delta)
[docs]
def run_shot(
self,
shot_duration: int = 30,
save_plot: bool = True,
output_path: str = "Tokamak_Flight_Report.png",
) -> Dict[str, Any]:
"""
Run a simulated tokamak shot.
Parameters
----------
shot_duration : int
Number of simulation steps. Default 30.
save_plot : bool
Whether to generate a summary plot.
output_path : str
Filename for the plot.
"""
steps = int(shot_duration)
if steps < 1:
raise ValueError("shot_duration must be >= 1.")
self._log(f"--- INITIATING TOKAMAK FLIGHT SIMULATOR ({steps} steps) ---")
self._log(f"Scenario: Current Ramp-Up & Divertor Formation (dt={self.control_dt_s}s)")
# Initial Solve
self.kernel.solve_equilibrium()
Ip_cfg = float(self.kernel.cfg["physics"]["plasma_current_target"])
# Physics Evolution Loop
for t in range(steps):
time_s = t * self.control_dt_s
target_Ip = Ip_cfg * (0.98 + 0.02 * t / steps)
physics_cfg = self.kernel.cfg.setdefault("physics", {})
physics_cfg["plasma_current_target"] = target_Ip
# Heating ramp — drives outward Shafranov shift
beta_cmd = 1.0 + (0.002 * t)
beta_applied = self._act_heating.step(beta_cmd)
physics_cfg["beta_scale"] = beta_applied
# Find current magnetic axis (parabolic sub-grid interpolation)
idx_max = np.argmax(self.kernel.Psi)
iz, ir = np.unravel_index(idx_max, self.kernel.Psi.shape)
curr_R = float(self.kernel.R[ir])
curr_Z = float(self.kernel.Z[iz])
Psi = self.kernel.Psi
if 1 <= ir <= self.kernel.NR - 2:
a, b, c = Psi[iz, ir - 1], Psi[iz, ir], Psi[iz, ir + 1]
denom = 2.0 * (a - 2.0 * b + c)
if abs(denom) > 1e-30:
dr = np.clip(-(c - a) / denom, -0.5, 0.5)
curr_R += dr * self.kernel.dR
if 1 <= iz <= self.kernel.NZ - 2:
a, b, c = Psi[iz - 1, ir], Psi[iz, ir], Psi[iz + 1, ir]
denom = 2.0 * (a - 2.0 * b + c)
if abs(denom) > 1e-30:
dz = np.clip(-(c - a) / denom, -0.5, 0.5)
curr_Z += dz * self.kernel.dZ
xp_pos, _ = self.kernel.find_x_point(self.kernel.Psi)
err_R = self.target_R - curr_R
err_Z = self.target_Z - curr_Z
ctrl_radial_cmd = self.pid_step(self.pid_R, err_R)
ctrl_vertical_cmd = self.pid_step(self.pid_Z, err_Z)
# First-order actuator transfer layer (power-supply lag / inductance).
ctrl_radial = self._act_radial.step(ctrl_radial_cmd)
ctrl_vertical_top = self._act_top.step(-ctrl_vertical_cmd)
ctrl_vertical_bottom = self._act_bottom.step(ctrl_vertical_cmd)
ctrl_vertical_applied = 0.5 * (ctrl_vertical_bottom - ctrl_vertical_top)
self._add_coil_current(2, ctrl_radial) # PF3 radial correction
self._add_coil_current(0, ctrl_vertical_top) # top coil
self._add_coil_current(4, ctrl_vertical_bottom) # bottom coil
self.kernel.solve_equilibrium()
self.history["t"].append(t)
self.history["Ip"].append(target_Ip)
self.history["R_axis"].append(curr_R)
self.history["Z_axis"].append(curr_Z)
self.history["X_point"].append(xp_pos)
self.history["ctrl_R_cmd"].append(ctrl_radial_cmd)
self.history["ctrl_R_applied"].append(ctrl_radial)
self.history["ctrl_Z_cmd"].append(ctrl_vertical_cmd)
self.history["ctrl_Z_applied"].append(ctrl_vertical_applied)
self.history["beta_cmd"].append(beta_cmd)
self.history["beta_applied"].append(beta_applied)
self._log(
f"Time {time_s:.2f}s (Step {t}): Ip={target_Ip:.1f}MA | "
f"Axis=({curr_R:.2f}, {curr_Z:.2f}) | XP=({xp_pos[0]:.2f}, {xp_pos[1]:.2f}) | Ctrl_R={ctrl_radial:.2f} | Psi_max={np.max(self.kernel.Psi):.2f}"
)
plot_saved = False
plot_error = None
if save_plot:
plot_saved, plot_error = self.visualize_flight(output_path=output_path)
final_axis_r = float(self.history["R_axis"][-1]) if self.history["R_axis"] else 0.0
final_axis_z = float(self.history["Z_axis"][-1]) if self.history["Z_axis"] else 0.0
final_ip_ma = float(self.history["Ip"][-1]) if self.history["Ip"] else 0.0
mean_abs_r_error = (
float(np.mean(np.abs(np.asarray(self.history["R_axis"]) - self.target_R)))
if self.history["R_axis"]
else 0.0
)
mean_abs_z_error = (
float(np.mean(np.abs(np.asarray(self.history["Z_axis"]) - self.target_Z)))
if self.history["Z_axis"]
else 0.0
)
mean_abs_radial_actuator_lag = (
float(
np.mean(
np.abs(
np.asarray(self.history["ctrl_R_cmd"], dtype=np.float64)
- np.asarray(self.history["ctrl_R_applied"], dtype=np.float64)
)
)
)
if self.history["ctrl_R_cmd"]
else 0.0
)
mean_abs_vertical_actuator_lag = (
float(
np.mean(
np.abs(
np.asarray(self.history["ctrl_Z_cmd"], dtype=np.float64)
- np.asarray(self.history["ctrl_Z_applied"], dtype=np.float64)
)
)
)
if self.history["ctrl_Z_cmd"]
else 0.0
)
mean_abs_heating_actuator_lag = (
float(
np.mean(
np.abs(
np.asarray(self.history["beta_cmd"], dtype=np.float64)
- np.asarray(self.history["beta_applied"], dtype=np.float64)
)
)
)
if self.history["beta_cmd"]
else 0.0
)
final_beta_scale = (
float(self.history["beta_applied"][-1]) if self.history["beta_applied"] else 1.0
)
return {
"steps": int(steps),
"final_ip_ma": final_ip_ma,
"final_axis_r": final_axis_r,
"final_axis_z": final_axis_z,
"final_beta_scale": final_beta_scale,
"mean_abs_r_error": mean_abs_r_error,
"mean_abs_z_error": mean_abs_z_error,
"mean_abs_radial_actuator_lag": mean_abs_radial_actuator_lag,
"mean_abs_vertical_actuator_lag": mean_abs_vertical_actuator_lag,
"mean_abs_heating_actuator_lag": mean_abs_heating_actuator_lag,
"plot_saved": bool(plot_saved),
"plot_error": plot_error,
}
[docs]
def visualize_flight(
self,
output_path: str = "Tokamak_Flight_Report.png",
) -> Tuple[bool, Optional[str]]:
try:
import matplotlib.pyplot as plt
except Exception as exc:
return False, f"matplotlib unavailable: {exc}"
try:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
ax1.set_title("Plasma Trajectory Control")
ax1.plot(self.history["t"], self.history["R_axis"], "b-", label="R Axis (Radial)")
ax1.plot(self.history["t"], self.history["Z_axis"], "r-", label="Z Axis (Vertical)")
ax1.axhline(self.target_R, color="b", linestyle="--", alpha=0.5, label="Target R")
ax1.axhline(self.target_Z, color="r", linestyle="--", alpha=0.5, label="Target Z")
ax1.set_xlabel("Shot Time (a.u.)")
ax1.set_ylabel("Position (m)")
ax1.legend()
ax1.grid(True)
rx = [p[0] for p in self.history["X_point"]]
rz = [p[1] for p in self.history["X_point"]]
# Filter out 0,0 (Limiter phase)
valid_idx = [i for i, x in enumerate(rx) if x > 1.0]
if valid_idx:
ax2.plot(
[rx[i] for i in valid_idx], [rz[i] for i in valid_idx], "g-o", markersize=4
)
ax2.set_title("Divertor X-Point Movement")
ax2.set_xlabel("R (m)")
ax2.set_ylabel("Z (m)")
ax2.grid(True)
# Draw final shape if available from kernel implementation.
if hasattr(self.kernel, "RR") and hasattr(self.kernel, "ZZ"):
ax2.contour(
self.kernel.RR,
self.kernel.ZZ,
self.kernel.Psi,
levels=10,
colors="k",
alpha=0.2,
)
else:
ax2.text(0.5, 0.5, "Plasma Remained Limited (No Divertor)", ha="center")
plt.tight_layout()
plt.savefig(output_path)
plt.close(fig)
self._log(f"Flight Sim Complete. Report: {output_path}")
return True, None
except Exception as exc:
return False, str(exc)
[docs]
def run_flight_sim(
config_file: Optional[str] = None,
shot_duration: int = SHOT_DURATION,
seed: int = 42,
save_plot: bool = True,
output_path: str = "Tokamak_Flight_Report.png",
verbose: bool = True,
actuator_tau_s: float = 0.06,
heating_actuator_tau_s: Optional[float] = None,
actuator_current_delta_limit: float = 1.0e9,
heating_beta_max: float = 5.0,
control_dt_s: float = 0.05,
kernel_factory: Callable[[str], Any] = FusionKernel,
) -> Dict[str, Any]:
"""Run deterministic tokamak flight-sim control loop and return summary."""
seed_int = int(seed)
if config_file is None:
repo_root = Path(__file__).resolve().parents[3]
config_file = str(repo_root / "iter_config.json")
sim = IsoFluxController(
config_file=str(config_file),
kernel_factory=kernel_factory,
verbose=verbose,
actuator_tau_s=actuator_tau_s,
heating_actuator_tau_s=heating_actuator_tau_s,
actuator_current_delta_limit=actuator_current_delta_limit,
heating_beta_max=heating_beta_max,
control_dt_s=control_dt_s,
)
summary = sim.run_shot(
shot_duration=shot_duration,
save_plot=save_plot,
output_path=output_path,
)
summary["seed"] = seed_int
summary["config_path"] = str(config_file)
return summary
if __name__ == "__main__":
run_flight_sim()