Skip to content

API Reference

Top-Level Exports

import scpn_control

scpn_control.__version__       # "0.19.0"
scpn_control.FusionKernel      # Grad-Shafranov equilibrium solver
scpn_control.RUST_BACKEND      # True if Rust acceleration available
scpn_control.TokamakConfig     # Preset tokamak geometries
scpn_control.StochasticPetriNet
scpn_control.FusionCompiler
scpn_control.CompiledNet
scpn_control.NeuroSymbolicController
scpn_control.kuramoto_sakaguchi_step
scpn_control.order_parameter
scpn_control.KnmSpec
scpn_control.build_knm_paper27
scpn_control.UPDESystem
scpn_control.LyapunovGuard
scpn_control.RealtimeMonitor

Core — Physics Solvers

FusionKernel

FusionKernel

FusionKernel(config_path)

Non-linear Grad-Shafranov equilibrium solver.

Parameters

config_path : str | Path Path to a JSON configuration file describing the reactor geometry, coil set, physics parameters and solver settings.

Attributes

Psi : FloatArray Poloidal flux on the (Z, R) grid. J_phi : FloatArray Toroidal current density on the (Z, R) grid. B_R, B_Z : FloatArray Radial and vertical magnetic field components (set after solve).

load_config

load_config(path)

Load reactor configuration from a JSON file.

Parameters

path : str | Path Filesystem path to the configuration JSON.

initialize_grid

initialize_grid()

Build the computational (R, Z) mesh from the loaded config.

setup_accelerator

setup_accelerator()

Initialise the optional C++ HPC acceleration bridge.

build_coilset_from_config

build_coilset_from_config()

Build a free-boundary CoilSet from the active JSON configuration.

solve

solve(*, boundary_variant=None, coils=None, preserve_initial_state=False, boundary_flux=None, max_outer_iter=20, tol=0.0001, optimize_shape=False, tikhonov_alpha=0.0001)

Solve using the requested boundary variant.

The default is taken from solver.boundary_variant in the active config.

solve_fixed_boundary

solve_fixed_boundary(preserve_initial_state=False, boundary_flux=None)

Explicit entry point for the fixed-boundary variant.

calculate_vacuum_field

calculate_vacuum_field()

Compute the vacuum poloidal flux from the external coil set.

Uses elliptic integrals (toroidal Green's function) for each coil.

Returns

FloatArray Vacuum flux Psi_vac on the (NZ, NR) grid.

find_x_point

find_x_point(Psi)

Locate the X-point (magnetic null) in the lower divertor region.

Parameters

Psi : FloatArray Poloidal flux array on the (NZ, NR) grid.

Returns

tuple[tuple[float, float], float] ((R_x, Z_x), Psi_x) — position and flux value at the X-point.

mtanh_profile staticmethod

mtanh_profile(psi_norm, params)

Evaluate a modified-tanh pedestal profile (vectorised).

Parameters

psi_norm : FloatArray Normalised poloidal flux (0 at axis, 1 at separatrix). params : dict[str, float] Profile shape parameters with keys ped_top, ped_width, ped_height, core_alpha.

Returns

FloatArray Profile value; zero outside the plasma region.

update_plasma_source_nonlinear

update_plasma_source_nonlinear(Psi_axis, Psi_boundary)

Compute the toroidal current density J_phi from the GS source.

Uses J_phi = R p'(psi) + FF'(psi) / (mu0 R) with either L-mode (linear) or H-mode (mtanh) profiles, then renormalises to match the target plasma current.

Parameters

Psi_axis : float Poloidal flux at the magnetic axis (O-point). Psi_boundary : float Poloidal flux at the separatrix (X-point or limiter).

Returns

FloatArray Updated J_phi on the (NZ, NR) grid.

solve_equilibrium

solve_equilibrium(preserve_initial_state=False, boundary_flux=None)

Run the full Picard-iteration equilibrium solver.

Iterates: topology analysis -> source update -> elliptic solve -> under-relaxation until the residual drops below the configured convergence threshold or the maximum iteration count is reached.

When solver.solver_method is "anderson", Anderson acceleration is applied every few Picard steps to speed up convergence.

Returns

dict[str, Any] {"psi": FloatArray, "converged": bool, "iterations": int, "residual": float, "residual_history": list[float], "gs_residual": float, "gs_residual_best": float, "gs_residual_history": list[float], "wall_time_s": float, "solver_method": str}

Parameters

preserve_initial_state : bool, optional Keep current interior self.Psi values and only enforce boundary conditions before the iterative solve. Default is False. boundary_flux : FloatArray | None, optional Explicit boundary map to enforce during solve. Must have shape (NZ, NR) when provided.

Raises

RuntimeError If the solver produces NaN or Inf and solver.fail_on_diverge is enabled in the active configuration.

compute_b_field

compute_b_field()

Derive the magnetic field components from the solved Psi.

optimize_coil_currents

optimize_coil_currents(coils, target_flux, tikhonov_alpha=0.0001, *, x_point_flux_target=None, divertor_flux_targets=None)

Find coil currents that best satisfy free-boundary target constraints.

Solves a bounded linear least-squares problem that can include:

  • boundary-flux targets at target_flux_points
  • X-point isoflux and null-field constraints
  • divertor strike-point isoflux constraints

    min_I || A I - b ||^2 + alpha * ||I||^2 s.t. -I_max <= I <= I_max (per coil)

where A stacks the active constraint blocks.

Parameters

coils : CoilSet Coil geometry and optional objective targets. target_flux : FloatArray, shape (n_pts,) Desired poloidal flux at target_flux_points. Can be empty when only X-point and/or divertor constraints are active. tikhonov_alpha : float Regularisation strength to penalise large currents. x_point_flux_target : float or None Scalar target flux at coils.x_point_target. divertor_flux_targets : FloatArray or None Flux targets at coils.divertor_strike_points.

Returns

FloatArray, shape (n_coils,) Optimised coil currents [A].

solve_free_boundary

solve_free_boundary(coils, max_outer_iter=20, tol=0.0001, optimize_shape=False, tikhonov_alpha=0.0001, objective_tolerances=None)

Experimental external-coil outer loop around the fixed-boundary GS solve.

Iterates between updating boundary flux from coils and solving the internal GS equation. When optimize_shape=True and the coil set has target_flux_points, an additional outer loop optimises the coil currents to match the desired plasma boundary shape.

This helper should not be interpreted as a complete production-grade free-boundary solver. The current project standard for closing the free-boundary roadmap item is higher: shape control, X-point geometry, and divertor-configuration support must all be demonstrated.

Parameters

coils : CoilSet External coil set. max_outer_iter : int Maximum outer-loop iterations for the experimental coil-coupled path. tol : float Convergence tolerance on max |delta psi|. optimize_shape : bool When True, run coil-current optimisation at each outer step. tikhonov_alpha : float Tikhonov regularisation for coil optimisation. objective_tolerances : dict or None Optional convergence gates for free-boundary target objectives. Supported keys are shape_rms, shape_max_abs, x_point_position, x_point_gradient, x_point_flux, divertor_rms, and divertor_max_abs. When omitted, the method falls back to free_boundary.objective_tolerances in the config, if present.

Returns

dict {"outer_iterations": int, "final_diff": float, "coil_currents": NDArray}

phase_sync_step

phase_sync_step(theta, omega, *, dt=0.001, K=None, alpha=None, zeta=None, psi_driver=None, psi_mode=None, actuation_gain=None)

Reduced-order plasma sync kernel (phase reduction).

dθ_i/dt = ω_i + K·R·sin(ψ_r − θ_i − α) + ζ·sin(Ψ − θ_i)

Ψ is exogenous when psi_mode="external" (no dotΨ equation). This is the reviewer's ζ sin(Ψ−θ) injection for plasma sync stability.

phase_sync_step_lyapunov

phase_sync_step_lyapunov(theta, omega, *, n_steps=100, dt=0.001, K=None, zeta=None, psi_driver=None, psi_mode=None)

Multi-step phase sync with Lyapunov stability tracking.

Returns final state, R trajectory, V trajectory, and λ exponent. λ < 0 ⟹ stable convergence toward Ψ.

save_results

save_results(filename='equilibrium_nonlinear.npz')

Save the equilibrium state to a compressed NumPy archive.

Parameters

filename : str Output file path.

TokamakConfig

TokamakConfig dataclass

TokamakConfig(name, R0, a, B0, Ip, kappa, delta, n_e, T_e, P_aux)

Tokamak equilibrium operating point.

Units: lengths [m], field [T], current [MA], density [1e19 m^-3], temperature [keV], power [MW].

iter classmethod

iter()

ITER baseline H-mode. ITER Research Plan (2018).

sparc classmethod

sparc()

SPARC V2C. Creely et al., J. Plasma Phys. 86, 865860502 (2020).

diiid classmethod

diiid()

DIII-D typical H-mode. Luxon, Nucl. Fusion 42, 614 (2002).

jet classmethod

jet()

JET with ITER-like wall. Joffrin et al., Nucl. Fusion 59 (2019).

TransportSolver

TransportSolver

TransportSolver(config_path, *, nr=50, multi_ion=False, transport_model='gyro_bohm')

Bases: FusionKernel

1.5D Integrated Transport Code. Solves Heat and Particle diffusion equations on flux surfaces, coupled self-consistently with the 2D Grad-Shafranov equilibrium.

When multi_ion=True, the solver evolves separate D/T fuel densities, He-ash transport with pumping (configurable tau_He), independent electron temperature Te, coronal-equilibrium tungsten radiation (Pütterich et al. 2010), and per-cell Bremsstrahlung.

energy_balance_error property

energy_balance_error

Relative energy conservation error from the last evolution step.

particle_balance_error property

particle_balance_error

Relative particle conservation error from the last evolution step.

Only meaningful in multi-ion mode. Returns 0.0 otherwise.

set_neoclassical

set_neoclassical(R0, a, B0, A_ion=2.0, Z_eff=1.5, q0=1.0, q_edge=3.0)

Configure Chang-Hinton neoclassical transport model.

When set, update_transport_model uses the Chang-Hinton formula instead of the constant chi_base = 0.5.

chang_hinton_chi_profile

chang_hinton_chi_profile()

Backward-compatible Chang-Hinton profile helper.

Older parity tests call this no-arg method on a partially-initialized transport object. Keep the method as a thin adapter over the module function so those tests remain stable.

inject_impurities

inject_impurities(flux_from_wall_per_sec, dt)

Models impurity influx from PWI erosion. Simple diffusion model: Source at edge, diffuses inward.

calculate_bootstrap_current_simple

calculate_bootstrap_current_simple(R0, B_pol)

Calculates the neoclassical bootstrap current density [A/m2] using a simplified Sauter model. J_bs = - (R/B_pol) * [ L31 * (dP/dpsi) + ... ]

calculate_bootstrap_current

calculate_bootstrap_current(R0, B_pol)

Calculate bootstrap current. Uses full Sauter if neoclassical params set.

update_transport_model

update_transport_model(P_aux)

Gyro-Bohm + neoclassical transport model with EPED-like pedestal.

When neoclassical params are set, uses: - Chang-Hinton neoclassical chi as additive floor - Gyro-Bohm anomalous transport (calibrated c_gB) - EPED-like pedestal model for H-mode boundary condition

Falls back to constant chi_base=0.5 when neoclassical is not configured.

evolve_profiles

evolve_profiles(dt, P_aux, enforce_conservation=False, ped_te=None, ped_ti=None)

Advance Ti by one time step using Crank-Nicolson implicit diffusion.

The scheme is unconditionally stable, allowing dt up to ~1.0 s without NaN. The full equation solved is:

(T^{n+1} - T^n)/dt = 0.5*[L_h(T^{n+1}) + L_h(T^n)] + S - Sink
Parameters

dt : float Time step [s]. P_aux : float Auxiliary heating power [MW]. enforce_conservation : bool When True, raise :class:PhysicsError if the per-step energy conservation error exceeds 1%. ped_te : PedestalProfile, optional Pedestal profile model for electron temperature. ped_ti : PedestalProfile, optional Pedestal profile model for ion temperature.

map_profiles_to_2d

map_profiles_to_2d()

Projects the 1D radial profiles back onto the 2D Grad-Shafranov grid, including neoclassical bootstrap current.

compute_confinement_time

compute_confinement_time(P_loss_MW)

Compute the energy confinement time from stored energy.

τ_E = W_stored / P_loss, where W_stored = ∫ 3/2 n (Ti+Te) dV and the volume element is estimated from the 1D radial profiles using cylindrical approximation.

Parameters

P_loss_MW : float Total loss power [MW]. Must be > 0.

Returns

float Energy confinement time [s].

run_self_consistent

run_self_consistent(P_aux, n_inner=100, n_outer=10, dt=0.01, psi_tol=0.001)

Run self-consistent GS <-> transport iteration.

This implements the standard integrated-modelling loop used by codes such as ASTRA and JINTRAC: evolve the 1D transport for n_inner steps, project profiles onto the 2D grid, re-solve the Grad-Shafranov equilibrium, and repeat until the poloidal-flux change drops below psi_tol.

Algorithm
  1. Run transport for n_inner steps (evolve Ti/Te/ne).
  2. Call :meth:map_profiles_to_2d to update J_phi on the 2D grid.
  3. Re-solve the Grad-Shafranov equilibrium with the updated source.
  4. Check psi convergence: ||Psi_new - Psi_old|| / ||Psi_old|| < psi_tol.
  5. Repeat until converged or n_outer iterations exhausted.
Parameters

P_aux : float Auxiliary heating power [MW]. n_inner : int Number of transport evolution steps per outer iteration. n_outer : int Maximum number of outer (GS re-solve) iterations. dt : float Transport time step [s]. psi_tol : float Relative psi convergence tolerance.

Returns

dict {"T_avg": float, "T_core": float, "tau_e": float, "n_outer_converged": int, "psi_residuals": list[float], "Ti_profile": ndarray, "ne_profile": ndarray, "converged": bool}

run_to_steady_state

run_to_steady_state(P_aux, n_steps=500, dt=0.01, adaptive=False, tol=0.001, self_consistent=False, sc_n_inner=100, sc_n_outer=10, sc_psi_tol=0.001)

Run transport evolution until approximate steady state.

Parameters

P_aux : float Auxiliary heating power [MW]. n_steps : int Number of evolution steps. dt : float Time step [s] (initial value when adaptive=True). adaptive : bool Use Richardson-extrapolation adaptive time stepping. tol : float Error tolerance for adaptive stepping. self_consistent : bool When True, delegate to :meth:run_self_consistent which iterates GS <-> transport to convergence. The remaining sc_* parameters are forwarded. sc_n_inner : int Transport steps per outer GS iteration (self-consistent mode). sc_n_outer : int Maximum outer GS iterations (self-consistent mode). sc_psi_tol : float Relative psi convergence tolerance (self-consistent mode).

Returns

dict {"T_avg": float, "T_core": float, "tau_e": float, "n_steps": int, "Ti_profile": ndarray, "ne_profile": ndarray} When adaptive=True, also includes dt_final, dt_history, error_history. When self_consistent=True, returns the :meth:run_self_consistent dict instead.

Scaling Laws

ipb98y2_tau_e

ipb98y2_tau_e(Ip, BT, ne19, Ploss, R, kappa, epsilon, M=2.5, *, coefficients=None)

Evaluate the IPB98(y,2) confinement time scaling law.

Parameters

Ip : float Plasma current [MA]. BT : float Toroidal magnetic field [T]. ne19 : float Line-averaged electron density [10^19 m^-3]. Ploss : float Loss power [MW]. Must be > 0. R : float Major radius [m]. kappa : float Elongation (κ). epsilon : float Inverse aspect ratio a/R. M : float Effective ion mass [AMU]. Default 2.5 (D-T). coefficients : dict, optional Pre-loaded coefficient dict. If None, loaded from disk.

Returns

float Predicted thermal energy confinement time τ_E [s].

Raises

ValueError If any input is non-finite or non-positive.

compute_h_factor

compute_h_factor(tau_actual, tau_predicted)

Compute the H-factor (enhancement factor over scaling law).

Parameters

tau_actual : float Measured or simulated confinement time [s]. tau_predicted : float IPB98(y,2) predicted confinement time [s].

Returns

float H98(y,2) = tau_actual / tau_predicted.

GEQDSK I/O

GEqdsk dataclass

GEqdsk(description='', nw=0, nh=0, rdim=0.0, zdim=0.0, rcentr=0.0, rleft=0.0, zmid=0.0, rmaxis=0.0, zmaxis=0.0, simag=0.0, sibry=0.0, bcentr=0.0, current=0.0, fpol=(lambda: array([], dtype=float64))(), pres=(lambda: array([], dtype=float64))(), ffprime=(lambda: array([], dtype=float64))(), pprime=(lambda: array([], dtype=float64))(), qpsi=(lambda: array([], dtype=float64))(), psirz=(lambda: array([], dtype=float64))(), rbdry=(lambda: array([], dtype=float64))(), zbdry=(lambda: array([], dtype=float64))(), rlim=(lambda: array([], dtype=float64))(), zlim=(lambda: array([], dtype=float64))())

Container for all data in a G-EQDSK file.

r property

r

1-D array of R grid values.

z property

z

1-D array of Z grid values.

psi_norm property

psi_norm

Normalised poloidal flux ψ_N ∈ [0, 1] (axis=0, boundary=1).

psi_to_norm

psi_to_norm(psi)

Map raw ψ to normalised ψ_N.

to_config

to_config(name='eqdsk', *, external_profiles=False)

Convert to FusionKernel JSON config dict.

Parameters

external_profiles : bool When True, include pprime/ffprime arrays so the GS solver uses the GEQDSK source profiles instead of its parametric model.

read_geqdsk

read_geqdsk(path)

Read a G-EQDSK file and return a :class:GEqdsk container.

Handles both whitespace-separated and fixed-width Fortran formats, including the common case where values run together without spaces (e.g. 2.385E+00-1.216E+01).

Parameters

path : str or Path Path to the G-EQDSK file.

Returns

GEqdsk Parsed equilibrium data.

write_geqdsk

write_geqdsk(eq, path)

Write a :class:GEqdsk to a G-EQDSK file.

Parameters

eq : GEqdsk Equilibrium data. path : str or Path Output file path.

Uncertainty Quantification

quantify_uncertainty

quantify_uncertainty(scenario, n_samples=10000, seed=None)

Monte Carlo uncertainty quantification for fusion performance.

Samples scaling-law coefficients from their Gaussian posteriors and propagates through the confinement and fusion power models.

Parameters

scenario : PlasmaScenario Plasma parameters (held fixed). n_samples : int Number of Monte Carlo samples (default 10,000). seed : int, optional Random seed for reproducibility.

Returns

UQResult — central estimates + error bars + percentiles.

quantify_full_chain

quantify_full_chain(scenario, n_samples=5000, seed=None, chi_gB_sigma=0.3, pedestal_sigma=0.2, boundary_sigma=0.02)

Full-chain Monte Carlo uncertainty propagation: equilibrium -> transport -> fusion power -> gain.

Now uses correlated sampling for IPB98 coefficients.

JAX-Accelerated Transport Primitives

Requires pip install "scpn-control[jax]". GPU execution automatic when jaxlib has CUDA/ROCm.

thomas_solve

thomas_solve(a, b, c, d, *, use_jax=True)

Tridiagonal solve with automatic JAX/GPU dispatch.

Parameters

a : sub-diagonal, length n-1 b : main diagonal, length n c : super-diagonal, length n-1 d : right-hand side, length n use_jax : attempt JAX backend (falls back to NumPy if unavailable)

diffusion_rhs

diffusion_rhs(T, chi, rho, drho, *, use_jax=True)

Cylindrical diffusion operator L_h(T) with JAX/GPU dispatch.

crank_nicolson_step

crank_nicolson_step(T, chi, source, rho, drho, dt, T_edge=0.1, *, use_jax=True)

Single Crank-Nicolson transport step with JAX/GPU dispatch.

Parameters

T : temperature profile, length n chi : diffusivity profile, length n source : net heating source, length n rho : radial grid, length n drho : grid spacing dt : timestep T_edge : edge boundary condition (Dirichlet), keV

batched_crank_nicolson

batched_crank_nicolson(T_batch, chi, source, rho, drho, dt, T_edge=0.1)

Batched transport step via jax.vmap for ensemble/sensitivity runs.

Parameters

T_batch : (batch, n) initial temperature profiles chi, source, rho, drho, dt, T_edge : shared across batch

Returns

T_new : (batch, n) updated profiles

Neural Equilibrium

NeuralEquilibriumAccelerator

NeuralEquilibriumAccelerator(config=None)

PCA + MLP surrogate for Grad-Shafranov equilibrium.

Can be trained from SPARC GEQDSK files (preferred) or from a FusionKernel config with coil perturbations.

evaluate_surrogate

evaluate_surrogate(X_test, Y_test_raw)

Evaluate on test set. Returns dict with mse, max_error, gs_residual.

train_from_geqdsk

train_from_geqdsk(geqdsk_paths, n_perturbations=25, seed=42)

Train on real SPARC GEQDSK equilibria with perturbations.

For each GEQDSK file, generates n_perturbations by scaling p'/FF' profiles, yielding n_files × n_perturbations training pairs.

Input features (12-dim): [I_p, B_t, R_axis, Z_axis, pprime_scale, ffprime_scale, simag, sibry, kappa, delta_upper, delta_lower, q95]

Output: flattened ψ(R,Z) → PCA coefficients

Uses 70/15/15 train/val/test split with val-loss early stopping (patience=20) and combined MSE + lambda_gs * GS residual loss.

predict

predict(features)

Predict ψ(R,Z) from input features.

Parameters

features : NDArray Shape (n_features,) or (batch, n_features).

Returns

NDArray Shape (nh, nw) or (batch, nh, nw).

save_weights

save_weights(path=DEFAULT_WEIGHTS_PATH)

Save model to .npz (no pickle dependency).

load_weights

load_weights(path=DEFAULT_WEIGHTS_PATH)

Load model from .npz.

benchmark

benchmark(features, n_runs=100)

Time inference over n_runs and return stats.

JAX-Accelerated Neural Equilibrium

Requires pip install "scpn-control[jax]". GPU and autodiff via jax.grad.

jax_neural_eq_predict

jax_neural_eq_predict(features, mlp_weights, pca_params, norm_params, grid_shape=(129, 129))

Predict psi(R,Z) from input features using JAX-accelerated pipeline.

Parameters

features : shape (n_features,) or (batch, n_features) mlp_weights : ((W0, W1, ...), (b0, b1, ...)) pca_params : (pca_mean, pca_components) norm_params : (input_mean, input_std) grid_shape : (nh, nw) output grid dimensions

Returns

psi : shape (nh, nw) or (batch, nh, nw)

jax_neural_eq_predict_batched

jax_neural_eq_predict_batched(features_batch, mlp_weights, pca_params, norm_params, grid_shape=(129, 129))

Batched equilibrium prediction via jax.vmap.

Parameters

features_batch : shape (batch, n_features)

Returns

psi_batch : shape (batch, nh, nw)

load_weights_as_jax

load_weights_as_jax(path)

Load neural equilibrium .npz weights into JAX-compatible structures.

Returns

(mlp_weights, pca_params, norm_params, grid_shape) mlp_weights: ((W0, W1, ...), (b0, b1, ...)) pca_params: (pca_mean, pca_components) norm_params: (input_mean, input_std) grid_shape: (nh, nw)

Neural Transport

cross_validate_neural_transport() benchmarks the active surrogate against the analytic critical-gradient reference across fixed regime cases and a canonical profile, so shipped weights can be checked against a deterministic local baseline instead of only reporting synthetic training RMSE.

NeuralTransportModel

NeuralTransportModel(weights_path=None, *, auto_discover=True)

Neural transport surrogate with analytic fallback.

On construction, attempts to load MLP weights from weights_path. If loading fails (file missing, wrong format), the model transparently falls back to :func:critical_gradient_model.

Parameters

weights_path : str or Path, optional Path to a .npz file containing MLP weights. The file must contain arrays: w1, b1, w2, b2, w3, b3, input_mean, input_std, output_scale.

Examples

model = NeuralTransportModel() # fallback mode inp = TransportInputs(grad_ti=8.0) fluxes = model.predict(inp) fluxes.chi_i > 0 True model.is_neural False

predict

predict(inp)

Predict turbulent transport fluxes for given local parameters.

Uses the neural MLP if weights are loaded, otherwise falls back to the analytic critical-gradient model.

Parameters

inp : TransportInputs Local plasma parameters at a single radial point.

Returns

TransportFluxes Predicted heat and particle diffusivities.

predict_profile

predict_profile(rho, te, ti, ne, q_profile, s_hat_profile, r_major=6.2)

Predict transport coefficients on the full radial profile.

Computes normalised gradients from the profile arrays via central finite differences, then evaluates the surrogate at each radial point. When the MLP is loaded the entire profile is evaluated in a single batched forward pass (no Python loop).

Parameters

rho : FloatArray Normalised radius grid (0 to 1), shape (N,). te, ti : FloatArray Electron/ion temperature profiles [keV], shape (N,). ne : FloatArray Electron density profile [10^19 m^-3], shape (N,). q_profile : FloatArray Safety factor profile, shape (N,). s_hat_profile : FloatArray Magnetic shear profile, shape (N,). r_major : float Major radius [m] for gradient normalisation.

Returns

chi_e, chi_i, d_e : FloatArray Transport coefficient profiles, each shape (N,).

cross_validate_neural_transport

cross_validate_neural_transport(model=None, *, benchmark_cases=None, profile_points=24)

Compare the current transport surrogate to the analytic reference.

Returns pointwise and profile-level error metrics against the :func:critical_gradient_model benchmark. This is a reference check, not a claim of agreement with full QLKNN-10D or first-principles turbulence solvers.

MHD Stability

run_full_stability_check

run_full_stability_check(qp, beta_t=None, Ip_MA=None, a=None, B0=None, j_bs=None, j_total=None)

Run all available MHD stability criteria and return a summary.

Mercier, ballooning, and Kruskal-Shafranov are always evaluated. Troyon requires beta_t, Ip_MA, a, and B0. NTM requires j_bs, j_total, and a.

Parameters

qp : QProfile — pre-computed safety-factor profile beta_t : float, optional — total toroidal beta (dimensionless) Ip_MA : float, optional — plasma current [MA] a : float, optional — minor radius [m] B0 : float, optional — toroidal field on axis [T] j_bs : array, optional — bootstrap current density [A/m^2] j_total : array, optional — total current density [A/m^2]

Returns

StabilitySummary

IMAS Adapter

EquilibriumIDS dataclass

EquilibriumIDS(r, z, psi, j_tor, ip, b0, r0, time=0.0)

Minimal subset of IMAS equilibrium IDS for scpn-control interop.

Fields match equilibrium.time_slice[0].profiles_2d[0].

from_geqdsk

from_geqdsk(filepath)

Import from GEQDSK file via scpn_control.core.eqdsk.

from_kernel

from_kernel(kernel, time=0.0)

Extract EquilibriumIDS from a solved FusionKernel instance.

HPC Bridge

HPCBridge

HPCBridge(lib_path=None)

Interface between Python and the compiled C++ Grad-Shafranov solver.

Loads the shared library (libscpn_solver.so / scpn_solver.dll) at construction time. If the library is not found the bridge gracefully degrades — :meth:is_available returns False and the caller falls back to Python.

Parameters

lib_path : str, optional Explicit path to the shared library. When None (default) the bridge searches trusted package-local locations only, unless SCPN_SOLVER_LIB is set explicitly.

is_available

is_available()

Return True if the compiled solver library was loaded.

close

close()

Release the C++ solver instance, if one was created.

initialize

initialize(nr, nz, r_range, z_range, boundary_value=0.0)

Create the C++ solver instance for the given grid dimensions.

set_boundary_dirichlet

set_boundary_dirichlet(boundary_value=0.0)

Set a fixed Dirichlet boundary value for psi edges, if supported.

solve

solve(j_phi, iterations=100)

Run the C++ solver for iterations sweeps.

Returns None if the library is not loaded (caller should fall back to a Python solver).

solve_into

solve_into(j_phi, psi_out, iterations=100)

Run the C++ solver and write results into psi_out in-place.

solve_until_converged

solve_until_converged(j_phi, max_iterations=1000, tolerance=1e-06, omega=1.8)

Run solver until convergence, if native API is available.

Returns (psi, iterations_used, final_delta). If the library is unavailable or uninitialized, returns None.

solve_until_converged_into

solve_until_converged_into(j_phi, psi_out, max_iterations=1000, tolerance=1e-06, omega=1.8)

Run convergence API and write results into psi_out in-place.

Gyrokinetic Transport (v0.16.0)

GyrokineticTransportModel

GyrokineticTransportModel(n_modes=16, include_etg=False)

Drop-in replacement for Gyro-Bohm transport scaling.

evaluate

evaluate(rho, profiles)

Evaluate transport coefficients at a single radial point.

evaluate_profile

evaluate_profile(rho, profiles)

Evaluate full radial profile.

GK Solver Interface (v0.17.0)

GKSolverBase

Bases: ABC

Abstract base for gyrokinetic solvers.

Concrete subclasses must implement prepare_input, run, and is_available.

prepare_input abstractmethod

prepare_input(params)

Write solver-specific input deck to a temporary directory.

Returns the path to the input file or directory.

run abstractmethod

run(input_path, *, timeout_s=30.0)

Execute the solver and parse output into GKOutput.

is_available abstractmethod

is_available()

Return True if the solver binary/library is installed.

run_from_params

run_from_params(params, *, timeout_s=30.0)

Convenience: prepare + run in one call.

GKLocalParams dataclass

GKLocalParams(R_L_Ti, R_L_Te, R_L_ne, q, s_hat, alpha_MHD=0.0, Te_Ti=1.0, Z_eff=1.5, nu_star=0.1, beta_e=0.01, epsilon=0.1, kappa=1.0, delta=0.0, rho=0.5, R0=6.2, a=2.0, B0=5.3, n_e=10.0, T_e_keV=8.0, T_i_keV=8.0)

Local plasma parameters at a single flux surface.

The first 11 fields match the GyrokineticsParams convention already used by gyrokinetic_transport.py. The remaining fields add Miller geometry and dimensional quantities needed by external GK codes.

GKOutput dataclass

GKOutput(chi_i, chi_e, D_e, D_i=0.0, gamma=(lambda: empty(0))(), omega_r=(lambda: empty(0))(), k_y=(lambda: empty(0))(), dominant_mode='stable', converged=True, electromagnetic=False)

Gyrokinetic solver output for a single flux surface.

Fluxes are in physical units [m^2/s]. Growth rate arrays are normalised to c_s / a.

Native Linear GK Solver (v0.17.0)

solve_linear_gk

solve_linear_gk(species_list=None, geom=None, vgrid=None, R0=2.78, a=1.0, B0=2.0, q=1.4, s_hat=0.78, Z_eff=1.0, nu_star=0.01, n_ky_ion=16, n_ky_etg=0, n_theta=64, n_period=2, electromagnetic=False, beta_e=0.0, alpha_MHD=0.0)

Full k_y scan of the linear GK eigenvalue solver.

Parameters

species_list : list of GKSpecies Plasma species. Default: [deuterium, adiabatic electron]. geom : MillerGeometry Flux-tube geometry. Default: circular with given (R0, a, q, s_hat). vgrid : VelocityGrid Velocity-space grid. Default: (16, 24). n_ky_ion, n_ky_etg : int Number of k_y points on ion and electron scales. electromagnetic : bool Enable finite-beta EM corrections (A_∥, delta-B_∥). Default False. beta_e : float Electron beta = 2 mu_0 n_e T_e / B_0^2. alpha_MHD : float Normalised pressure gradient alpha = -q^2 R dp/dr / (B^2/2mu_0).

quasilinear_fluxes_from_spectrum

quasilinear_fluxes_from_spectrum(result, ion, R0=2.78, a=1.0, B0=2.0, saturation='mixing_length', electron=None)

Convert linear GK spectrum to physical transport fluxes.

Parameters

result : LinearGKResult Output from solve_linear_gk(). ion : GKSpecies Main ion species (for gyro-Bohm normalisation). R0, a, B0 : float Geometry and field for dimensional conversion. saturation : str Saturation model: "mixing_length" (default). electron : GKSpecies | None Electron species for Te/Ti ratio. Defaults to ion temperature (Te/Ti=1).

GK Hybrid Validation (v0.17.0)

OODDetector

OODDetector(*, mahalanobis_threshold=4.0, soft_sigma_threshold=2.0, ensemble_disagreement_threshold=0.3, training_mean=None, training_cov_inv=None)

Three-method OOD detector for surrogate transport models.

Parameters

mahalanobis_threshold : float Mahalanobis distance above which an input is flagged OOD. Default 4.0 corresponds to ~99.99% chi-squared quantile for 10D. soft_sigma_threshold : float Number of training-set standard deviations for soft range check. ensemble_disagreement_threshold : float Relative std across ensemble predictions above which to flag OOD.

mahalanobis_distance

mahalanobis_distance(x)

Compute Mahalanobis distance from training mean.

check_range

check_range(x)

Check hard bounds and soft sigma bounds.

check_ensemble

check_ensemble(predictions)

Check disagreement across ensemble predictions.

Parameters

predictions : array, shape (K, 3) Ensemble of [chi_i, chi_e, D_e] predictions from K models.

check

check(x, ensemble_predictions=None)

Run all applicable checks and return combined result.

GKScheduler

GKScheduler(config=None)

Spot-check scheduler for hybrid surrogate+GK transport.

step

step(rho, chi_i, ood_results=None)

Decide whether to run GK spot-checks this step.

Parameters

rho : array Radial grid. chi_i : array Current ion diffusivity profile (from surrogate). ood_results : list of OODResult or None Per-surface OOD detector results (adaptive mode).

Returns

SpotCheckRequest or None None if no validation needed this step.

GKCorrector

GKCorrector(nr, config=None)

Apply corrections to surrogate transport based on GK spot-checks.

update

update(records, rho)

Incorporate new spot-check results into correction factors.

Interpolates correction factors from spot-check surfaces to the full radial grid with temporal EMA smoothing.

correct

correct(chi_i, chi_e, D_e)

Apply stored correction factors to surrogate profiles.

Ballooning Solver (v0.16.0)

BallooningEquation

BallooningEquation(s, alpha, theta_max=20 * pi, n_theta=2001)

Second-order ODE for ideal MHD ballooning stability in the s-alpha model.

f

f(theta)

Field-line bending term coefficient.

g

g(theta)

Curvature drive term coefficient.

solve

solve()

Solve via Newcomb shooting: ξ crossing zero signals instability. No zero crossing within [0, θ_max] means stable.

BallooningStabilityAnalysis

Performs ballooning stability analysis given a QProfile.

analyze

analyze(q_profile)

Extracts (s, alpha) at each radial point and returns per-radius stability margin. Positive margin means stable. margin = alpha_crit(s) - alpha_actual

find_marginal_stability

find_marginal_stability(s, alpha_min=0.0, alpha_max=2.0, tol=0.001)

Binary search for alpha_crit at fixed shear s. Returns the critical alpha (first stability boundary).

Current Diffusion (v0.16.0)

CurrentDiffusionSolver

CurrentDiffusionSolver(rho, R0, a, B0)

Implicit Crank-Nicolson solver for poloidal flux diffusion.

Governing equation (1D in ρ, cylindrical limit): ∂ψ/∂t = (η_∥/μ₀) · [∂²ψ/∂ρ² + (1/ρ)·∂ψ/∂ρ] + R₀·η_∥·j_source Reference: Jardin (2010), Ch. 8, Eq. 8.8.

η_∥ from neoclassical_resistivity() — Sauter 1999/2002 + Hirshman 1981. Crank-Nicolson discretisation — θ=½ implicit-explicit split, second-order accurate in time and space.

step

step(dt, Te, ne, Z_eff, j_bs, j_cd, j_ext=None)

Advance ψ by one timestep dt [s].

Source term: j_source = j_bs + j_cd + j_ext Boundary conditions: axis (ρ=0) — regularity (L'Hôpital); edge (ρ=1) — Dirichlet ψ=const.

Current Drive (v0.16.0)

ECCDSource

ECCDSource(P_ec_MW, rho_dep, sigma_rho, eta_cd=_ETA_ECCD_DEFAULT)

Electron Cyclotron Current Drive.

Default η_cd: Fisch & Boozer 1980, PRL 45, 720 (theory); range 0.02–0.05 A/W depending on launch angle and T_e. For angle-resolved efficiency, use eccd_efficiency().

P_absorbed

P_absorbed(rho)

Absorbed power density [W/m^3] — Gaussian deposition profile.

j_cd

j_cd(rho, ne_19, Te_keV)

Driven current density [A/m^2].

j_cd = η_cd · P_abs / (n_e · T_e) Fisch & Boozer 1980, PRL 45, 720, normalised form.

NBISource

NBISource(P_nbi_MW, E_beam_keV, rho_tangency, sigma_rho=0.15, A_beam=_A_BEAM_D, Z_beam=_Z_BEAM_D)

Neutral Beam Injection.

Current-drive efficiency formula: Ehst & Karney 1991, Nucl. Fusion 31, 1933. Slowing-down time: Stix 1972, Plasma Physics 14, 367, Eq. 6.

P_heating

P_heating(rho)

Beam power deposition profile [W/m^3].

j_cd

j_cd(rho, ne_19, Te_keV, Ti_keV, Z_eff=1.5)

Beam-driven current density [A/m^2].

Fast-ion density from steady-state balance: n_fast = P_heat · τ_s / E_beam τ_s from Stix 1972, Eq. 6 (nbi_slowing_down_time). j_cd = e · n_fast · v_∥ / Z_beam Ehst & Karney 1991, Nucl. Fusion 31, 1933.

Note: Ti_keV enters via Z_eff-dependent collisionality; here held constant.

CurrentDriveMix

CurrentDriveMix(a=1.0)

Superposition of multiple current-drive sources.

total_driven_current

total_driven_current(rho, ne, Te, Ti)

I_cd = ∫ j_cd · 2π r dr [A]

r = ρ · a, dr = dρ · a → dA = 2π ρ a² dρ

NTM Dynamics (v0.16.0)

NTMController

NTMController(w_onset=0.02, w_target=0.005)

ECCD-based NTM controller: triggers at onset, deactivates at target.

step

step(w, rho_rs, max_power=20.0)

Return requested ECCD power [MW]; update controller state.

Sawtooth Model (v0.16.0)

SawtoothCycler

SawtoothCycler(rho, R0, a, s_crit=0.1, trigger_model='shear', porcelli_params=None)

Tracks and triggers sawtooth crashes.

kadomtsev_crash

kadomtsev_crash(rho, T, n, q, R0, a)

Apply Kadomtsev full-reconnection crash.

Returns (T_new, n_new, q_new, rho_1, rho_mix).

Helical flux proxy: dψ/dρ = ρ(1/q − 1), integrated from axis. rho_mix is where ψ returns to zero outside the q=1 surface. Inside rho_mix, T, n are volume-averaged and q is reset to 1.01.

Kadomtsev 1975, Sov. J. Plasma Phys. 1, 389. Porcelli et al. 1996, PPCF 38, 2163, §2.

SOL Model (v0.16.0)

TwoPointSOL

TwoPointSOL(R0, a, q95, B_pol, kappa=1.0)

Two-point model for the Scrape-Off Layer.

Parallel heat conduction: q_∥ = κ_0 T_u^{7/2} / (7 L_∥). Stangeby 2000, Eq. 5.69.

solve

solve(P_SOL_MW, n_u_19, f_rad=0.0)

Solve two-point model for upstream and target conditions.

Upstream temperature from conduction integral (Stangeby 2000, Eq. 5.69): T_u = (3.5 · L_∥ · q_∥ / κ_0)^{2/7}

Target heat flux: q_target = P_SOL / (4π R λ_q f_exp). Stangeby 2000, Ch. 5.

Integrated Scenario (v0.16.0)

IntegratedScenarioSimulator

IntegratedScenarioSimulator(config)

Time-dependent tokamak shot simulator.

Operator splitting sequence per step() (Strang 1968): a) half-step transport diffusion b) full-step sources c) full-step MHD events d) half-step transport diffusion

State variables carried on 1D rho grid: Te, Ti, ne, psi (→ q, j).

step

step()

Advance one time step using Strang operator splitting.

Strang 1968, SIAM J. Numer. Anal. 5, 506: a) half-step transport b) full-step sources c) full-step MHD events d) half-step transport

iter_baseline_scenario

iter_baseline_scenario()

SCPN — Petri Net Compiler

StochasticPetriNet

StochasticPetriNet

StochasticPetriNet()

Stochastic Petri Net with sparse matrix representation.

Usage::

net = StochasticPetriNet()
net.add_place("P_red",   initial_tokens=1.0)
net.add_place("P_green", initial_tokens=0.0)
net.add_transition("T_r2g", threshold=0.5)
net.add_arc("P_red", "T_r2g", weight=1.0)
net.add_arc("T_r2g", "P_green", weight=1.0)
net.compile()

add_place

add_place(name, initial_tokens=0.0)

Add a place (state variable) with token density in [0, 1].

add_transition

add_transition(name, threshold=0.5, delay_ticks=0)

Add a transition (logic gate) with a firing threshold.

add_arc

add_arc(source, target, weight=1.0, inhibitor=False)

Add a directed arc between a Place and a Transition (either direction).

Valid arcs

Place -> Transition (input arc, stored in W_in) Transition -> Place (output arc, stored in W_out)

Parameters

inhibitor : bool, default False If True, arc is encoded as a negative weight inhibitor input arc. Only valid for Place -> Transition edges.

Raises ValueError for same-kind connections or unknown nodes.

compile

compile(validate_topology=False, strict_validation=False, allow_inhibitor=False)

Build sparse W_in and W_out matrices from the arc list.

Parameters

validate_topology : bool, default False Compute and store topology diagnostics in :attr:last_validation_report. strict_validation : bool, default False If True, raise ValueError when topology diagnostics contain dead nodes or unseeded place cycles. allow_inhibitor : bool, default False Allow negative Place -> Transition weights for inhibitor arcs. If False, compile fails when inhibitor arcs are present.

validate_topology

validate_topology()

Return topology diagnostics without mutating compiled matrices.

Diagnostics: - dead_places: places with zero total degree. - dead_transitions: transitions with zero total degree. - unseeded_place_cycles: place-level SCC cycles with no initial token mass. - input_weight_overflow_transitions: transitions whose positive input arc-weight sum exceeds 1.0.

get_initial_marking

get_initial_marking()

Return (n_places,) float64 vector of initial token densities.

get_thresholds

get_thresholds()

Return (n_transitions,) float64 vector of firing thresholds.

get_delay_ticks

get_delay_ticks()

Return (n_transitions,) int64 vector of transition delay ticks.

summary

summary()

Human-readable summary of the net.

verify_boundedness

verify_boundedness(n_steps=500, n_trials=100)

Verify that markings stay in [0, 1] under random firing.

Runs n_trials random trajectories of n_steps each, checking that all markings remain in [0, 1] at every step.

Returns

dict {"bounded": bool, "max_marking": float, "min_marking": float, "n_trials": int, "n_steps": int}

verify_liveness

verify_liveness(n_steps=200, n_trials=1000)

Verify that all transitions fire at least once in random campaigns.

Returns

dict {"live": bool, "transition_fire_pct": dict[str, float], "min_fire_pct": float}

FusionCompiler

FusionCompiler

FusionCompiler(bitstream_length=1024, seed=42, *, lif_tau_mem=1000000.0, lif_noise_std=0.0, lif_dt=1.0, lif_resistance=1.0, lif_refractory_period=0)

Compiles a StochasticPetriNet into a CompiledNet.

Parameters

bitstream_length : Number of bits per stochastic stream (default 1024). seed : Base RNG seed for reproducibility.

with_reactor_lif_defaults classmethod

with_reactor_lif_defaults(bitstream_length=1024, seed=42, *, lif_dt=1.0, lif_resistance=1.0, lif_refractory_period=0)

Create a compiler with reactor-like LIF defaults.

Keeps the legacy constructor defaults untouched for compatibility while exposing a realistic preset for new control experiments.

traceable_runtime_kwargs staticmethod

traceable_runtime_kwargs(*, runtime_backend='auto')

Recommended controller kwargs for traceable runtime loops.

compile

compile(net, firing_mode='binary', firing_margin=0.05, *, allow_inhibitor=False, validate_topology=False, strict_topology=False)

Compile the Petri Net into sc_neurocore artifacts.

Parameters

net : compiled StochasticPetriNet. firing_mode : "binary" (default) or "fractional". firing_margin : margin for fractional firing (ignored in binary mode). allow_inhibitor : enable inhibitor arc compilation. validate_topology : run topology diagnostics during compile. strict_topology : raise if topology diagnostics detect issues.

Steps
  1. Extract dense W_in (nT x nP) and W_out (nP x nT).
  2. Create one LIF neuron per transition (pure threshold comparator).
  3. Pre-encode weight matrices as packed uint64 bitstreams.
  4. Return CompiledNet with all artifacts.

CompiledNet

CompiledNet dataclass

CompiledNet(n_places, n_transitions, place_names, transition_names, W_in, W_out, W_in_packed=None, W_out_packed=None, neurons=list(), bitstream_length=1024, thresholds=(lambda: array([], dtype=float64))(), transition_delay_ticks=(lambda: array([], dtype=int64))(), initial_marking=(lambda: array([], dtype=float64))(), seed=42, firing_mode='binary', firing_margin=0.05, lif_tau_mem=1000000.0, lif_noise_std=0.0, lif_dt=1.0, lif_resistance=1.0, lif_refractory_period=0)

Compiled Petri Net ready for sc_neurocore execution.

Holds both the dense float matrices (for validation / fallback) and pre-packed uint64 weight bitstreams (for the stochastic path).

dense_forward

dense_forward(W_packed, input_probs)

Stochastic matrix-vector product via AND + popcount.

Parameters

W_packed : (n_out, n_in, n_words) uint64 — pre-packed weight bitstreams. input_probs : (n_in,) float64 — input probabilities in [0, 1].

Returns

output : (n_out,) float64 — stochastic estimate of W @ input_probs.

dense_forward_float

dense_forward_float(W, inputs)

Float-path validation: simple W @ inputs.

lif_fire

lif_fire(currents)

Run LIF threshold detection on all transitions.

Binary mode: f_t = 1 if current >= threshold else 0 Fractional mode: f_t = clip((current - threshold) / margin, 0, 1)

Parameters

currents : (n_transitions,) float64 — weighted-sum activations.

Returns

fired : (n_transitions,) float64 vector. Binary mode → values in {0.0, 1.0}. Fractional mode → values in [0.0, 1.0].

export_artifact

export_artifact(name='controller', dt_control_s=0.001, readout_config=None, injection_config=None)

Build an Artifact from compiled state + user-provided config.

Parameters

name : artifact name. dt_control_s : control tick period (s). readout_config : dict with actions, gains, abs_max, slew_per_s lists. Required for a complete artifact. injection_config : list of place-injection dicts.

NeuroSymbolicController

NeuroSymbolicController

NeuroSymbolicController(artifact, seed_base, targets, scales, sc_n_passes=8, sc_bitflip_rate=0.0, sc_binary_margin=None, sc_antithetic=True, enable_oracle_diagnostics=True, feature_axes=None, runtime_profile='adaptive', runtime_backend='auto', rust_backend_min_problem_size=1, sc_antithetic_chunk_size=2048)

Reference controller with oracle float and stochastic paths.

Parameters

artifact : loaded .scpnctl.json artifact. seed_base : 64-bit base seed for deterministic stochastic execution. targets : control setpoint targets. scales : normalisation scales.

reset

reset()

Restore initial marking and zero previous actions.

step

step(obs, k, log_path=None)

Execute one control tick.

Steps
  1. extract_features(obs) → 4 unipolar features
  2. _inject_places(features)
  3. _oracle_step() — float path (optional)
  4. _sc_step(k) — deterministic stochastic path
  5. _decode_actions() — gain × differencing, slew + abs clamp
  6. Optional JSONL logging

step_traceable

step_traceable(obs_vector, k, log_path=None)

Execute one control tick from a fixed-order observation vector.

The vector order is self._axis_obs_keys. This avoids per-step key lookups/dict allocation in tight control loops.

Contracts

ControlObservation

Bases: TypedDict

Plant observation at a single control tick.

ControlAction

Bases: TypedDict

Actuator command output for a single control tick.

Keys are dynamic and depend on the Petri Net readout configuration.

ControlTargets dataclass

ControlTargets(R_target_m=6.2, Z_target_m=0.0)

Setpoint targets for the control loop.

extract_features

extract_features(obs, targets, scales, feature_axes=None, passthrough_keys=None)

Map observation → unipolar [0, 1] feature sources.

By default returns keys x_R_pos, x_R_neg, x_Z_pos, x_Z_neg. Custom feature mappings can be supplied via feature_axes for arbitrary observation dictionaries.

decode_actions

decode_actions(marking, actions_spec, gains, abs_max, slew_per_s, dt, prev)

Decode marking → actuator commands with gain, slew-rate, and abs clamp.

Parameters

marking : current marking vector (len >= max place index). actions_spec : per-action pos/neg place definitions. gains : per-action gain multiplier. abs_max : per-action absolute saturation. slew_per_s : per-action max change rate (units/s). dt : control tick period (s). prev : previous action outputs (same length as actions_spec).

Returns

dict mapping action name → clamped value. Also mutates prev in-place.

Artifacts

Artifact dataclass

Artifact(meta, topology, weights, readout, initial_state)

Full SCPN controller artifact (.scpnctl.json).

save_artifact

save_artifact(artifact, path, compact_packed=False)

Serialize an Artifact to indented JSON.

load_artifact

load_artifact(path)

Parse a .scpnctl.json file into an Artifact dataclass.


Phase — Paper 27 Dynamics

Kuramoto-Sakaguchi Step

kuramoto_sakaguchi_step

kuramoto_sakaguchi_step(theta, omega, *, dt, K, alpha=0.0, zeta=0.0, psi_driver=None, psi_mode='external', wrap=True)

Single Euler step of mean-field Kuramoto-Sakaguchi + global driver.

dθ_i/dt = ω_i + K·R·sin(ψ_r − θ_i − α) + ζ·sin(Ψ − θ_i)

Uses Rust backend when available (rayon-parallelised, sub-ms for N>1000).

order_parameter

order_parameter(theta, weights=None)

Kuramoto order parameter R·exp(i·ψ_r) = / W.

Returns (R, ψ_r).

lyapunov_v

lyapunov_v(theta, psi)

Lyapunov candidate V(t) = (1/N) Σ (1 − cos(θ_i − Ψ)).

V=0 at perfect sync (all θ_i = Ψ), V=2 at maximal desync. Range: [0, 2]. Mirror of control-math/kuramoto.rs::lyapunov_v.

lyapunov_exponent

lyapunov_exponent(v_hist, dt)

λ = (1/T) · ln(V_final / V_initial). λ < 0 ⟹ stable.

wrap_phase

wrap_phase(x)

Map phases to (-π, π].

GlobalPsiDriver dataclass

GlobalPsiDriver(mode='external')

Resolve the global field phase Ψ.

mode="external" : Ψ supplied by caller (intention/carrier, no dotΨ). mode="mean_field" : Ψ = arg() from the oscillator population.

Knm Coupling Matrix

KnmSpec dataclass

KnmSpec(K, alpha=None, zeta=None, layer_names=None)

Paper 27 coupling specification.

K : (L, L) coupling matrix. K[n, m] = source n -> target m. alpha : (L, L) Sakaguchi phase-lag (optional). zeta : (L,) per-layer global-driver gain ζ_m (optional).

build_knm_paper27

build_knm_paper27(L=16, K_base=0.45, K_alpha=0.3, zeta_uniform=0.0)

Build the canonical Paper 27 Knm with exponential distance decay.

K[i,j] = K_base · exp(−K_alpha · |i − j|), diag(K) kept for intra-layer sync (unlike the inter-oscillator Knm which zeros diag).

K_base=0.45 and K_alpha=0.3 from Paper 27 §3.2, Eq. 12. Calibration anchors from Paper 27, Table 2. Cross-hierarchy boosts from Paper 27 §4.3.

UPDE Multi-Layer Solver

UPDESystem dataclass

UPDESystem(spec, dt=0.001, psi_mode='external', wrap=True)

Multi-layer UPDE driven by a KnmSpec.

step

step(theta_layers, omega_layers, *, psi_driver=None, actuation_gain=1.0, pac_gamma=0.0, K_override=None)

Advance all L layers by one Euler step.

Parameters

theta_layers : sequence of 1D arrays Phase vectors per layer. omega_layers : sequence of 1D arrays Natural frequencies per layer. psi_driver : float or None External global field phase Ψ (required if psi_mode="external"). actuation_gain : float Multiplicative gain on all coupling terms. pac_gamma : float PAC-like gating: boost inter-layer coupling by (1 + pac_gamma·(1 − R_source)). K_override : array or None Per-tick replacement for spec.K (adaptive coupling).

run

run(n_steps, theta_layers, omega_layers, *, psi_driver=None, actuation_gain=1.0, pac_gamma=0.0, K_override=None)

Run n_steps and return trajectory of per-layer R and global R.

run_lyapunov

run_lyapunov(n_steps, theta_layers, omega_layers, *, psi_driver=None, actuation_gain=1.0, pac_gamma=0.0, K_override=None)

Run n_steps with Lyapunov tracking.

Returns R histories, V histories, and per-layer + global λ. λ < 0 ⟹ stable convergence toward Ψ.

Lyapunov Guard

LyapunovGuard

LyapunovGuard(window=50, dt=0.001, lambda_threshold=0.0, max_violations=3)

Sliding-window Lyapunov stability monitor.

Parameters

window : int Number of V(t) samples in the sliding window. dt : float Timestep between samples (for λ computation). lambda_threshold : float λ above this value counts as a violation. Default 0 (any growth). max_violations : int Consecutive violations before guard refuses.

check

check(theta, psi)

Feed one sample and return stability verdict.

check_trajectory

check_trajectory(v_hist)

Batch check: compute λ from a full V(t) trajectory.

to_director_ai_dict

to_director_ai_dict(verdict)

Export verdict in DIRECTOR_AI AuditLogger format.

Realtime Monitor

RealtimeMonitor dataclass

RealtimeMonitor(upde, guard, theta_layers, omega_layers, psi_driver=0.0, pac_gamma=0.0, adaptive_engine=None)

Tick-by-tick UPDE monitor with LyapunovGuard.

from_paper27 classmethod

from_paper27(L=16, N_per=50, *, dt=0.001, zeta_uniform=0.5, psi_driver=0.0, pac_gamma=0.0, guard_window=50, guard_max_violations=3, seed=42)

Build from Paper 27 defaults.

from_plasma classmethod

from_plasma(L=8, N_per=50, *, mode='baseline', dt=0.001, zeta_uniform=0.0, psi_driver=0.0, pac_gamma=0.0, guard_window=50, guard_max_violations=3, adaptive_engine=None, seed=42)

Build from plasma-native Knm defaults with optional adaptive engine.

tick

tick(*, record=True, beta_n=0.0, q95=3.0, disruption_risk=0.0, mirnov_rms=0.0)

Advance one UPDE step and return dashboard snapshot.

reset

reset(seed=42)

Reset oscillator phases, guard state, and recorder.

save_hdf5

save_hdf5(path)

Export recorded trajectory to HDF5.

Datasets: R_global, R_layer, V_global, V_layer, lambda_exp, guard_approved, latency_us, Psi_global. Attributes: L, N_per, psi_driver, pac_gamma, n_ticks.

Requires h5py (pip install h5py).

save_npz

save_npz(path)

Export recorded trajectory to compressed NPZ (no h5py needed).

TrajectoryRecorder dataclass

TrajectoryRecorder(R_global=list(), R_layer=list(), V_global=list(), V_layer=list(), lambda_exp=list(), guard_approved=list(), latency_us=list(), Psi_global=list())

Accumulates tick snapshots for batch export.

Adaptive Knm Engine

AdaptiveKnmEngine

AdaptiveKnmEngine(baseline_spec, config=None)

Diagnostic-driven online adaptation of the Knm coupling matrix.

Holds a baseline K from a KnmSpec and produces K_adapted each tick.

adaptation_summary property

adaptation_summary

Snapshot of engine state for dashboard export.

update

update(snap)

Apply all adaptation channels and return K_adapted.

reset

reset()

Revert to baseline and clear integral state.

AdaptiveKnmConfig dataclass

AdaptiveKnmConfig(beta_scale=0.3, beta_max_boost=0.5, risk_pairs=((2, 5), (3, 5), (2, 4)), risk_gain=0.4, coherence_Kp=0.15, coherence_Ki=0.02, coherence_R_target=0.6, coherence_max_boost=0.3, max_delta_per_tick=0.02, revert_on_guard_refusal=True)

Tuning knobs for each adaptation channel.

DiagnosticSnapshot dataclass

DiagnosticSnapshot(R_layer, V_layer, lambda_exp, beta_n, q95, disruption_risk, mirnov_rms, guard_approved)

Per-tick plasma diagnostic bundle.

Plasma Knm

build_knm_plasma

build_knm_plasma(mode='baseline', L=8, K_base=0.3, zeta_uniform=0.0, custom_overrides=None, layer_names=None)

Build a plasma-native Knm coupling matrix.

Parameters

mode : str Instability scenario bias. One of: baseline, elm, ntm, sawtooth, hybrid. L : int Number of plasma layers (default 8). K_base : float Base coupling strength for exponential decay backbone. zeta_uniform : float Global driver gain ζ applied uniformly to all layers. custom_overrides : dict, optional Explicit (i, j) → value overrides applied last. Automatically symmetrised. layer_names : sequence of str, optional Layer labels. Defaults to PLASMA_LAYER_NAMES[:L] or PLASMA_LAYER_NAMES_16[:L].

Returns

KnmSpec Ready for UPDESystem consumption.

WebSocket Stream

PhaseStreamServer dataclass

PhaseStreamServer(monitor, tick_interval_s=0.001)

Async WebSocket server wrapping a RealtimeMonitor.

serve async

serve(host='0.0.0.0', port=8765)

Start WebSocket server and tick loop.

serve_sync

serve_sync(host='0.0.0.0', port=8765)

Blocking entry point.


Control — Controllers

H-infinity (Riccati DARE)

HInfinityController

HInfinityController(A, B1, B2, C1, C2, gamma=None, D12=None, D21=None, enforce_robust_feasibility=False)

Riccati-based H-infinity controller for tokamak vertical stability.

Synthesis: Doyle et al. 1989, IEEE TAC 34, 831 (state-space ARE formulation). Riccati sign convention: Zhou, Doyle & Glover 1996, Eq. 14.18. γ-iteration: Green & Limebeer 1995, Ch. 13 (binary search on ||T_{zw}||_∞). Weighting selection: Ariola & Pironti 2008, Ch. 5 (tokamak vertical loop).

Parameters

A : array_like, shape (n, n) Plant state matrix. B1 : array_like, shape (n, p) Disturbance input matrix. B2 : array_like, shape (n, m) Control input matrix. C1 : array_like, shape (q, n) Performance output matrix. C2 : array_like, shape (l, n) Measurement output matrix. gamma : float, optional H-infinity attenuation level. If None, found by bisection. D12 : array_like, optional Feedthrough from control to performance. Default: identity-like. D21 : array_like, optional Feedthrough from disturbance to measurement. Default: identity-like. enforce_robust_feasibility : bool, optional If True, raise ValueError unless rho(XY) < gamma^2 after synthesis.

is_stable property

is_stable

True iff all eigenvalues of A + B2 F lie in the open left-half plane.

stability_margin_db property

stability_margin_db

Eigenvalue-based stability margin in dB.

Not the classical Bode gain margin. Measures the ratio of closed-loop to open-loop dominant eigenvalue real parts. For frequency-domain gain margin, use a Bode analysis on the loop transfer function.

gain_margin_db property

gain_margin_db

Alias for stability_margin_db (backward compatibility).

step

step(error, dt)

Compute control action for one timestep.

Uses DARE-synthesised gains on the ZOH-discretised plant.

Parameters

error : float Observed measurement (y). dt : float Timestep [s].

Returns

float Control action u.

riccati_residual_norms

riccati_residual_norms()

Frobenius norms of the two H-infinity ARE residuals.

Residual form: X A + A^T X + Q - X B R^{-1} B^T X (Zhou, Doyle & Glover 1996, Eq. 14.18). Small residuals (< 1e-6 relative) confirm solver accuracy.

robust_feasibility_margin

robust_feasibility_margin()

Return gamma^2 - rho(XY); positive values satisfy the strict condition.

Strict condition: rho(XY) < γ^2 — Doyle et al. 1989, §IV, Theorem 1.

reset

reset()

Reset controller state to zero.

get_radial_robust_controller

get_radial_robust_controller(gamma_growth=100.0, *, damping=10.0, enforce_robust_feasibility=False)

H-infinity controller for tokamak vertical stability.

Plant: second-order vertical instability model from Ariola & Pironti 2008, Ch. 5, linearised about an unstable equilibrium:

A = [[0,    1         ],
     [γ_v², -damping  ]]

where γ_v is the vertical growth rate. The double integrator structure captures the leading-order Shafranov instability at low plasma current.

Parameters

gamma_growth : float Vertical instability growth rate [1/s]. ITER-like: ~100/s. SPARC: ~1000/s (Ariola & Pironti 2008, Ch. 5). damping : float Passive damping coefficient [1/s]. Resistive-wall contribution; 10.0 is representative for ITER-like wall proximity (Ariola & Pironti 2008). enforce_robust_feasibility : bool, optional If True, require rho(XY) < gamma^2 and raise on infeasible synthesis.

Returns

HInfinityController Riccati-synthesized robust controller.

Model Predictive Control

NeuralSurrogate

NeuralSurrogate(n_coils, n_state, verbose=True)

Linearized surrogate model around current operating point.

ModelPredictiveController

ModelPredictiveController(surrogate, target_state, *, prediction_horizon=PREDICTION_HORIZON, learning_rate=0.5, iterations=20, action_limit=2.0, action_regularization=0.1)

Gradient-based MPC planner over surrogate dynamics.

Optimal Control

OptimalController

OptimalController(config_file, *, kernel_factory=FusionKernel, verbose=True, correction_limit=5.0, coil_current_limits=(-40.0, 40.0), current_target_limits=(5.0, 16.0))

MIMO controller using response-matrix inversion with bounded actuation.

identify_system

identify_system(perturbation=0.5)

Perturb each coil and measure plasma-axis response to build Jacobian.

get_plasma_pos

get_plasma_pos()

Return current plasma-axis position [R, Z].

compute_optimal_correction

compute_optimal_correction(current_pos, target_pos, regularization_limit=0.01)

Solve Error = J * Delta_I using damped pseudoinverse.

Digital Twin

run_digital_twin() now supports persistent sensor calibration bias and drift in addition to dropout and white-noise corruption, and it can now stress the command path with deterministic actuator bias, drift, first-order lag, and rate limiting. The returned summary exposes both commanded and applied actions plus actuator-lag telemetry so replay tests can see what the plant actually received.

run_digital_twin

run_digital_twin(time_steps=TIME_STEPS, seed=42, save_plot=True, output_path='Tokamak_Digital_Twin.png', verbose=True, gyro_surrogate=None, chaos_monkey=False, sensor_dropout_prob=0.0, sensor_noise_std=0.0, sensor_bias_std=0.0, sensor_drift_std=0.0, sensor_thermal_lag_tau=0.05, dt=0.001, actuator_bias=0.0, actuator_drift_std=0.0, actuator_tau_steps=0.0, actuator_rate_limit=2.0, rng=None)

Run deterministic digital-twin control simulation.

Returns a summary dict so callers can use the simulation without relying on console text or plot artifacts.

Flight Simulator

IsoFluxController

IsoFluxController(config_file, kernel_factory=FusionKernel, verbose=True, actuator_tau_s=0.06, heating_actuator_tau_s=None, actuator_current_delta_limit=1000000000.0, heating_beta_max=5.0, control_dt_s=0.05)

Simulates the Plasma Control System (PCS). Uses PID loops to adjust Coil Currents to maintain plasma shape.

run_flight_sim

run_flight_sim(config_file=None, shot_duration=SHOT_DURATION, seed=42, save_plot=True, output_path='Tokamak_Flight_Report.png', verbose=True, actuator_tau_s=0.06, heating_actuator_tau_s=None, actuator_current_delta_limit=1000000000.0, heating_beta_max=5.0, control_dt_s=0.05, kernel_factory=FusionKernel)

Run deterministic tokamak flight-sim control loop and return summary.

Free-Boundary Tracking

Experimental closed-loop free-boundary tracking that keeps the full FusionKernel in the loop and re-identifies the local coil-response map from repeated solves. Safe-current fallback targets can be supplied through the free_boundary_tracking.fallback_currents config block when supervisor rejection should ramp the coils toward a predefined safe state. Persistent objective residuals can also be accumulated with the config-driven free_boundary_tracking.observer_gain and observer_max_abs settings. When free-boundary objective tolerances are configured, the controller also uses them directly in its correction and accept/reject logic so tighter X-point or divertor targets take precedence over looser shape goals, and it refuses trial steps that would push an already-satisfied objective back outside tolerance. If the identified coil-response map loses authority entirely, the controller marks that degraded state explicitly and drops into the safe-state recovery path instead of silently accepting a zero-action step. Residuals already inside the configured tolerances are also treated as deadband, so the controller stops chattering the coils once the protected objectives are met. Coil allocation is also headroom-aware, so the regularized solve prefers actuators that still have current authority instead of leaning equally on a nearly saturated coil. Deterministic objective-space sensor bias and per-step drift can be injected through free_boundary_tracking.measurement_bias and measurement_drift_per_step, and known calibration corrections can be applied with measurement_correction_bias and measurement_correction_drift_per_step. The run summary reports both measured and hidden true objective errors so calibration faults cannot masquerade as control success in acceptance tests.

from scpn_control.control.free_boundary_tracking import run_free_boundary_tracking

summary = run_free_boundary_tracking(
    "iter_config.json",
    shot_steps=5,
    gain=0.8,
    verbose=False,
    coil_slew_limits=2.5e5,
    supervisor_limits={"x_point_position": 0.15, "max_abs_actuator_lag": 1.0e5},
    hold_steps_after_reject=2,
)

print(summary["shape_rms"], summary["objective_converged"], summary["supervisor_intervention_count"])

FreeBoundaryTrackingController

FreeBoundaryTrackingController(config_file, *, kernel_factory=FusionKernel, verbose=True, identification_perturbation=0.25, correction_limit=2.0, response_regularization=0.001, response_refresh_steps=1, solve_max_outer_iter=10, solve_tol=0.0001, objective_tolerances=None, control_dt_s=None, coil_actuator_tau_s=None, coil_slew_limits=None, supervisor_limits=None, hold_steps_after_reject=None, state_estimator=None)

Direct free-boundary controller using local coil-response identification.

This path keeps the full Grad-Shafranov kernel in the loop instead of replacing it with a reduced-order plant.

Examples

from scpn_control.control.free_boundary_tracking import run_free_boundary_tracking summary = run_free_boundary_tracking( ... "iter_config.json", ... shot_steps=3, ... gain=0.8, ... verbose=False, ... ) summary["boundary_variant"] 'free_boundary'

run_free_boundary_tracking

run_free_boundary_tracking(config_file=None, *, shot_steps=10, gain=1.0, verbose=True, kernel_factory=FusionKernel, objective_tolerances=None, control_dt_s=None, coil_actuator_tau_s=None, coil_slew_limits=None, supervisor_limits=None, hold_steps_after_reject=None, disturbance_callback=None, stop_on_convergence=False)

Run deterministic free-boundary tracking over the configured objectives.

Examples

summary = run_free_boundary_tracking("iter_config.json", shot_steps=2, gain=0.7, verbose=False) bool(summary["objective_convergence_active"]) True

Disruption Predictor

predict_disruption_risk_safe() still returns a bounded scalar risk, but its metadata now includes deterministic sigma-point uncertainty summaries (risk_p05, risk_p50, risk_p95, risk_std, risk_interval) for both fallback and checkpoint inference paths.

DisruptionTransformer

DisruptionTransformer()

predict_disruption_risk

predict_disruption_risk(signal, toroidal_observables=None)

Lightweight deterministic disruption risk estimator returning a value in [0, 1].

Supplements the Transformer pathway by explicitly consuming toroidal asymmetry observables from 3D diagnostics (n=1,2,3 mode amplitudes).

Feature weights tuned on synthetic DIII-D/JET validation shots; see validation/reports/disruption_replay_pipeline_benchmark.md. Logit bias: sigmoid(−4.0) ≈ 0.018, giving low base risk on zero features.

predict_disruption_risk_safe

predict_disruption_risk_safe(signal, toroidal_observables=None, *, model_path=None, seq_len=DEFAULT_SEQ_LEN, train_if_missing=False, mc_samples=10)

Predict disruption risk with MC dropout uncertainty if model is available.

Returns

risk, metadata risk is the MC mean risk in [0, 1]. metadata includes risk_std (epistemic uncertainty).

Disruption Contracts

run_disruption_episode

run_disruption_episode(*, rng, rl_agent, base_tbr, explorer)

predict_disruption_risk

predict_disruption_risk(signal, toroidal_observables=None)

Lightweight deterministic disruption risk estimator returning a value in [0, 1].

Supplements the Transformer pathway by explicitly consuming toroidal asymmetry observables from 3D diagnostics (n=1,2,3 mode amplitudes).

Feature weights tuned on synthetic DIII-D/JET validation shots; see validation/reports/disruption_replay_pipeline_benchmark.md. Logit bias: sigmoid(−4.0) ≈ 0.018, giving low base risk on zero features.

SPI Mitigation

ShatteredPelletInjection

ShatteredPelletInjection(Plasma_Energy_MJ=300.0, Plasma_Current_MA=15.0)

Reduced SPI mitigation model for thermal/current quench campaigns.

SPI concept: Commaux et al. 2010, Nucl. Fusion 50, 112001.

estimate_z_eff_cocktail staticmethod

estimate_z_eff_cocktail(*, neon_quantity_mol=0.0, argon_quantity_mol=0.0, xenon_quantity_mol=0.0)

Estimate Z_eff for a noble-gas SPI cocktail.

Radiative efficiency weighting reflects that higher-Z species dominate coronal radiation. Lehnen et al. 2015, J. Nucl. Mater. 463, 39.

estimate_tau_cq staticmethod

estimate_tau_cq(te_keV, z_eff)

Current quench time scale.

Scaling: τ_CQ ∝ T_e^0.25 / Z_eff, consistent with resistive decay. τ_TQ ~ 1–3 ms, τ_CQ ~ 20–150 ms for ITER. Hender et al. 2007, Nucl. Fusion 47, S128, Sec. 3.

run_spi_mitigation

run_spi_mitigation(*, plasma_energy_mj=300.0, plasma_current_ma=15.0, neon_quantity_mol=0.1, argon_quantity_mol=0.0, xenon_quantity_mol=0.0, duration_s=0.05, dt_s=1e-05, save_plot=True, output_path='SPI_Mitigation_Result.png', verbose=True)

Run SPI mitigation simulation and return deterministic summary metrics.

Fusion Control Room

run_control_room

run_control_room(sim_duration=SIM_DURATION, *, seed=42, save_animation=True, save_report=True, output_gif='SCPN_Fusion_Control_Room.gif', output_report='SCPN_Fusion_Status_Report.png', verbose=True, kernel_factory=None, config_file=None)

Run the control-room loop and return deterministic summary metrics.

TokamakPhysicsEngine

TokamakPhysicsEngine(size=RESOLUTION, *, seed=42, kernel=None)

Reduced Grad-Shafranov geometry model with optional kernel-Psi ingestion.

solve_flux_surfaces

solve_flux_surfaces()

Return (density, psi) from kernel state when available, otherwise from analytic Miller-parameterized geometry.

step_dynamics

step_dynamics(coil_action_top, coil_action_bottom)

Reduced vertical-displacement dynamics.

Gymnasium Environment

TokamakEnv

TokamakEnv(dt=0.001, max_steps=500, T_target=20.0, noise_std=0.01, seed=42, n_e_20=1.0, V_plasma=830.0)

Minimal Gymnasium-compatible tokamak control environment.

Implements a simplified plasma response model for energy and current evolution based on 0D lumped-parameter equations. Ref: Wesson, J. (2011). Tokamaks. 4th Edition, Chapter 1.

Follows the gymnasium.Env interface (reset/step/render) without requiring gymnasium as a hard dependency. If gymnasium is installed, this class can be registered via gymnasium.register().

Parameters

dt : float Timestep per step call [s]. max_steps : int Episode length. T_target : float Target axis temperature [keV]. noise_std : float Observation noise standard deviation. n_e_20 : float Line-averaged electron density [10^20 m^-3]. Default 1.0. V_plasma : float Plasma volume [m^3]. Default 830.0 (ITER).

reset

reset(seed=None)

Reset to initial plasma state.

step

step(action)

Advance one timestep using physics-based energy balance.

Returns (obs, reward, terminated, truncated, info).

render

render()

Print current state.

Analytic Solver

AnalyticEquilibriumSolver

AnalyticEquilibriumSolver(config_path, *, kernel_factory=FusionKernel, verbose=True)

Analytic vertical-field target and least-norm coil-current solve.

calculate_required_Bv

calculate_required_Bv(R_geo, a_min, Ip_MA, *, beta_p=0.5, li=0.8)

Shafranov radial-force balance vertical field estimate.

compute_coil_efficiencies

compute_coil_efficiencies(target_R, *, target_Z=0.0)

Compute dBz/dI per coil at target location using kernel vacuum-field map.

solve_coil_currents

solve_coil_currents(target_Bv, target_R, *, target_Z=0.0, ridge_lambda=0.0)

Solve least-norm coil currents for desired vertical field target.

Bio-Holonomic Controller

BioHolonomicController

BioHolonomicController(dt_s=0.01, seed=42)

Biological Feedback Controller mapping L4/L5 states to clinical actions.

Rather than mitigating plasma disruptions, this controller mitigates biological decoherence by tracking autonomic tone and triggering resonant acoustic interventions.

step

step(telemetry)

Advances the bio-controller one tick using incoming telemetry.

Digital Twin Ingest

RealtimeTwinHook

RealtimeTwinHook(machine, *, max_buffer=512, seed=42)

In-memory realtime ingest + SNN planning hook.

Director Interface

DirectorInterface

DirectorInterface(config_path, *, allow_fallback=True, director=None, controller_factory=NeuroCyberneticController, entropy_threshold=0.3, history_window=10)

Interfaces the 'Director' (Layer 16: Coherence Oversight) with the Fusion Reactor.

Role: The Director does NOT control the coils (Layer 2 does that). The Director controls the Controller. It sets the strategy and monitors for "Backfire".

Mechanism: 1. Sample System State (Physics + Neural Activity). 2. Format as a "Prompt" for the Director. 3. Director calculates Entropy/Risk. 4. If Safe: Director updates Target Parameters. 5. If Unsafe: Director triggers corrective action.

format_state_for_director

format_state_for_director(t, ip, err_r, err_z, brain_activity)

Translate physical telemetry into a semantic prompt for the Director.

Fueling Mode Controller

IcePelletFuelingController

IcePelletFuelingController(target_density=1.0)

Hybrid Petri-to-SNN + PI fueling controller.

Halo RE Physics

HaloCurrentModel

HaloCurrentModel(plasma_current_ma=15.0, minor_radius_m=2.0, major_radius_m=6.2, wall_resistivity_ohm_m=7e-07, wall_thickness_m=0.06, tpf=2.0, contact_fraction=0.3)

Fitzpatrick-style L/R circuit halo current model.

Parameters

plasma_current_ma : float Pre-disruption plasma current (MA). minor_radius_m : float Plasma minor radius (m). major_radius_m : float Plasma major radius (m). wall_resistivity_ohm_m : float Wall resistivity (Ohm·m). Default: stainless steel ~7e-7. wall_thickness_m : float Wall thickness (m). tpf : float Toroidal peaking factor (1.0 = uniform, up to ~2.5 in severe VDEs). contact_fraction : float Fraction of plasma cross-section in wall contact (0–1).

simulate

simulate(tau_cq_s=0.01, duration_s=0.05, dt_s=1e-05)

Run the L/R circuit halo current model.

Parameters

tau_cq_s : float Current quench time constant (s). duration_s : float Simulation duration (s). dt_s : float Time step (s).

HIL Test Harness

HILControlLoop

HILControlLoop(target_rate_hz=1000.0, sensor=None)

Real-time 1 kHz control loop with measured timing.

Executes a user-supplied control callback at a target rate (default 1 kHz) and measures actual loop timing using time.perf_counter_ns.

Parameters

target_rate_hz : float Target control loop rate (Hz). Default: 1000 (1 kHz). sensor : SensorInterface or None Sensor/actuator interface. Created with defaults if None.

set_controller

set_controller(fn)

Register the control callback: fn(error, sensor) -> command.

run

run(iterations=1000, plant_fn=None, initial_state=0.0, setpoint=0.0)

Execute the control loop and measure timing.

Parameters

iterations : int Number of control loop iterations. plant_fn : callable or None Plant model: state_new = plant_fn(state, command). Default: simple integrator (state += command * dt). initial_state : float Initial plant state. setpoint : float Control target.

JAX Traceable Runtime

Requires pip install "scpn-control[jax]".

TraceableRuntimeSpec dataclass

TraceableRuntimeSpec(dt_s=0.001, tau_s=0.005, gain=1.0, command_limit=1.0)

Configuration for reduced traceable first-order actuator dynamics.

LIF+NEF SNN Controller

NengoSNNController

NengoSNNController(config=None)

SNN controller for tokamak position control.

Pure-NumPy LIF + NEF implementation. Per channel: error ensemble decodes gain·x into control ensemble, which decodes identity to output.

Parameters

config : NengoSNNConfig or None Controller configuration. Uses defaults if None.

build_network

build_network()

Build LIF ensembles and compute NEF decoders.

step

step(state)

Run one control step.

Parameters

state : array of shape (n_channels,) Error vector.

Returns

array of shape (n_channels,) Control command.

reset

reset()

Reset all neuron state and filters.

get_spike_data

get_spike_data()

Return probe data from simulation.

export_weights

export_weights()

Extract decoder, encoder, and gain/bias arrays.

Returns

dict mapping label -> weight array

export_fpga_weights

export_fpga_weights(filename)

Export weight matrices for FPGA synthesis as .npz.

export_loihi

export_loihi(filename)

Loihi export requires the original nengo + nengo_loihi packages.

benchmark

benchmark(n_steps=1000)

Measure per-step latency.

Returns

dict with mean_us, p50_us, p95_us, p99_us, max_us

Neuro-Cybernetic Controller

NeuroCyberneticController

NeuroCyberneticController(config_file, seed=42, *, shot_duration=SHOT_DURATION, kernel_factory=None)

Replaces PID loops with push-pull spiking populations.

TORAX Hybrid Loop

run_nstxu_torax_hybrid_campaign

run_nstxu_torax_hybrid_campaign(*, seed=42, episodes=16, steps_per_episode=220)

Run deterministic NSTX-U-like realtime hybrid control campaign.

Advanced SOC Learning

run_advanced_learning_sim

run_advanced_learning_sim(size=L, time_steps=TIME_STEPS, seed=42, *, epsilon=EPSILON, noise_probability=0.01, shear_step=0.05, shear_bounds=(0.0, 1.0), save_plot=True, output_path='Advanced_SOC_Learning.png', verbose=True)

Run deterministic SOC+Q-learning control simulation and return summary metrics.

NMPC Controller (v0.16.0)

NonlinearMPC

NonlinearMPC(plant_model, config)

SQP-based NMPC with projected gradient descent inner solver.

Each SQP outer iteration linearizes f around the nominal trajectory, then solves the resulting QP via projected gradient descent (PGD) on the condensed formulation (states eliminated).

compute_cost

compute_cost(x_traj, u_traj, x_ref)

Evaluate the NMPC cost J over a trajectory.

J = Σ_{k=0}^{N-1} ‖x_k − x_ref‖²_Q + ‖u_k‖²_R Rawlings, Mayne & Diehl 2017, Ch. 1, Eq. (1.2).

step

step(x, x_ref, u_prev)

Compute optimal first control action via SQP.

Warm-started from the previous solution shifted by one step.

Mu-Synthesis (v0.16.0)

MuSynthesisController

MuSynthesisController(plant_ss, uncertainty)

Structured robust controller synthesised by D-K iteration.

Theory: Doyle 1982 (μ definition); Balas et al. 1993 (DK algorithm); Skogestad & Postlethwaite 2005, §8.5 (convergence and practical use).

Physical uncertainty model for tokamak control follows Ariola & Pironti 2008, Ch. 7: - plasma_position real_scalar ±2 cm - plasma_current real_scalar ±3 % - plasma_shape full ±5 %

synthesize

synthesize(n_dk_iter=5)

Run D-K iteration and store the resulting controller.

step

step(x, dt)

Apply synthesised controller: u = -K x.

robustness_margin

robustness_margin()

Return 1/μ_peak — the structured stability margin.

μ_peak < 1 means the system is robustly stable for all Δ with ||Δ|| ≤ 1/μ_peak (Doyle 1982; Skogestad & Postlethwaite 2005, §8.2).

compute_mu_upper_bound

compute_mu_upper_bound(M, delta_structure)

D-scaling upper bound on μ(M).

Computes min_D σ̄(D M D^{-1}) where D is block-diagonal with positive real scalars matching delta_structure. This is always ≥ μ(M) and equals μ(M) for complex full blocks (Doyle 1982, IEE Proc. D 129, 242).

A subgradient descent on log(D) is used; for production use replace with an LMI solver or the MATLAB μ-toolbox (Balas et al. 1993, Ch. 8).

Parameters

M : np.ndarray, shape (n, n) Closed-loop transfer matrix evaluated at a single frequency. delta_structure : list of (size, block_type) Block sizes and types from StructuredUncertainty.build_Delta_structure().

Returns

float Upper bound μ̄ ≥ μ(M).

Real-Time EFIT (v0.16.0)

RealtimeEFIT

RealtimeEFIT(diagnostics, R_grid, Z_grid, n_p_modes=3, n_ff_modes=3)

Simplified real-time equilibrium reconstruction (EFIT).

reconstruct

reconstruct(measurements)

Main EFIT loop.

find_xpoint

find_xpoint(psi)

Locate magnetic nulls (dpsi/dR = 0, dpsi/dZ = 0).

Gain-Scheduled Controller (v0.16.0)

GainScheduledController

GainScheduledController(controllers)

Multi-regime PID controller with bumpless gain interpolation.

Scheduling approach: Rugh & Shamma 2000, Automatica 36, 1401, §3. Bumpless transfer via linear interpolation over _TAU_SWITCH seconds avoids step transients at regime boundaries (Walker et al. 2006, §3.2).

LPV interpretation: gains are piecewise-affine in the scheduling vector (I_p, β_N, l_i) per Packard 1994, Systems & Control Letters 22, 79.

step

step(x, t, dt, detected_regime)

Compute PID output with bumpless gain interpolation.

On regime switch: α = (t - t_switch) / τ_switch ∈ [0,1]. Gains interpolated linearly: K(α) = (1-α) K_old + α K_new. Walker et al. 2006, §3.2, Eq. (4).

Shape Controller (v0.16.0)

PlasmaShapeController

PlasmaShapeController(target, coil_set, kernel)

Real-time shape controller using Tikhonov-regularized pseudoinverse.

Gain matrix

K = (JᵀWJ + λI)⁻¹ JᵀW

Ferron et al. 1998, Nucl. Fusion 38, 1055: ISOFLUX control law maps ψ differences at boundary points to coil current corrections.

step

step(psi, coil_currents)

Compute coil current corrections to reduce shape errors.

Safe RL Controller (v0.16.0)

LagrangianPPO

LagrangianPPO(env, lambda_lr=0.01, gamma=0.99)

PPO augmented with Lagrangian multipliers for constraint satisfaction.

Objective (Tessler et al. 2018, Algorithm 1): r_aug = r - Σ_i λ_i c_i

Multiplier update (dual gradient ascent): λ_i ← max(0, λ_i + lr (C_i - d_i))

Safety guarantee: at convergence J_{C_i}(π*) ≤ d_i for all i (Achiam et al. 2017, Theorem 1, assuming feasibility).

Degrave et al. 2022, Nature 602, 414: penalty-based safety shaping used in DeepMind tokamak controller for disruption avoidance.

update_lambdas

update_lambdas(episode_costs)

Dual gradient ascent step.

λ_i ← max(0, λ_i + lr (C_i - d_i)) Achiam et al. 2017, §5, Lagrangian update.

train

train(total_timesteps)

Stub training loop demonstrating λ update mechanics.

A production implementation replaces the random-action rollout with a PPO policy gradient update on the augmented reward.

Sliding-Mode Vertical (v0.16.0)

VerticalStabilizer

VerticalStabilizer(n_index, Ip_MA, R0, m_eff, tau_wall, smc)

Vertical-position (VS) controller for an elongated tokamak.

Vertical instability growth rate for elongated plasmas

γ ≈ (κ - 1) / τ_wall

where κ is elongation, τ_wall is the resistive wall time. Lazarus et al. 1990, Nucl. Fusion 30, 111, §2.

Real-time VS implementation at DIII-D: Humphreys et al. 2009, Nucl. Fusion 49, 115003.

Restoring force on the plasma column (rigid-body model): F = -n μ₀ I_p² / (4π R₀) · Z ≡ -K_vs · Z where n is the field-index (n < 0 for unstable equilibria), μ₀ = 4π×10⁻⁷ H/m (SI), I_p in A, R₀ in m. (Wesson 2004, "Tokamaks", Oxford, §3.7)

K_vs property

K_vs

Vertical restoring force coefficient [N/m].

K_vs = -n μ₀ I_p² / (4π R₀) Wesson 2004, §3.7, Eq. (3.7.4). Negative n_index (n < 0) makes K_vs > 0 (unstable).

step

step(Z_meas, Z_ref, dZ_dt_meas, dt)

Compute coil correction voltage.

Plant: m_eff Z̈ = -K_vs Z + F_coil (linearised, Humphreys 2009). e = Z_meas - Z_ref; SMC drives e → 0.

Scenario Scheduler (v0.16.0)

ScenarioOptimizer

ScenarioOptimizer(plant_model, target_state, T_total, dt=0.5)

Offline trajectory design.

optimize

optimize(n_iter=100)

Gradient-free optimization of breakpoint values.

Fault-Tolerant Control (v0.16.0)

ReconfigurableController

ReconfigurableController(base_controller, jacobian, n_coils, n_sensors)

Control-allocation reconfiguration after actuator or sensor faults.

Weighted pseudo-inverse gain: u = (J^T W J + λI)^{-1} J^T W v. Reference: Bodson 2002, J. Guidance 25, 307, Eq. 12.

Faulted coil columns are zeroed in J before the gain is recomputed, matching the reconfiguration procedure in Ambrosino et al. 2008, Fusion Eng. Des. 83, 1485 for the ITER VS system.

step

step(error, dt)

Apply reconfigured gain; compensate for stuck-coil offsets.

controllability_check

controllability_check()

True if remaining actuators span the minimum required target space.

MIN_REQUIRED_RANK = 2 covers control of Ip and vertical position, the two safety-critical outputs identified by Ambrosino et al. 2008.

graceful_shutdown

graceful_shutdown()

Return zero ramp-down command for all coils.

RZIp Model (v0.16.0)

RZIPModel

RZIPModel(R0, a, kappa, Ip_MA, B0, n_index, vessel, active_coils=None)

RWM Feedback (v0.16.0)

RWMFeedbackController

RWMFeedbackController(n_sensors, n_coils, G_p, G_d, tau_controller=0.0001, M_coil=1.0)

Sensor-coil PD feedback for RWM stabilization.

The closed-loop effective growth rate combines wall physics (including rotation) with proportional feedback suppression:

γ_eff = γ_total − G_p M_coil γ_total / (1 + γ_total τ_ctrl)

Garofalo et al. 2002, Phys. Plasmas 9, 1997, Sec. III.

step

step(B_r_sensors, dt)

Compute feedback coil currents from sensor measurements.

I = G_p B_r + G_d dB_r/dt

effective_growth_rate

effective_growth_rate(rwm)

Closed-loop growth rate.

γ_eff = γ_total − G_p M_coil γ_total / (1 + γ_total τ_ctrl)

Garofalo et al. 2002, Phys. Plasmas 9, 1997, Eq. (5). Uses γ_total from RWMPhysics (includes rotation if Ω_φ ≠ 0).

is_stabilized

is_stabilized(rwm)

True when the closed-loop growth rate is negative.


CLI

scpn-control demo --scenario combined --steps 1000
scpn-control benchmark --n-bench 5000 --json-out
scpn-control validate --json-out
scpn-control info --json-out
scpn-control live --port 8765 --zeta 0.5 --layers 16
scpn-control hil-test --shots-dir path/to/shots
Command Description
demo Closed-loop control demonstration (PID, SNN, combined)
benchmark PID vs SNN timing benchmark with JSON output option
validate RMSE validation against reference shots
info Version, Rust backend status, weight provenance, Python/NumPy versions
live Real-time WebSocket phase sync server
hil-test Hardware-in-the-loop test campaign against shot data

Rust Acceleration

When scpn-control-rs is built via maturin, all core solvers use Rust backends automatically:

from scpn_control import RUST_BACKEND
print(RUST_BACKEND)  # True if Rust available

# Transparent acceleration — same Python API, Rust execution
kernel = FusionKernel(R0=6.2, a=2.0, B0=5.3)

Build Rust bindings:

cd scpn-control-rs/crates/control-python
maturin develop --release

PyO3 Bindings

Python Class Rust Binding Crate
FusionKernel PyFusionKernel control-core
RealtimeMonitor PyRealtimeMonitor control-math
SnnPool PySnnPool control-control
MpcController PyMpcController control-control
Plasma2D PyPlasma2D control-core
TransportSolver PyTransportSolver control-core