Skip to content

API Reference

Top-Level Exports

import scpn_control

scpn_control.__version__       # "0.7.0"
scpn_control.FusionKernel      # Grad-Shafranov equilibrium solver
scpn_control.RUST_BACKEND      # True if Rust acceleration available
scpn_control.TokamakConfig     # Preset tokamak geometries
scpn_control.StochasticPetriNet
scpn_control.FusionCompiler
scpn_control.CompiledNet
scpn_control.NeuroSymbolicController
scpn_control.kuramoto_sakaguchi_step
scpn_control.order_parameter
scpn_control.KnmSpec
scpn_control.build_knm_paper27
scpn_control.UPDESystem
scpn_control.LyapunovGuard
scpn_control.RealtimeMonitor

Core — Physics Solvers

FusionKernel

FusionKernel

FusionKernel(config_path)

Non-linear free-boundary Grad-Shafranov equilibrium solver.

Parameters

config_path : str | Path Path to a JSON configuration file describing the reactor geometry, coil set, physics parameters and solver settings.

Attributes

Psi : FloatArray Poloidal flux on the (Z, R) grid. J_phi : FloatArray Toroidal current density on the (Z, R) grid. B_R, B_Z : FloatArray Radial and vertical magnetic field components (set after solve).

load_config

load_config(path)

Load reactor configuration from a JSON file.

Parameters

path : str | Path Filesystem path to the configuration JSON.

initialize_grid

initialize_grid()

Build the computational (R, Z) mesh from the loaded config.

setup_accelerator

setup_accelerator()

Initialise the optional C++ HPC acceleration bridge.

calculate_vacuum_field

calculate_vacuum_field()

Compute the vacuum poloidal flux from the external coil set.

Uses elliptic integrals (toroidal Green's function) for each coil.

Returns

FloatArray Vacuum flux Psi_vac on the (NZ, NR) grid.

find_x_point

find_x_point(Psi)

Locate the X-point (magnetic null) in the lower divertor region.

Parameters

Psi : FloatArray Poloidal flux array on the (NZ, NR) grid.

Returns

tuple[tuple[float, float], float] ((R_x, Z_x), Psi_x) — position and flux value at the X-point.

mtanh_profile staticmethod

mtanh_profile(psi_norm, params)

Evaluate a modified-tanh pedestal profile (vectorised).

Parameters

psi_norm : FloatArray Normalised poloidal flux (0 at axis, 1 at separatrix). params : dict[str, float] Profile shape parameters with keys ped_top, ped_width, ped_height, core_alpha.

Returns

FloatArray Profile value; zero outside the plasma region.

update_plasma_source_nonlinear

update_plasma_source_nonlinear(Psi_axis, Psi_boundary)

Compute the toroidal current density J_phi from the GS source.

Uses J_phi = R p'(psi) + FF'(psi) / (mu0 R) with either L-mode (linear) or H-mode (mtanh) profiles, then renormalises to match the target plasma current.

Parameters

Psi_axis : float Poloidal flux at the magnetic axis (O-point). Psi_boundary : float Poloidal flux at the separatrix (X-point or limiter).

Returns

FloatArray Updated J_phi on the (NZ, NR) grid.

solve_equilibrium

solve_equilibrium(preserve_initial_state=False, boundary_flux=None)

Run the full Picard-iteration equilibrium solver.

Iterates: topology analysis -> source update -> elliptic solve -> under-relaxation until the residual drops below the configured convergence threshold or the maximum iteration count is reached.

When solver.solver_method is "anderson", Anderson acceleration is applied every few Picard steps to speed up convergence.

Returns

dict[str, Any] {"psi": FloatArray, "converged": bool, "iterations": int, "residual": float, "residual_history": list[float], "gs_residual": float, "gs_residual_best": float, "gs_residual_history": list[float], "wall_time_s": float, "solver_method": str}

Parameters

preserve_initial_state : bool, optional Keep current interior self.Psi values and only enforce boundary conditions before the iterative solve. Default is False. boundary_flux : FloatArray | None, optional Explicit boundary map to enforce during solve. Must have shape (NZ, NR) when provided.

Raises

RuntimeError If the solver produces NaN or Inf and solver.fail_on_diverge is enabled in the active configuration.

compute_b_field

compute_b_field()

Derive the magnetic field components from the solved Psi.

optimize_coil_currents

optimize_coil_currents(coils, target_flux, tikhonov_alpha=0.0001)

Find coil currents that best reproduce target_flux at the control points.

Solves the bounded linear least-squares problem:

min_I || M^T I - target_flux ||^2 + alpha * ||I||^2
s.t.  -I_max <= I <= I_max  (per coil)

where M is the mutual-inductance matrix.

Parameters

coils : CoilSet Must have target_flux_points set (shape (n_pts, 2)). target_flux : FloatArray, shape (n_pts,) Desired poloidal flux at each control point. tikhonov_alpha : float Regularisation strength to penalise large currents.

Returns

FloatArray, shape (n_coils,) Optimised coil currents [A].

solve_free_boundary

solve_free_boundary(coils, max_outer_iter=20, tol=0.0001, optimize_shape=False, tikhonov_alpha=0.0001)

Free-boundary GS solve with external coil currents.

Iterates between updating boundary flux from coils and solving the internal GS equation. When optimize_shape=True and the coil set has target_flux_points, an additional outer loop optimises the coil currents to match the desired plasma boundary shape.

Parameters

coils : CoilSet External coil set. max_outer_iter : int Maximum free-boundary iterations. tol : float Convergence tolerance on max |delta psi|. optimize_shape : bool When True, run coil-current optimisation at each outer step. tikhonov_alpha : float Tikhonov regularisation for coil optimisation.

Returns

dict {"outer_iterations": int, "final_diff": float, "coil_currents": NDArray}

phase_sync_step

phase_sync_step(theta, omega, *, dt=0.001, K=None, alpha=None, zeta=None, psi_driver=None, psi_mode=None, actuation_gain=None)

Reduced-order plasma sync kernel (phase reduction).

dθ_i/dt = ω_i + K·R·sin(ψ_r − θ_i − α) + ζ·sin(Ψ − θ_i)

Ψ is exogenous when psi_mode="external" (no dotΨ equation). This is the reviewer's ζ sin(Ψ−θ) injection for plasma sync stability.

phase_sync_step_lyapunov

phase_sync_step_lyapunov(theta, omega, *, n_steps=100, dt=0.001, K=None, zeta=None, psi_driver=None, psi_mode=None)

Multi-step phase sync with Lyapunov stability tracking.

Returns final state, R trajectory, V trajectory, and λ exponent. λ < 0 ⟹ stable convergence toward Ψ.

save_results

save_results(filename='equilibrium_nonlinear.npz')

Save the equilibrium state to a compressed NumPy archive.

Parameters

filename : str Output file path.

TokamakConfig

TokamakConfig dataclass

TokamakConfig(name, R0, a, B0, Ip, kappa, delta, n_e, T_e, P_aux)

Tokamak equilibrium operating point.

Units: lengths [m], field [T], current [MA], density [1e19 m^-3], temperature [keV], power [MW].

iter classmethod

iter()

ITER baseline H-mode. ITER Research Plan (2018).

sparc classmethod

sparc()

SPARC V2C. Creely et al., J. Plasma Phys. 86, 865860502 (2020).

diiid classmethod

diiid()

DIII-D typical H-mode. Luxon, Nucl. Fusion 42, 614 (2002).

jet classmethod

jet()

JET with ITER-like wall. Joffrin et al., Nucl. Fusion 59 (2019).

TransportSolver

TransportSolver

TransportSolver(config_path, *, multi_ion=False)

Bases: FusionKernel

1.5D Integrated Transport Code. Solves Heat and Particle diffusion equations on flux surfaces, coupled self-consistently with the 2D Grad-Shafranov equilibrium.

When multi_ion=True, the solver evolves separate D/T fuel densities, He-ash transport with pumping (configurable tau_He), independent electron temperature Te, coronal-equilibrium tungsten radiation (Pütterich et al. 2010), and per-cell Bremsstrahlung.

set_neoclassical

set_neoclassical(R0, a, B0, A_ion=2.0, Z_eff=1.5, q0=1.0, q_edge=3.0)

Configure Chang-Hinton neoclassical transport model.

When set, update_transport_model uses the Chang-Hinton formula instead of the constant chi_base = 0.5.

chang_hinton_chi_profile

chang_hinton_chi_profile()

Backward-compatible Chang-Hinton profile helper.

Older parity tests call this no-arg method on a partially-initialized transport object. Keep the method as a thin adapter over the module function so those tests remain stable.

inject_impurities

inject_impurities(flux_from_wall_per_sec, dt)

Models impurity influx from PWI erosion. Simple diffusion model: Source at edge, diffuses inward.

calculate_bootstrap_current_simple

calculate_bootstrap_current_simple(R0, B_pol)

Calculates the neoclassical bootstrap current density [A/m2] using a simplified Sauter model. J_bs = - (R/B_pol) * [ L31 * (dP/dpsi) + ... ]

calculate_bootstrap_current

calculate_bootstrap_current(R0, B_pol)

Calculate bootstrap current. Uses full Sauter if neoclassical params set.

update_transport_model

update_transport_model(P_aux)

Gyro-Bohm + neoclassical transport model with EPED-like pedestal.

When neoclassical params are set, uses: - Chang-Hinton neoclassical chi as additive floor - Gyro-Bohm anomalous transport (calibrated c_gB) - EPED-like pedestal model for H-mode boundary condition

Falls back to constant chi_base=0.5 when neoclassical is not configured.

evolve_profiles

evolve_profiles(dt, P_aux, enforce_conservation=False)

Advance Ti by one time step using Crank-Nicolson implicit diffusion.

The scheme is unconditionally stable, allowing dt up to ~1.0 s without NaN. The full equation solved is:

(T^{n+1} - T^n)/dt = 0.5*[L_h(T^{n+1}) + L_h(T^n)] + S - Sink
Parameters

dt : float Time step [s]. P_aux : float Auxiliary heating power [MW]. enforce_conservation : bool When True, raise :class:PhysicsError if the per-step energy conservation error exceeds 1%.

map_profiles_to_2d

map_profiles_to_2d()

Projects the 1D radial profiles back onto the 2D Grad-Shafranov grid, including neoclassical bootstrap current.

compute_confinement_time

compute_confinement_time(P_loss_MW)

Compute the energy confinement time from stored energy.

τ_E = W_stored / P_loss, where W_stored = ∫ 3/2 n (Ti+Te) dV and the volume element is estimated from the 1D radial profiles using cylindrical approximation.

Parameters

P_loss_MW : float Total loss power [MW]. Must be > 0.

Returns

float Energy confinement time [s].

run_self_consistent

run_self_consistent(P_aux, n_inner=100, n_outer=10, dt=0.01, psi_tol=0.001)

Run self-consistent GS <-> transport iteration.

This implements the standard integrated-modelling loop used by codes such as ASTRA and JINTRAC: evolve the 1D transport for n_inner steps, project profiles onto the 2D grid, re-solve the Grad-Shafranov equilibrium, and repeat until the poloidal-flux change drops below psi_tol.

Algorithm
  1. Run transport for n_inner steps (evolve Ti/Te/ne).
  2. Call :meth:map_profiles_to_2d to update J_phi on the 2D grid.
  3. Re-solve the Grad-Shafranov equilibrium with the updated source.
  4. Check psi convergence: ||Psi_new - Psi_old|| / ||Psi_old|| < psi_tol.
  5. Repeat until converged or n_outer iterations exhausted.
Parameters

P_aux : float Auxiliary heating power [MW]. n_inner : int Number of transport evolution steps per outer iteration. n_outer : int Maximum number of outer (GS re-solve) iterations. dt : float Transport time step [s]. psi_tol : float Relative psi convergence tolerance.

Returns

dict {"T_avg": float, "T_core": float, "tau_e": float, "n_outer_converged": int, "psi_residuals": list[float], "Ti_profile": ndarray, "ne_profile": ndarray, "converged": bool}

run_to_steady_state

run_to_steady_state(P_aux, n_steps=500, dt=0.01, adaptive=False, tol=0.001, self_consistent=False, sc_n_inner=100, sc_n_outer=10, sc_psi_tol=0.001)

Run transport evolution until approximate steady state.

Parameters

P_aux : float Auxiliary heating power [MW]. n_steps : int Number of evolution steps. dt : float Time step [s] (initial value when adaptive=True). adaptive : bool Use Richardson-extrapolation adaptive time stepping. tol : float Error tolerance for adaptive stepping. self_consistent : bool When True, delegate to :meth:run_self_consistent which iterates GS <-> transport to convergence. The remaining sc_* parameters are forwarded. sc_n_inner : int Transport steps per outer GS iteration (self-consistent mode). sc_n_outer : int Maximum outer GS iterations (self-consistent mode). sc_psi_tol : float Relative psi convergence tolerance (self-consistent mode).

Returns

dict {"T_avg": float, "T_core": float, "tau_e": float, "n_steps": int, "Ti_profile": ndarray, "ne_profile": ndarray} When adaptive=True, also includes dt_final, dt_history, error_history. When self_consistent=True, returns the :meth:run_self_consistent dict instead.

Scaling Laws

ipb98y2_tau_e

ipb98y2_tau_e(Ip, BT, ne19, Ploss, R, kappa, epsilon, M=2.5, *, coefficients=None)

Evaluate the IPB98(y,2) confinement time scaling law.

Parameters

Ip : float Plasma current [MA]. BT : float Toroidal magnetic field [T]. ne19 : float Line-averaged electron density [10^19 m^-3]. Ploss : float Loss power [MW]. Must be > 0. R : float Major radius [m]. kappa : float Elongation (κ). epsilon : float Inverse aspect ratio a/R. M : float Effective ion mass [AMU]. Default 2.5 (D-T). coefficients : dict, optional Pre-loaded coefficient dict. If None, loaded from disk.

Returns

float Predicted thermal energy confinement time τ_E [s].

Raises

ValueError If any input is non-finite or non-positive.

compute_h_factor

compute_h_factor(tau_actual, tau_predicted)

Compute the H-factor (enhancement factor over scaling law).

Parameters

tau_actual : float Measured or simulated confinement time [s]. tau_predicted : float IPB98(y,2) predicted confinement time [s].

Returns

float H98(y,2) = tau_actual / tau_predicted.

GEQDSK I/O

GEqdsk dataclass

GEqdsk(description='', nw=0, nh=0, rdim=0.0, zdim=0.0, rcentr=0.0, rleft=0.0, zmid=0.0, rmaxis=0.0, zmaxis=0.0, simag=0.0, sibry=0.0, bcentr=0.0, current=0.0, fpol=(lambda: array([], dtype=float64))(), pres=(lambda: array([], dtype=float64))(), ffprime=(lambda: array([], dtype=float64))(), pprime=(lambda: array([], dtype=float64))(), qpsi=(lambda: array([], dtype=float64))(), psirz=(lambda: array([], dtype=float64))(), rbdry=(lambda: array([], dtype=float64))(), zbdry=(lambda: array([], dtype=float64))(), rlim=(lambda: array([], dtype=float64))(), zlim=(lambda: array([], dtype=float64))())

Container for all data in a G-EQDSK file.

r property

r

1-D array of R grid values.

z property

z

1-D array of Z grid values.

psi_norm property

psi_norm

Normalised poloidal flux ψ_N ∈ [0, 1] (axis=0, boundary=1).

psi_to_norm

psi_to_norm(psi)

Map raw ψ to normalised ψ_N.

to_config

to_config(name='eqdsk')

Convert to FusionKernel JSON config dict (approximate).

read_geqdsk

read_geqdsk(path)

Read a G-EQDSK file and return a :class:GEqdsk container.

Handles both whitespace-separated and fixed-width Fortran formats, including the common case where values run together without spaces (e.g. 2.385E+00-1.216E+01).

Parameters

path : str or Path Path to the G-EQDSK file.

Returns

GEqdsk Parsed equilibrium data.

write_geqdsk

write_geqdsk(eq, path)

Write a :class:GEqdsk to a G-EQDSK file.

Parameters

eq : GEqdsk Equilibrium data. path : str or Path Output file path.

Uncertainty Quantification

quantify_uncertainty

quantify_uncertainty(scenario, n_samples=10000, seed=None)

Monte Carlo uncertainty quantification for fusion performance.

Samples scaling-law coefficients from their Gaussian posteriors and propagates through the confinement and fusion power models.

Parameters

scenario : PlasmaScenario Plasma parameters (held fixed). n_samples : int Number of Monte Carlo samples (default 10,000). seed : int, optional Random seed for reproducibility.

Returns

UQResult — central estimates + error bars + percentiles.

quantify_full_chain

quantify_full_chain(scenario, n_samples=5000, seed=None, chi_gB_sigma=0.3, pedestal_sigma=0.2, boundary_sigma=0.02)

Full-chain Monte Carlo uncertainty propagation: equilibrium -> transport -> fusion power -> gain.

Extends :func:quantify_uncertainty by additionally perturbing the gyro-Bohm transport coefficient, EPED pedestal height, and equilibrium boundary shape, then computing normalised beta as an extra observable.

Parameters

scenario : PlasmaScenario Nominal plasma parameters (held at central values; perturbations are multiplicative). n_samples : int Number of Monte Carlo draws (default 5000). seed : int, optional Random seed for reproducibility. chi_gB_sigma : float Log-normal sigma for gyro-Bohm coefficient perturbation (default 0.3). pedestal_sigma : float Gaussian sigma (fractional) for pedestal height perturbation (default 0.2). boundary_sigma : float Gaussian sigma (fractional) for major-radius boundary perturbation (default 0.02, i.e. 2%).

Returns

FullChainUQResult Bands at [5%, 50%, 95%] for psi_nrmse, tau_E, P_fusion, Q, beta_N.

IMAS Adapter

EquilibriumIDS dataclass

EquilibriumIDS(r, z, psi, j_tor, ip, b0, r0, time=0.0)

Minimal subset of IMAS equilibrium IDS for scpn-control interop.

Fields match equilibrium.time_slice[0].profiles_2d[0].

from_geqdsk

from_geqdsk(filepath)

Import from GEQDSK file via scpn_control.core.eqdsk.

from_kernel

from_kernel(kernel, time=0.0)

Extract EquilibriumIDS from a solved FusionKernel instance.


SCPN — Petri Net Compiler

StochasticPetriNet

StochasticPetriNet

StochasticPetriNet()

Stochastic Petri Net with sparse matrix representation.

Usage::

net = StochasticPetriNet()
net.add_place("P_red",   initial_tokens=1.0)
net.add_place("P_green", initial_tokens=0.0)
net.add_transition("T_r2g", threshold=0.5)
net.add_arc("P_red", "T_r2g", weight=1.0)
net.add_arc("T_r2g", "P_green", weight=1.0)
net.compile()

add_place

add_place(name, initial_tokens=0.0)

Add a place (state variable) with token density in [0, 1].

add_transition

add_transition(name, threshold=0.5, delay_ticks=0)

Add a transition (logic gate) with a firing threshold.

add_arc

add_arc(source, target, weight=1.0, inhibitor=False)

Add a directed arc between a Place and a Transition (either direction).

Valid arcs

Place -> Transition (input arc, stored in W_in) Transition -> Place (output arc, stored in W_out)

Parameters

inhibitor : bool, default False If True, arc is encoded as a negative weight inhibitor input arc. Only valid for Place -> Transition edges.

Raises ValueError for same-kind connections or unknown nodes.

compile

compile(validate_topology=False, strict_validation=False, allow_inhibitor=False)

Build sparse W_in and W_out matrices from the arc list.

Parameters

validate_topology : bool, default False Compute and store topology diagnostics in :attr:last_validation_report. strict_validation : bool, default False If True, raise ValueError when topology diagnostics contain dead nodes or unseeded place cycles. allow_inhibitor : bool, default False Allow negative Place -> Transition weights for inhibitor arcs. If False, compile fails when inhibitor arcs are present.

validate_topology

validate_topology()

Return topology diagnostics without mutating compiled matrices.

Diagnostics: - dead_places: places with zero total degree. - dead_transitions: transitions with zero total degree. - unseeded_place_cycles: place-level SCC cycles with no initial token mass. - input_weight_overflow_transitions: transitions whose positive input arc-weight sum exceeds 1.0.

get_initial_marking

get_initial_marking()

Return (n_places,) float64 vector of initial token densities.

get_thresholds

get_thresholds()

Return (n_transitions,) float64 vector of firing thresholds.

get_delay_ticks

get_delay_ticks()

Return (n_transitions,) int64 vector of transition delay ticks.

summary

summary()

Human-readable summary of the net.

verify_boundedness

verify_boundedness(n_steps=500, n_trials=100)

Verify that markings stay in [0, 1] under random firing.

Runs n_trials random trajectories of n_steps each, checking that all markings remain in [0, 1] at every step.

Returns

dict {"bounded": bool, "max_marking": float, "min_marking": float, "n_trials": int, "n_steps": int}

verify_liveness

verify_liveness(n_steps=200, n_trials=1000)

Verify that all transitions fire at least once in random campaigns.

Returns

dict {"live": bool, "transition_fire_pct": dict[str, float], "min_fire_pct": float}

FusionCompiler

FusionCompiler

FusionCompiler(bitstream_length=1024, seed=42, *, lif_tau_mem=1000000.0, lif_noise_std=0.0, lif_dt=1.0, lif_resistance=1.0, lif_refractory_period=0)

Compiles a StochasticPetriNet into a CompiledNet.

Parameters

bitstream_length : Number of bits per stochastic stream (default 1024). seed : Base RNG seed for reproducibility.

with_reactor_lif_defaults classmethod

with_reactor_lif_defaults(bitstream_length=1024, seed=42, *, lif_dt=1.0, lif_resistance=1.0, lif_refractory_period=0)

Create a compiler with reactor-like LIF defaults.

Keeps the legacy constructor defaults untouched for compatibility while exposing a realistic preset for new control experiments.

traceable_runtime_kwargs staticmethod

traceable_runtime_kwargs(*, runtime_backend='auto')

Recommended controller kwargs for traceable runtime loops.

compile

compile(net, firing_mode='binary', firing_margin=0.05, *, allow_inhibitor=False, validate_topology=False, strict_topology=False)

Compile the Petri Net into sc_neurocore artifacts.

Parameters

net : compiled StochasticPetriNet. firing_mode : "binary" (default) or "fractional". firing_margin : margin for fractional firing (ignored in binary mode). allow_inhibitor : enable inhibitor arc compilation. validate_topology : run topology diagnostics during compile. strict_topology : raise if topology diagnostics detect issues.

Steps
  1. Extract dense W_in (nT x nP) and W_out (nP x nT).
  2. Create one LIF neuron per transition (pure threshold comparator).
  3. Pre-encode weight matrices as packed uint64 bitstreams.
  4. Return CompiledNet with all artifacts.

CompiledNet

CompiledNet dataclass

CompiledNet(n_places, n_transitions, place_names, transition_names, W_in, W_out, W_in_packed=None, W_out_packed=None, neurons=list(), bitstream_length=1024, thresholds=(lambda: array([], dtype=float64))(), transition_delay_ticks=(lambda: array([], dtype=int64))(), initial_marking=(lambda: array([], dtype=float64))(), seed=42, firing_mode='binary', firing_margin=0.05, lif_tau_mem=1000000.0, lif_noise_std=0.0, lif_dt=1.0, lif_resistance=1.0, lif_refractory_period=0)

Compiled Petri Net ready for sc_neurocore execution.

Holds both the dense float matrices (for validation / fallback) and pre-packed uint64 weight bitstreams (for the stochastic path).

dense_forward

dense_forward(W_packed, input_probs)

Stochastic matrix-vector product via AND + popcount.

Parameters

W_packed : (n_out, n_in, n_words) uint64 — pre-packed weight bitstreams. input_probs : (n_in,) float64 — input probabilities in [0, 1].

Returns

output : (n_out,) float64 — stochastic estimate of W @ input_probs.

dense_forward_float

dense_forward_float(W, inputs)

Float-path validation: simple W @ inputs.

lif_fire

lif_fire(currents)

Run LIF threshold detection on all transitions.

Binary mode: f_t = 1 if current >= threshold else 0 Fractional mode: f_t = clip((current - threshold) / margin, 0, 1)

Parameters

currents : (n_transitions,) float64 — weighted-sum activations.

Returns

fired : (n_transitions,) float64 vector. Binary mode → values in {0.0, 1.0}. Fractional mode → values in [0.0, 1.0].

export_artifact

export_artifact(name='controller', dt_control_s=0.001, readout_config=None, injection_config=None)

Build an Artifact from compiled state + user-provided config.

Parameters

name : artifact name. dt_control_s : control tick period (s). readout_config : dict with actions, gains, abs_max, slew_per_s lists. Required for a complete artifact. injection_config : list of place-injection dicts.

NeuroSymbolicController

NeuroSymbolicController

NeuroSymbolicController(artifact, seed_base, targets, scales, sc_n_passes=8, sc_bitflip_rate=0.0, sc_binary_margin=None, sc_antithetic=True, enable_oracle_diagnostics=True, feature_axes=None, runtime_profile='adaptive', runtime_backend='auto', rust_backend_min_problem_size=1, sc_antithetic_chunk_size=2048)

Reference controller with oracle float and stochastic paths.

Parameters

artifact : loaded .scpnctl.json artifact. seed_base : 64-bit base seed for deterministic stochastic execution. targets : control setpoint targets. scales : normalisation scales.

reset

reset()

Restore initial marking and zero previous actions.

step

step(obs, k, log_path=None)

Execute one control tick.

Steps
  1. extract_features(obs) → 4 unipolar features
  2. _inject_places(features)
  3. _oracle_step() — float path (optional)
  4. _sc_step(k) — deterministic stochastic path
  5. _decode_actions() — gain × differencing, slew + abs clamp
  6. Optional JSONL logging

step_traceable

step_traceable(obs_vector, k, log_path=None)

Execute one control tick from a fixed-order observation vector.

The vector order is self._axis_obs_keys. This avoids per-step key lookups/dict allocation in tight control loops.

Contracts

ControlObservation

Bases: TypedDict

Plant observation at a single control tick.

ControlAction

Bases: TypedDict

Actuator command output for a single control tick.

ControlTargets dataclass

ControlTargets(R_target_m=6.2, Z_target_m=0.0)

Setpoint targets for the control loop.

extract_features

extract_features(obs, targets, scales, feature_axes=None, passthrough_keys=None)

Map observation → unipolar [0, 1] feature sources.

By default returns keys x_R_pos, x_R_neg, x_Z_pos, x_Z_neg. Custom feature mappings can be supplied via feature_axes for arbitrary observation dictionaries.

decode_actions

decode_actions(marking, actions_spec, gains, abs_max, slew_per_s, dt, prev)

Decode marking → actuator commands with gain, slew-rate, and abs clamp.

Parameters

marking : current marking vector (len >= max place index). actions_spec : per-action pos/neg place definitions. gains : per-action gain multiplier. abs_max : per-action absolute saturation. slew_per_s : per-action max change rate (units/s). dt : control tick period (s). prev : previous action outputs (same length as actions_spec).

Returns

dict mapping action name → clamped value. Also mutates prev in-place.

Artifacts

Artifact dataclass

Artifact(meta, topology, weights, readout, initial_state)

Full SCPN controller artifact (.scpnctl.json).

save_artifact

save_artifact(artifact, path, compact_packed=False)

Serialize an Artifact to indented JSON.

load_artifact

load_artifact(path)

Parse a .scpnctl.json file into an Artifact dataclass.


Phase — Paper 27 Dynamics

Kuramoto-Sakaguchi Step

kuramoto_sakaguchi_step

kuramoto_sakaguchi_step(theta, omega, *, dt, K, alpha=0.0, zeta=0.0, psi_driver=None, psi_mode='external', wrap=True)

Single Euler step of mean-field Kuramoto-Sakaguchi + global driver.

dθ_i/dt = ω_i + K·R·sin(ψ_r − θ_i − α) + ζ·sin(Ψ − θ_i)

Uses Rust backend when available (rayon-parallelised, sub-ms for N>1000).

order_parameter

order_parameter(theta, weights=None)

Kuramoto order parameter R·exp(i·ψ_r) = / W.

Returns (R, ψ_r).

lyapunov_v

lyapunov_v(theta, psi)

Lyapunov candidate V(t) = (1/N) Σ (1 − cos(θ_i − Ψ)).

V=0 at perfect sync (all θ_i = Ψ), V→2 at maximal desync. Mirror of control-math/kuramoto.rs::lyapunov_v.

lyapunov_exponent

lyapunov_exponent(v_hist, dt)

λ = (1/T) · ln(V_final / V_initial). λ < 0 ⟹ stable.

wrap_phase

wrap_phase(x)

Map phases to (-π, π].

GlobalPsiDriver dataclass

GlobalPsiDriver(mode='external')

Resolve the global field phase Ψ.

mode="external" : Ψ supplied by caller (intention/carrier, no dotΨ). mode="mean_field" : Ψ = arg() from the oscillator population.

Knm Coupling Matrix

KnmSpec dataclass

KnmSpec(K, alpha=None, zeta=None, layer_names=None)

Paper 27 coupling specification.

K : (L, L) coupling matrix. K[n, m] = source n -> target m. alpha : (L, L) Sakaguchi phase-lag (optional). zeta : (L,) per-layer global-driver gain ζ_m (optional).

build_knm_paper27

build_knm_paper27(L=16, K_base=0.45, K_alpha=0.3, zeta_uniform=0.0)

Build the canonical Paper 27 Knm with exponential distance decay.

K[i,j] = K_base · exp(−K_alpha · |i − j|), diag(K) kept for intra-layer sync (unlike the inter-oscillator Knm which zeros diag).

K_base=0.45 and K_alpha=0.3 from Paper 27 §3.2, Eq. 12. Calibration anchors from Paper 27, Table 2. Cross-hierarchy boosts from Paper 27 §4.3.

UPDE Multi-Layer Solver

UPDESystem dataclass

UPDESystem(spec, dt=0.001, psi_mode='external', wrap=True)

Multi-layer UPDE driven by a KnmSpec.

step

step(theta_layers, omega_layers, *, psi_driver=None, actuation_gain=1.0, pac_gamma=0.0, K_override=None)

Advance all L layers by one Euler step.

Parameters

theta_layers : sequence of 1D arrays Phase vectors per layer. omega_layers : sequence of 1D arrays Natural frequencies per layer. psi_driver : float or None External global field phase Ψ (required if psi_mode="external"). actuation_gain : float Multiplicative gain on all coupling terms. pac_gamma : float PAC-like gating: boost inter-layer coupling by (1 + pac_gamma·(1 − R_source)). K_override : array or None Per-tick replacement for spec.K (adaptive coupling).

run

run(n_steps, theta_layers, omega_layers, *, psi_driver=None, actuation_gain=1.0, pac_gamma=0.0, K_override=None)

Run n_steps and return trajectory of per-layer R and global R.

run_lyapunov

run_lyapunov(n_steps, theta_layers, omega_layers, *, psi_driver=None, actuation_gain=1.0, pac_gamma=0.0, K_override=None)

Run n_steps with Lyapunov tracking.

Returns R histories, V histories, and per-layer + global λ. λ < 0 ⟹ stable convergence toward Ψ.

Lyapunov Guard

LyapunovGuard

LyapunovGuard(window=50, dt=0.001, lambda_threshold=0.0, max_violations=3)

Sliding-window Lyapunov stability monitor.

Parameters

window : int Number of V(t) samples in the sliding window. dt : float Timestep between samples (for λ computation). lambda_threshold : float λ above this value counts as a violation. Default 0 (any growth). max_violations : int Consecutive violations before guard refuses.

check

check(theta, psi)

Feed one sample and return stability verdict.

check_trajectory

check_trajectory(v_hist)

Batch check: compute λ from a full V(t) trajectory.

to_director_ai_dict

to_director_ai_dict(verdict)

Export verdict in DIRECTOR_AI AuditLogger format.

Realtime Monitor

RealtimeMonitor dataclass

RealtimeMonitor(upde, guard, theta_layers, omega_layers, psi_driver=0.0, pac_gamma=0.0, adaptive_engine=None)

Tick-by-tick UPDE monitor with LyapunovGuard.

from_paper27 classmethod

from_paper27(L=16, N_per=50, *, dt=0.001, zeta_uniform=0.5, psi_driver=0.0, pac_gamma=0.0, guard_window=50, guard_max_violations=3, seed=42)

Build from Paper 27 defaults.

from_plasma classmethod

from_plasma(L=8, N_per=50, *, mode='baseline', dt=0.001, zeta_uniform=0.0, psi_driver=0.0, pac_gamma=0.0, guard_window=50, guard_max_violations=3, adaptive_engine=None, seed=42)

Build from plasma-native Knm defaults with optional adaptive engine.

tick

tick(*, record=True, beta_n=0.0, q95=3.0, disruption_risk=0.0, mirnov_rms=0.0)

Advance one UPDE step and return dashboard snapshot.

reset

reset(seed=42)

Reset oscillator phases, guard state, and recorder.

save_hdf5

save_hdf5(path)

Export recorded trajectory to HDF5.

Datasets: R_global, R_layer, V_global, V_layer, lambda_exp, guard_approved, latency_us, Psi_global. Attributes: L, N_per, psi_driver, pac_gamma, n_ticks.

Requires h5py (pip install h5py).

save_npz

save_npz(path)

Export recorded trajectory to compressed NPZ (no h5py needed).

TrajectoryRecorder dataclass

TrajectoryRecorder(R_global=list(), R_layer=list(), V_global=list(), V_layer=list(), lambda_exp=list(), guard_approved=list(), latency_us=list(), Psi_global=list())

Accumulates tick snapshots for batch export.

WebSocket Stream

PhaseStreamServer dataclass

PhaseStreamServer(monitor, tick_interval_s=0.001)

Async WebSocket server wrapping a RealtimeMonitor.

serve async

serve(host='0.0.0.0', port=8765)

Start WebSocket server and tick loop.

serve_sync

serve_sync(host='0.0.0.0', port=8765)

Blocking entry point.


Control — Controllers

H-infinity (Riccati DARE)

HInfinityController

HInfinityController(A, B1, B2, C1, C2, gamma=None, D12=None, D21=None, enforce_robust_feasibility=False)

Riccati-based H-infinity controller for tokamak vertical stability.

Parameters

A : array_like, shape (n, n) Plant state matrix. B1 : array_like, shape (n, p) Disturbance input matrix. B2 : array_like, shape (n, m) Control input matrix. C1 : array_like, shape (q, n) Performance output matrix. C2 : array_like, shape (l, n) Measurement output matrix. gamma : float, optional H-infinity attenuation level. If None, found by bisection. D12 : array_like, optional Feedthrough from control to performance. Default: identity-like. D21 : array_like, optional Feedthrough from disturbance to measurement. Default: identity-like. enforce_robust_feasibility : bool, optional If True, raise ValueError unless rho(XY) < gamma^2 after synthesis.

is_stable property

is_stable

Check continuous closed-loop stability (eigenvalues of A + B2 F).

gain_margin_db property

gain_margin_db

Gain margin in dB for the continuous closed-loop system.

step

step(error, dt)

Compute control action for one timestep.

Uses discrete-time DARE-synthesised gains on the ZOH-discretised plant, guaranteeing closed-loop stability for any sampling dt.

Parameters

error : float Observed measurement (y). dt : float Timestep [s].

Returns

float Control action u.

riccati_residual_norms

riccati_residual_norms()

Return Frobenius norms of the two H-infinity Riccati residuals.

robust_feasibility_margin

robust_feasibility_margin()

Return gamma^2 - rho(XY); positive values satisfy the strict test.

reset

reset()

Reset controller state to zero.

get_radial_robust_controller

get_radial_robust_controller(gamma_growth=100.0, *, damping=10.0, enforce_robust_feasibility=False)

Return an H-infinity controller for tokamak vertical stability.

Parameters

gamma_growth : float Vertical instability growth rate [1/s]. Default 100/s (ITER-like). SPARC: ~1000/s. damping : float Passive damping coefficient [1/s]. Default 10.0. enforce_robust_feasibility : bool, optional If True, require rho(XY) < gamma^2 and raise on infeasible synthesis.

Returns

HInfinityController Riccati-synthesized robust controller.

Model Predictive Control

NeuralSurrogate

NeuralSurrogate(n_coils, n_state, verbose=True)

Linearized surrogate model around current operating point.

ModelPredictiveController

ModelPredictiveController(surrogate, target_state, *, prediction_horizon=PREDICTION_HORIZON, learning_rate=0.5, iterations=20, action_limit=2.0, action_regularization=0.1)

Gradient-based MPC planner over surrogate dynamics.

Optimal Control

OptimalController

OptimalController(config_file, *, kernel_factory=FusionKernel, verbose=True, correction_limit=5.0, coil_current_limits=(-40.0, 40.0), current_target_limits=(5.0, 16.0))

MIMO controller using response-matrix inversion with bounded actuation.

identify_system

identify_system(perturbation=0.5)

Perturb each coil and measure plasma-axis response to build Jacobian.

get_plasma_pos

get_plasma_pos()

Return current plasma-axis position [R, Z].

compute_optimal_correction

compute_optimal_correction(current_pos, target_pos, regularization_limit=0.01)

Solve Error = J * Delta_I using damped pseudoinverse.

Digital Twin

run_digital_twin

run_digital_twin(time_steps=TIME_STEPS, seed=42, save_plot=True, output_path='Tokamak_Digital_Twin.png', verbose=True, gyro_surrogate=None, chaos_monkey=False, sensor_dropout_prob=0.0, sensor_noise_std=0.0, rng=None)

Run deterministic digital-twin control simulation.

Returns a summary dict so callers can use the simulation without relying on console text or plot artifacts.

Flight Simulator

IsoFluxController

IsoFluxController(config_file, kernel_factory=FusionKernel, verbose=True, actuator_tau_s=0.06, heating_actuator_tau_s=None, actuator_current_delta_limit=1000000000.0, heating_beta_max=5.0, control_dt_s=0.05)

Simulates the Plasma Control System (PCS). Uses PID loops to adjust Coil Currents to maintain plasma shape.

run_flight_sim

run_flight_sim(config_file=None, shot_duration=SHOT_DURATION, seed=42, save_plot=True, output_path='Tokamak_Flight_Report.png', verbose=True, actuator_tau_s=0.06, heating_actuator_tau_s=None, actuator_current_delta_limit=1000000000.0, heating_beta_max=5.0, control_dt_s=0.05, kernel_factory=FusionKernel)

Run deterministic tokamak flight-sim control loop and return summary.

Disruption Predictor

DisruptionTransformer

DisruptionTransformer()

predict_disruption_risk

predict_disruption_risk(signal, toroidal_observables=None)

Lightweight deterministic disruption risk estimator (0..1) for control loops.

This supplements the Transformer pathway by explicitly consuming toroidal asymmetry observables from 3D diagnostics.

Disruption Contracts

run_disruption_episode

run_disruption_episode(*, rng, rl_agent, base_tbr, explorer)

predict_disruption_risk

predict_disruption_risk(signal, toroidal_observables=None)

Lightweight deterministic disruption risk estimator (0..1) for control loops.

This supplements the Transformer pathway by explicitly consuming toroidal asymmetry observables from 3D diagnostics.

SPI Mitigation

ShatteredPelletInjection

ShatteredPelletInjection(Plasma_Energy_MJ=300.0, Plasma_Current_MA=15.0)

Reduced SPI mitigation model for thermal/current quench campaigns.

run_spi_mitigation

run_spi_mitigation(*, plasma_energy_mj=300.0, plasma_current_ma=15.0, neon_quantity_mol=0.1, argon_quantity_mol=0.0, xenon_quantity_mol=0.0, duration_s=0.05, dt_s=1e-05, save_plot=True, output_path='SPI_Mitigation_Result.png', verbose=True)

Run SPI mitigation simulation and return deterministic summary metrics.

Fusion Control Room

run_control_room

run_control_room(sim_duration=SIM_DURATION, *, seed=42, save_animation=True, save_report=True, output_gif='SCPN_Fusion_Control_Room.gif', output_report='SCPN_Fusion_Status_Report.png', verbose=True, kernel_factory=None, config_file=None)

Run the control-room loop and return deterministic summary metrics.

TokamakPhysicsEngine

TokamakPhysicsEngine(size=RESOLUTION, *, seed=42, kernel=None)

Reduced Grad-Shafranov geometry model with optional kernel-Psi ingestion.

solve_flux_surfaces

solve_flux_surfaces()

Return (density, psi) from kernel state when available, otherwise from analytic Miller-parameterized geometry.

step_dynamics

step_dynamics(coil_action_top, coil_action_bottom)

Reduced vertical-displacement dynamics.

Gymnasium Environment

TokamakEnv

TokamakEnv(dt=0.001, max_steps=500, T_target=20.0, noise_std=0.01, seed=42)

Minimal Gymnasium-compatible tokamak control environment.

Follows the gymnasium.Env interface (reset/step/render) without requiring gymnasium as a hard dependency. If gymnasium is installed, this class can be registered via gymnasium.register().

Parameters

dt : float Timestep per step call [s]. max_steps : int Episode length. T_target : float Target axis temperature [keV]. noise_std : float Observation noise standard deviation.

reset

reset(seed=None)

Reset to initial plasma state.

step

step(action)

Advance one timestep.

Returns (obs, reward, terminated, truncated, info).

render

render()

Print current state.

Nengo SNN Controller

Requires pip install "scpn-control[nengo]".

from scpn_control.control import get_nengo_controller

NengoSNNController = get_nengo_controller()
ctrl = NengoSNNController()
u = ctrl.step(np.array([0.05, -0.02]))

Advanced SOC Learning

run_advanced_learning_sim

run_advanced_learning_sim(size=L, time_steps=TIME_STEPS, seed=42, *, epsilon=EPSILON, noise_probability=0.01, shear_step=0.05, shear_bounds=(0.0, 1.0), save_plot=True, output_path='Advanced_SOC_Learning.png', verbose=True)

Run deterministic SOC+Q-learning control simulation and return summary metrics.


CLI

scpn-control demo --scenario combined --steps 1000
scpn-control benchmark --n-bench 5000 --json-out
scpn-control validate --json-out
scpn-control info --json-out
scpn-control live --port 8765 --zeta 0.5 --layers 16
scpn-control hil-test --shots-dir path/to/shots
Command Description
demo Closed-loop control demonstration (PID, SNN, combined)
benchmark PID vs SNN timing benchmark with JSON output option
validate RMSE validation against reference shots
info Version, Rust backend status, weight provenance, Python/NumPy versions
live Real-time WebSocket phase sync server
hil-test Hardware-in-the-loop test campaign against shot data

Rust Acceleration

When scpn-control-rs is built via maturin, all core solvers use Rust backends automatically:

from scpn_control import RUST_BACKEND
print(RUST_BACKEND)  # True if Rust available

# Transparent acceleration — same Python API, Rust execution
kernel = FusionKernel(R0=6.2, a=2.0, B0=5.3)

Build Rust bindings:

cd scpn-control-rs/crates/control-python
maturin develop --release

PyO3 Bindings

Python Class Rust Binding Crate
FusionKernel PyFusionKernel control-core
RealtimeMonitor PyRealtimeMonitor control-math
SnnPool PySnnPool control-control
MpcController PyMpcController control-control
Plasma2D PyPlasma2D control-core
TransportSolver PyTransportSolver control-core