Control API

The scpn_fusion.control subpackage contains reactor control algorithms, digital twin infrastructure, disruption prediction, and mitigation systems.

Tokamak Flight Simulator

Tokamak flight simulator with actuator dynamics and isoflux feedback.

class scpn_fusion.control.tokamak_flight_sim.FirstOrderActuator(*, tau_s, dt_s, u_min=-1000000000.0, u_max=1000000000.0, rate_limit=1000000.0, sensor_noise_std=0.0, delay_steps=0, rng_seed=None)[source]

Bases: object

Discrete first-order actuator with rate limits, noise, and delay.

Models a realistic coil power supply for tokamak control: - First-order lag: u_applied(s) = 1/(tau*s+1) * u_cmd - Coil current rate limit: abs(du/dt) <= rate_limit [A/s] - Sensor noise: additive Gaussian on measurement - Measurement delay: pure transport delay on feedback signal

Parameters:
  • tau_s (float) – Actuator time constant [s].

  • dt_s (float) – Simulation timestep [s].

  • u_min (float) – Saturation limits.

  • u_max (float) – Saturation limits.

  • rate_limit (float) – Maximum current rate of change [A/s]. Default 1e6 (1 MA/s, ITER PF spec).

  • sensor_noise_std (float) – Standard deviation of additive sensor noise. Default 0.0 (disabled).

  • delay_steps (int) – Number of timesteps of measurement delay. Default 0.

  • rng_seed (int or None) – Random seed for reproducible noise (None = random).

__init__(*, tau_s, dt_s, u_min=-1000000000.0, u_max=1000000000.0, rate_limit=1000000.0, sensor_noise_std=0.0, delay_steps=0, rng_seed=None)[source]

Validate the actuator constants and initialise the delay buffer.

Parameters:
Return type:

None

step(command)[source]

Apply command through actuator dynamics with rate limiting.

Return type:

float

Parameters:

command (float)

get_measurement()[source]

Return delayed, noisy measurement of actuator output.

Return type:

float

class scpn_fusion.control.tokamak_flight_sim.IsoFluxController(config_file, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, verbose=True, actuator_tau_s=0.06, heating_actuator_tau_s=None, actuator_current_delta_limit=1000000000.0, heating_beta_max=5.0, control_dt_s=0.05)[source]

Bases: object

Simulate the Plasma Control System (PCS).

Uses PID loops to adjust Coil Currents to maintain plasma shape.

Parameters:
  • config_file (str)

  • kernel_factory (Callable[[str], Any])

  • verbose (bool)

  • actuator_tau_s (float)

  • heating_actuator_tau_s (Optional[float])

  • actuator_current_delta_limit (float)

  • heating_beta_max (float)

  • control_dt_s (float)

__init__(config_file, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, verbose=True, actuator_tau_s=0.06, heating_actuator_tau_s=None, actuator_current_delta_limit=1000000000.0, heating_beta_max=5.0, control_dt_s=0.05)[source]

Build the kernel, control gains, actuators, and telemetry history.

Parameters:
Return type:

None

pid_step(pid, error)[source]

Update one PID state dictionary and return its control command.

Return type:

float

Parameters:
run_shot(shot_duration=30, save_plot=True, output_path='Tokamak_Flight_Report.png')[source]

Run a simulated tokamak shot.

Parameters:
  • shot_duration (int) – Number of simulation steps. Default 30.

  • save_plot (bool) – Whether to generate a summary plot.

  • output_path (str) – Filename for the plot.

Return type:

Dict[str, Any]

visualize_flight(output_path='Tokamak_Flight_Report.png')[source]

Render the flight trajectory report plot when plotting is available.

Return type:

Tuple[bool, Optional[str]]

Parameters:

output_path (str)

scpn_fusion.control.tokamak_flight_sim.run_flight_sim(config_file=None, shot_duration=50, seed=42, save_plot=True, output_path='Tokamak_Flight_Report.png', verbose=True, actuator_tau_s=0.06, heating_actuator_tau_s=None, actuator_current_delta_limit=1000000000.0, heating_beta_max=5.0, control_dt_s=0.05, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>)[source]

Run deterministic tokamak flight-sim control loop and return summary.

Return type:

Dict[str, Any]

Parameters:

Tokamak Digital Twin

Two-dimensional tokamak digital-twin runtime and neural policy model.

class scpn_fusion.control.tokamak_digital_twin.TokamakTopoloy(size=40)[source]

Bases: object

Magnetic geometry: q-profile and island evolution via Modified Rutherford Equation.

Parameters:

size (int)

__init__(size=40)[source]

Build the normalised radius map, mask, and seed the q-profile.

Parameters:

size (int)

Return type:

None

step_island_evolution(dt=0.1)[source]

Evolve island widths via MRE with neoclassical bootstrap drive.

Return type:

None

Parameters:

dt (float)

update_q_profile(current_drive_action)[source]

Update parabolic q(r) = q0 + (qa-q0)*r^2 with current drive modulation.

Return type:

None

Parameters:

current_drive_action (float)

get_rational_surfaces()[source]

Boolean map of rational-surface islands from current MRE widths.

Return type:

ndarray[Any, dtype[bool_]]

class scpn_fusion.control.tokamak_digital_twin.Plasma2D(topology, gyro_surrogate=None)[source]

Bases: object

2D diffusion-reaction model on a poloidal cross-section.

Parameters:
__init__(topology, gyro_surrogate=None)[source]

Store the magnetic topology, temperature field, and optional surrogate.

Parameters:
Return type:

None

step(action)[source]

Evolve plasma one timestep with current drive action in [-1, 1].

Return type:

tuple[ndarray[Any, dtype[float64]], float]

Parameters:

action (float)

class scpn_fusion.control.tokamak_digital_twin.SimpleNeuralNet(input_size, hidden_size, output_size, *, rng)[source]

Bases: object

NumPy MLP policy network for continuous control.

Parameters:
__init__(input_size, hidden_size, output_size, *, rng)[source]

Initialise the two-layer weight matrices with scaled normal samples.

Parameters:
Return type:

None

forward(x)[source]

Evaluate the policy network for a batch of state vectors.

Return type:

ndarray[Any, dtype[float64]]

Parameters:

x (ndarray[Any, dtype[float64]])

train_step(x, target_action, advantage)[source]

REINFORCE-like policy gradient step.

Return type:

float

Parameters:
scpn_fusion.control.tokamak_digital_twin.run_digital_twin(time_steps=10000, seed=42, save_plot=True, output_path='Tokamak_Digital_Twin.png', verbose=True, gyro_surrogate=None, chaos_monkey=False, sensor_dropout_prob=0.0, sensor_noise_std=0.0, rng=None)[source]

Run digital-twin control simulation, return summary dict.

Return type:

dict[str, Any]

Parameters:
scpn_fusion.control.tokamak_digital_twin.run_digital_twin_ids(*, machine='ITER', shot=0, run=0, **kwargs)[source]

Run digital twin and return IDS-like equilibrium payload.

Return type:

dict[str, Any]

Parameters:
scpn_fusion.control.tokamak_digital_twin.run_digital_twin_ids_history(history_steps, *, machine='ITER', shot=0, run=0, seed=42, **kwargs)[source]

Run digital twin at multiple horizons and return IDS-like payload sequence.

Return type:

list[dict[str, Any]]

Parameters:
scpn_fusion.control.tokamak_digital_twin.run_digital_twin_ids_pulse(history_steps, *, machine='ITER', shot=0, run=0, seed=42, **kwargs)[source]

Run digital twin at multiple horizons and return pulse-style IDS container.

Return type:

dict[str, Any]

Parameters:

Digital Twin Ingest

Realtime digital-twin ingestion hook with SNN scenario planning.

class scpn_fusion.control.digital_twin_ingest.TelemetryPacket(t_ms, machine, ip_ma, beta_n, q95, density_1e19)[source]

Bases: object

Single timestamped machine telemetry sample used by the digital-twin hook.

Parameters:
t_ms: int
machine: str
ip_ma: float
beta_n: float
q95: float
density_1e19: float
scpn_fusion.control.digital_twin_ingest.generate_emulated_stream(machine, *, samples=320, dt_ms=5, seed=42)[source]

Generate deterministic machine telemetry packets for runtime replay tests.

Return type:

list[TelemetryPacket]

Parameters:
class scpn_fusion.control.digital_twin_ingest.RealtimeTwinHook(machine, *, max_buffer=512, seed=42)[source]

Bases: object

In-memory realtime ingest + SNN planning hook.

Parameters:
ingest(packet)[source]

Append a telemetry packet while retaining only the configured ring buffer.

Return type:

None

Parameters:

packet (TelemetryPacket)

scenario_plan(*, horizon=24)[source]

Project near-term risk and return an SNN-derived mitigation plan summary.

Return type:

dict[str, float | bool]

Parameters:

horizon (int)

scpn_fusion.control.digital_twin_ingest.run_realtime_twin_session(machine, *, seed=42, samples=320, dt_ms=5, horizon=24, plan_every=8, max_buffer=512, chaos_dropout_prob=0.0, chaos_noise_std=0.0)[source]

Run deterministic digital-twin ingest+planning session and return summary.

Return type:

dict[str, Any]

Parameters:

Traceable Runtime (JAX/TorchScript)

Optional JAX-traceable control-loop utilities with NumPy fallback.

class scpn_fusion.control.jax_traceable_runtime.TraceableRuntimeSpec(dt_s=0.001, tau_s=0.005, gain=1.0, command_limit=1.0)[source]

Bases: object

Configuration for reduced traceable first-order actuator dynamics.

Parameters:
dt_s: float = 0.001
tau_s: float = 0.005
gain: float = 1.0
command_limit: float = 1.0
class scpn_fusion.control.jax_traceable_runtime.TraceableRuntimeResult(state_history, backend_used, compiled)[source]

Bases: object

Result of a traceable control-loop rollout.

Parameters:
state_history: ndarray[Any, dtype[float64]]
backend_used: str
compiled: bool
class scpn_fusion.control.jax_traceable_runtime.TraceableRuntimeBatchResult(state_history, backend_used, compiled)[source]

Bases: object

Result of batched traceable control-loop rollout.

Parameters:
state_history: ndarray[Any, dtype[float64]]
backend_used: str
compiled: bool
class scpn_fusion.control.jax_traceable_runtime.TraceableBackendParityReport(backend, single_max_abs_err, batch_max_abs_err, single_within_tol, batch_within_tol)[source]

Bases: object

Parity metrics against NumPy reference backend.

Parameters:
  • backend (str)

  • single_max_abs_err (float)

  • batch_max_abs_err (float)

  • single_within_tol (bool)

  • batch_within_tol (bool)

backend: str
single_max_abs_err: float
batch_max_abs_err: float
single_within_tol: bool
batch_within_tol: bool
scpn_fusion.control.jax_traceable_runtime.available_traceable_backends()[source]

Return available runtime backends on this machine.

Return type:

list[str]

scpn_fusion.control.jax_traceable_runtime.run_traceable_control_loop(commands, *, initial_state=0.0, spec=None, backend='auto')[source]

Run a reduced control loop suitable for optional JAX tracing/JIT.

backend can be auto, numpy, jax, or torchscript.

Return type:

TraceableRuntimeResult

Parameters:
scpn_fusion.control.jax_traceable_runtime.run_traceable_control_batch(commands, *, initial_state=None, spec=None, backend='auto')[source]

Run batched reduced control loops with optional JAX/TorchScript backends.

commands shape: (batch, steps)

Return type:

TraceableRuntimeBatchResult

Parameters:
scpn_fusion.control.jax_traceable_runtime.validate_traceable_backend_parity(*, steps=64, batch=8, seed=42, spec=None, atol=1e-08, backends=None)[source]

Compare available compiled backends to NumPy for single and batch rollouts.

Return type:

dict[str, TraceableBackendParityReport]

Parameters:

Model-Predictive Control (Optimal)

Bounded response-matrix optimal control for tokamak axis regulation.

class scpn_fusion.control.fusion_optimal_control.OptimalController(config_file, *, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, verbose=True, correction_limit=5.0, coil_current_limits=(-40.0, 40.0), current_target_limits=(5.0, 16.0))[source]

Bases: object

MIMO controller using response-matrix inversion with bounded actuation.

Parameters:
  • config_file (str)

  • kernel_factory (Callable[[str], Any])

  • verbose (bool)

  • correction_limit (float)

  • coil_current_limits (Tuple[float, float])

  • current_target_limits (Tuple[float, float])

__init__(config_file, *, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, verbose=True, correction_limit=5.0, coil_current_limits=(-40.0, 40.0), current_target_limits=(5.0, 16.0))[source]

Build the kernel, response matrix, actuation bounds, and telemetry buffers.

Parameters:
Return type:

None

identify_system(perturbation=0.5)[source]

Perturb each coil and measure plasma-axis response to build the Jacobian.

Return type:

None

Parameters:

perturbation (float)

get_shafranov_shift()[source]

Calculate the Shafranov shift Delta R.

Delta R ~ (a^2 / 2R) * (beta_p + li/2)

Return type:

float

get_plasma_pos()[source]

Return current magnetic-axis position [R, Z].

Applies a Shafranov-shift correction for high-beta states.

Return type:

ndarray[Any, dtype[float64]]

compute_optimal_correction(current_pos, target_pos, regularization_lambda=0.05, *, regularization_limit=None)[source]

Solve Error = J * Delta_I using Tikhonov-regularised (damped) SVD.

Provides smoother control than hard-cutoff SVD near singularities.

Return type:

ndarray[Any, dtype[float64]]

Parameters:
run_optimal_shot(shot_steps=50, target_r=6.0, target_z=0.0, gain=0.8, ip_start_ma=10.0, ip_span_ma=5.0, identify_first=False, save_plot=True, output_path='Optimal_Control_Result.png')[source]

Run one bounded optimal-control shot and return telemetry summary metrics.

Return type:

Dict[str, Any]

Parameters:
plot_telemetry(output_path='Optimal_Control_Result.png')[source]

Render axis-tracking and final-plasma telemetry to a plot file.

Return type:

Tuple[bool, Optional[str]]

Parameters:

output_path (str)

scpn_fusion.control.fusion_optimal_control.run_optimal_control(config_file=None, shot_steps=50, target_r=6.0, target_z=0.0, seed=42, save_plot=True, output_path='Optimal_Control_Result.png', verbose=True, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, coil_current_limits=(-40.0, 40.0), current_target_limits=(5.0, 16.0))[source]

Run bounded optimal-control shot and return deterministic summary.

Return type:

Dict[str, Any]

Parameters:

State-of-the-Art MPC

Surrogate-assisted model predictive control for tokamak trajectory tracking.

class scpn_fusion.control.fusion_sota_mpc.NeuralSurrogate(n_coils, n_state, verbose=True)[source]

Bases: object

Linearised surrogate model around the current operating point.

Parameters:
__init__(n_coils, n_state, verbose=True)[source]

Initialise the identity state matrix and zero coil-response matrix.

Parameters:
Return type:

None

train_on_kernel(kernel, perturbation=1.0)[source]

Estimate the linear coil-response matrix by perturbing the supplied kernel.

Return type:

None

Parameters:
get_state(kernel)[source]

Extract magnetic-axis and X-point state coordinates from a kernel snapshot.

Return type:

ndarray[Any, dtype[float64]]

Parameters:

kernel (Any)

predict(current_state, action_delta)[source]

Predict the next surrogate state after one coil-current delta vector.

Return type:

ndarray[Any, dtype[float64]]

Parameters:
class scpn_fusion.control.fusion_sota_mpc.ModelPredictiveController(surrogate, target_state, *, prediction_horizon=10, learning_rate=0.5, iterations=20, action_limit=2.0, action_regularization=0.1)[source]

Bases: object

Gradient-based MPC planner over surrogate dynamics.

Parameters:
__init__(surrogate, target_state, *, prediction_horizon=10, learning_rate=0.5, iterations=20, action_limit=2.0, action_regularization=0.1)[source]

Validate horizon/rate/limit knobs and store the target state.

Parameters:
Return type:

None

plan_trajectory(current_state)[source]

Optimize the finite-horizon action sequence and return the first action.

Return type:

ndarray[Any, dtype[float64]]

Parameters:

current_state (ndarray[Any, dtype[float64]])

scpn_fusion.control.fusion_sota_mpc.run_sota_simulation(config_file=None, shot_length=100, prediction_horizon=10, target_vector=None, disturbance_start_step=20, disturbance_per_step_ma=0.1, current_target_bounds=(5.0, 16.0), action_limit=2.0, coil_current_limits=(-40.0, 40.0), save_plot=True, output_path='SOTA_MPC_Results.png', verbose=True, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>)[source]

Run the surrogate MPC simulation and return bounded telemetry metrics.

Return type:

Dict[str, Any]

Parameters:

Disruption Predictor

Transformer and deterministic-runtime interfaces for disruption-risk prediction.

class scpn_fusion.control.disruption_predictor.HybridAnomalyDetector(threshold=0.5, ema=0.05)[source]

Bases: object

Hybrid detector combining learned risk and deterministic smoothing.

Parameters:
__init__(threshold=0.5, ema=0.05)[source]

Validate the alarm threshold and EMA rate and reset the running statistics.

Parameters:
Return type:

None

score(signal, toroidal_observables=None)[source]

Return fused supervised and anomaly-smoothed disruption-risk scores.

Return type:

dict[str, float | bool]

Parameters:
scpn_fusion.control.disruption_predictor.apply_bit_flip_fault(value, bit_index)[source]

Inject a deterministic single-bit fault into a float.

Return type:

float

Parameters:
scpn_fusion.control.disruption_predictor.apply_disruption_logit_bias(risk, bias_delta)[source]

Apply additive logit-space calibration bias to a bounded risk score.

Return type:

float

Parameters:
scpn_fusion.control.disruption_predictor.build_disruption_feature_vector(signal, toroidal_observables=None)[source]

Build a compact feature vector for control-oriented disruption scoring.

Return type:

ndarray[Any, dtype[float64]]

Parameters:
Feature layout:
[mean, std, max, slope, energy, last,

toroidal_n1_amp, toroidal_n2_amp, toroidal_n3_amp, toroidal_asymmetry_index, toroidal_radial_spread]

scpn_fusion.control.disruption_predictor.predict_disruption_risk(signal, toroidal_observables=None, bias_delta=0.0)[source]

Estimate a deterministic disruption risk (0..1) for control loops.

This supplements the Transformer pathway by explicitly consuming toroidal asymmetry observables from 3D diagnostics.

Return type:

float

Parameters:
scpn_fusion.control.disruption_predictor.run_anomaly_alarm_campaign(*, seed=0, episodes=128, window=64, threshold=0.5)[source]

Evaluate alarm true/false-positive behavior on deterministic synthetic episodes.

Return type:

dict[str, float | int | bool]

Parameters:
scpn_fusion.control.disruption_predictor.run_fault_noise_campaign(*, seed=0, episodes=64, window=64, noise_std=0.02, bit_flip_interval=7, recovery_window=4, recovery_epsilon=0.05)[source]

Run deterministic robustness campaign with injected noise and bit flips.

Return type:

dict[str, Any]

Parameters:
  • seed (int)

  • episodes (int)

  • window (int)

  • noise_std (float)

  • bit_flip_interval (int)

  • recovery_window (int)

  • recovery_epsilon (float)

class scpn_fusion.control.disruption_predictor.DisruptionTransformer(seq_len=100)[source]

Bases: Module

Transformer encoder that scores tearing-mode disruption risk from a signal window.

Parameters:

seq_len (int)

forward(src)[source]

Run the encoder and return the sigmoid disruption probability per batch row.

Return type:

Any

Parameters:

src (torch.Tensor)

__init__(seq_len=100)[source]

Build the embedding, positional encoder, transformer stack, and classifier head.

Parameters:

seq_len (int)

Return type:

None

scpn_fusion.control.disruption_predictor.train_predictor(seq_len=100, n_shots=500, epochs=50, model_path=None, seed=42, save_plot=True)[source]

Train and persist a disruption transformer on synthetic tearing-mode shots.

Return type:

tuple[Any, dict[str, Any]]

Parameters:
scpn_fusion.control.disruption_predictor.load_or_train_predictor(model_path=None, seq_len=100, force_retrain=False, train_kwargs=None, train_if_missing=True, allow_fallback=True)[source]

Load a validated checkpoint or train/recover according to fallback policy.

Return type:

tuple[Any, dict[str, Any]]

Parameters:
scpn_fusion.control.disruption_predictor.predict_disruption_risk_safe(signal, toroidal_observables=None, *, model_path=None, seq_len=100, train_if_missing=False, allow_fallback=True)[source]

Predict disruption risk via the checkpoint path, else a compatibility estimator.

Return type:

tuple[float, dict[str, Any]]

Returns:

  • risk, metadatarisk is always a bounded float in [0, 1]. metadata includes whether compatibility mode was used.

  • allow_fallback – If False, this API raises on missing/broken checkpoints or inference failures instead of returning compatibility risk from predict_disruption_risk.

Parameters:
  • signal (Any)

  • toroidal_observables (Any)

  • model_path (str | Path | None)

  • seq_len (int)

  • train_if_missing (bool)

  • allow_fallback (bool)

scpn_fusion.control.disruption_predictor.evaluate_predictor(model, X_test, y_test, times_test=None, threshold=0.5)[source]

Evaluate disruption predictor on test set.

Returns dict with accuracy, precision, recall, F1, confusion matrix, and recall@T for T in [10, 20, 30, 50, 100] ms.

Return type:

dict[str, Any]

Parameters:

Shattered Pellet Injection

Shattered-pellet-injection mitigation models and runtime summaries.

class scpn_fusion.control.spi_mitigation.ShatteredPelletInjection(Plasma_Energy_MJ=300.0, Plasma_Current_MA=15.0)[source]

Bases: object

Reduced SPI mitigation model for thermal/current quench campaigns.

Parameters:
  • Plasma_Energy_MJ (float)

  • Plasma_Current_MA (float)

static estimate_z_eff(neon_quantity_mol)[source]

Estimate effective charge for a neon-only SPI payload.

Return type:

float

Parameters:

neon_quantity_mol (float)

static estimate_z_eff_cocktail(*, neon_quantity_mol=0.0, argon_quantity_mol=0.0, xenon_quantity_mol=0.0)[source]

Estimate effective charge for a mixed neon/argon/xenon payload.

Return type:

float

Parameters:
  • neon_quantity_mol (float)

  • argon_quantity_mol (float)

  • xenon_quantity_mol (float)

static estimate_mitigation_cocktail(*, risk_score, disturbance, action_bias=0.0)[source]

Choose a deterministic SPI gas cocktail from risk and disturbance inputs.

Return type:

dict[str, float]

Parameters:
static estimate_tau_cq(te_keV, z_eff)[source]

Estimate current-quench time from electron temperature and effective charge.

Return type:

float

Parameters:
trigger_mitigation(neon_quantity_mol=0.1, argon_quantity_mol=0.0, xenon_quantity_mol=0.0, return_diagnostics=False, *, duration_s=0.05, dt_s=1e-05, verbose=True)[source]

Run the thermal/current quench mitigation time history.

Parameters:
scpn_fusion.control.spi_mitigation.run_spi_mitigation(*, plasma_energy_mj=300.0, plasma_current_ma=15.0, neon_quantity_mol=0.1, argon_quantity_mol=0.0, xenon_quantity_mol=0.0, duration_s=0.05, dt_s=1e-05, save_plot=True, output_path='SPI_Mitigation_Result.png', verbose=True)[source]

Run SPI mitigation simulation and return deterministic summary metrics.

Return type:

dict[str, Any]

Parameters:
scpn_fusion.control.spi_mitigation.run_spi_test()[source]

Run the default SPI mitigation scenario with plotting enabled.

Return type:

dict[str, Any]

Integrated Control Room

Deterministic control-room simulation with diagnostics, estimation, and rendering.

class scpn_fusion.control.fusion_control_room.TokamakPhysicsEngine(size=60, *, seed=42, kernel=None)[source]

Bases: object

Reduced Grad-Shafranov geometry model with optional kernel-Psi ingestion.

Parameters:
  • size (int)

  • seed (int)

  • kernel (Optional[Any])

__init__(size=60, *, seed=42, kernel=None)[source]

Initialise the geometry grid, RNG, and optional kernel handle.

Parameters:
Return type:

None

solve_flux_surfaces()[source]

Return (density, psi) from kernel state or analytic geometry.

Uses the kernel Psi field when available, otherwise an analytic Miller-parameterized geometry.

Return type:

tuple[ndarray[Any, dtype[float64]], ndarray[Any, dtype[float64]]]

step_dynamics(coil_action_top, coil_action_bottom)[source]

Advance the reduced vertical-displacement dynamics by one step.

Return type:

float

Parameters:
  • coil_action_top (float)

  • coil_action_bottom (float)

class scpn_fusion.control.fusion_control_room.DiagnosticSystem(rng)[source]

Bases: object

Noisy vertical-position probe.

Parameters:

rng (np.random.Generator)

__init__(rng)[source]

Store the random generator used to inject measurement noise.

Parameters:

rng (Generator)

Return type:

None

measure_position(true_z)[source]

Return a noisy vertical-position measurement from the supplied true state.

Return type:

float

Parameters:

true_z (float)

class scpn_fusion.control.fusion_control_room.KalmanObserver(dt=0.1)[source]

Bases: object

Linear Kalman filter for plasma vertical-position estimation.

Hardens the state estimate against sensor noise and measurement dropout.

Parameters:

dt (float)

__init__(dt=0.1)[source]

Initialise the state vector, covariances, and linear drift model.

Parameters:

dt (float)

update(measured_z, dropout=False)[source]

Predict-correct cycle. Returns filtered Z-position.

Return type:

float

Parameters:
class scpn_fusion.control.fusion_control_room.NeuralController(dt=0.1)[source]

Bases: object

Hardened PID control policy for vertical stabilization.

Parameters:

dt (float)

__init__(dt=0.1)[source]

Initialise PID gains, derivative filter, and anti-windup limit.

Parameters:

dt (float)

Return type:

None

compute_action(measured_z)[source]

Map filtered vertical displacement to bounded top and bottom coil actions.

Return type:

tuple[float, float]

Parameters:

measured_z (float)

scpn_fusion.control.fusion_control_room.run_control_room(sim_duration=200, *, seed=42, save_animation=True, save_report=True, output_gif='SCPN_Fusion_Control_Room.gif', output_report='SCPN_Fusion_Status_Report.png', verbose=True, kernel_factory=None, config_file=None, allow_kernel_fallback=True)[source]

Run the control-room loop and return deterministic summary metrics.

Return type:

dict[str, Any]

Parameters:

Neuro-Cybernetic Controller

Push-pull spiking controller for kernel-backed plasma axis control.

class scpn_fusion.control.neuro_cybernetic_controller.SpikingControllerPool(n_neurons=20, gain=1.0, tau_window=10, use_quantum=False, *, seed=42, allow_numpy_fallback=True, dt_s=0.001, tau_mem_s=0.015, noise_std=0.02)[source]

Bases: object

Push-pull spiking control population with deterministic compatibility path.

Preferred backend is sc-neurocore. If unavailable, a reduced NumPy LIF population is used so controller workflows remain executable in CI.

Parameters:
__init__(n_neurons=20, gain=1.0, tau_window=10, use_quantum=False, *, seed=42, allow_numpy_fallback=True, dt_s=0.001, tau_mem_s=0.015, noise_std=0.02)[source]

Configure population size, gains, and the spiking/NumPy backend.

Parameters:
Return type:

None

step(error_signal)[source]

Advance the push-pull populations and return one signed control command.

Return type:

float

Parameters:

error_signal (float)

class scpn_fusion.control.neuro_cybernetic_controller.NeuroCyberneticController(config_file, seed=42, *, shot_duration=100, kernel_factory=None)[source]

Bases: object

Replace PID loops with push-pull spiking populations.

Parameters:
  • config_file (str)

  • seed (int)

  • shot_duration (int)

  • kernel_factory (Optional[Callable[[str], Any]])

__init__(config_file, seed=42, *, shot_duration=100, kernel_factory=None)[source]

Load the kernel configuration and build the spiking controller pools.

Parameters:
Return type:

None

initialize_brains(use_quantum=False)[source]

Initialise radial and vertical spiking controller populations.

Return type:

None

Parameters:

use_quantum (bool)

run_shot(*, save_plot=True, verbose=True, output_path=None)[source]

Run a classical spiking-control shot and return telemetry metrics.

Return type:

Dict[str, Any]

Parameters:
  • save_plot (bool)

  • verbose (bool)

  • output_path (str | None)

run_quantum_shot(*, save_plot=True, verbose=True, output_path=None)[source]

Run a quantum-entropy-enabled spiking-control shot when available.

Return type:

Dict[str, Any]

Parameters:
  • save_plot (bool)

  • verbose (bool)

  • output_path (str | None)

visualize(title, *, output_path=None, verbose=True)[source]

Write control history plots and return the output image path.

Return type:

str

Parameters:
  • title (str)

  • output_path (str | None)

  • verbose (bool)

scpn_fusion.control.neuro_cybernetic_controller.run_neuro_cybernetic_control(*, config_file, shot_duration=100, seed=42, quantum=False, save_plot=False, verbose=False, output_path=None, kernel_factory=None)[source]

Run neuro-cybernetic control in deterministic non-interactive mode.

Return type:

Dict[str, Any]

Parameters:

SOC Fusion Learning

Self-organised-criticality learning runtime for turbulence and shear control.

class scpn_fusion.control.advanced_soc_fusion_learning.CoupledSandpileReactor(size=60, *, z_crit_base=6.0, flow_generation=0.2, flow_damping=0.05, shear_efficiency=3.0, max_sub_steps=50, flow_bounds=(0.0, 5.0), energy_per_topple_mj=0.05)[source]

Bases: object

Predator-prey sandpile approximation for turbulence/flow coupling.

Parameters:
__init__(size=60, *, z_crit_base=6.0, flow_generation=0.2, flow_damping=0.05, shear_efficiency=3.0, max_sub_steps=50, flow_bounds=(0.0, 5.0), energy_per_topple_mj=0.05)[source]

Validate the sandpile parameters and allocate the height/flow state.

Parameters:
Return type:

None

drive(amount=1.0)[source]

Inject edge drive into the sandpile state with non-negative magnitude.

Return type:

None

Parameters:

amount (float)

step_physics(external_shear)[source]

Advance one avalanche-relaxation step and return turbulence and flow diagnostics.

Return type:

tuple[int, float, float]

Parameters:

external_shear (float)

get_profile_energy()[source]

Return the core-side cumulative profile energy proxy.

Return type:

float

get_elm_energy_mj(topple_count)[source]

Calculate real energy release from topple count.

Return type:

float

Parameters:

topple_count (int)

class scpn_fusion.control.advanced_soc_fusion_learning.FusionAIAgent(*, alpha=0.1, gamma=0.95, epsilon=0.1, n_states_turb=5, n_states_flow=5, n_actions=3, entropy_beta=0.05)[source]

Bases: object

Tabular Q-learning controller on discretized turbulence/flow states.

Parameters:
__init__(*, alpha=0.1, gamma=0.95, epsilon=0.1, n_states_turb=5, n_states_flow=5, n_actions=3, entropy_beta=0.05)[source]

Validate the learning rates and allocate the Q-table.

Parameters:
Return type:

None

discretize_state(turb, flow)[source]

Map continuous turbulence and flow diagnostics to Q-table state indices.

Return type:

tuple[int, int]

Parameters:
choose_action(state, rng)[source]

Choose an epsilon-greedy shear-control action for the current discrete state.

Return type:

int

Parameters:
learn(state, action, new_state, reward)[source]

Apply a soft-Q learning update with entropy regularisation.

Q(s,a) = R + gamma * [ max Q(s’,a’) + beta * Entropy ]

Return type:

float

Parameters:
class scpn_fusion.control.advanced_soc_fusion_learning.FusionAI_Agent(*, alpha=0.1, gamma=0.95, epsilon=0.1, n_states_turb=5, n_states_flow=5, n_actions=3, entropy_beta=0.05)[source]

Bases: FusionAIAgent

Backward-compatible alias for older scripts.

Parameters:
scpn_fusion.control.advanced_soc_fusion_learning.run_advanced_learning_sim(size=60, time_steps=10000, seed=42, *, epsilon=0.1, noise_probability=0.01, shear_step=0.05, shear_bounds=(0.0, 1.0), save_plot=True, output_path='Advanced_SOC_Learning.png', verbose=True)[source]

Run deterministic SOC+Q-learning control simulation and return summary metrics.

Return type:

Dict[str, Any]

Parameters:

Analytic Solver

Analytic vertical-field equilibrium helper for coil-current initialisation.

class scpn_fusion.control.analytic_solver.AnalyticEquilibriumSolver(config_path, *, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, verbose=True)[source]

Bases: object

Analytic vertical-field target and least-norm coil-current solve.

Parameters:
  • config_path (str)

  • kernel_factory (Callable[[str], Any])

  • verbose (bool)

__init__(config_path, *, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, verbose=True)[source]

Instantiate the kernel from the config and record verbosity.

Parameters:
Return type:

None

calculate_required_Bv(R_geo, a_min, Ip_MA, *, beta_p=0.5, li=0.8)[source]

Estimate the vertical field from Shafranov radial-force balance.

Return type:

float

Parameters:
compute_coil_efficiencies(target_R, *, target_Z=0.0)[source]

Compute dBz/dI per coil at target location using kernel vacuum-field map.

Return type:

ndarray[Any, dtype[float64]]

Parameters:
solve_coil_currents(target_Bv, target_R, *, target_Z=0.0, ridge_lambda=0.0)[source]

Solve least-norm coil currents for desired vertical field target.

Return type:

ndarray[Any, dtype[float64]]

Parameters:
apply_currents(currents)[source]

Write a coil-current vector into the solver kernel configuration.

Return type:

None

Parameters:

currents (ndarray[Any, dtype[float64]])

apply_and_save(currents, output_path=None)[source]

Apply coil currents and persist the resulting kernel configuration.

Return type:

str

Parameters:
scpn_fusion.control.analytic_solver.run_analytic_solver(config_path=None, *, target_r=6.2, target_z=0.0, a_minor=2.0, ip_target_ma=15.0, beta_p=0.5, li=0.8, ridge_lambda=0.0, save_config=True, output_config_path=None, allow_validation_fallback=True, verbose=True, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>)[source]

Run analytic equilibrium solve and return deterministic summary.

Return type:

Dict[str, Any]

Parameters:

Director Interface

Director-layer supervisory interface for neuro-cybernetic fusion control.

class scpn_fusion.control.director_interface.DirectorInterface(config_path, *, allow_fallback=True, director=None, controller_factory=<class 'scpn_fusion.control.neuro_cybernetic_controller.NeuroCyberneticController'>, entropy_threshold=0.3, history_window=10)[source]

Bases: object

Interfaces the ‘Director’ (Layer 16: Coherence Oversight) with the Fusion Reactor.

Role: The Director does NOT control the coils (Layer 2 does that). The Director controls the Controller. It sets the strategy and monitors for “Backfire”.

Mechanism: 1. Sample System State (Physics + Neural Activity). 2. Format as a “Prompt” for the Director. 3. Director calculates Entropy/Risk. 4. If Safe: Director updates Target Parameters. 5. If Unsafe: Director triggers corrective action.

Parameters:
  • config_path (str)

  • allow_fallback (bool)

  • director (Optional[Any])

  • controller_factory (Callable[[str], Any])

  • entropy_threshold (float)

  • history_window (int)

format_state_for_director(t, ip, err_r, err_z, brain_activity)[source]

Translate physical telemetry into a semantic prompt for the Director.

Return type:

str

Parameters:
run_directed_mission(duration=100, *, use_quantum=True, glitch_start_step=50, glitch_std=500.0, rng_seed=42, save_plot=True, output_path='Director_Interface_Result.png', verbose=True)[source]

Run a supervised neuro-cybernetic control mission and return summary metrics.

Return type:

dict[str, Any]

Parameters:
visualize(output_path='Director_Interface_Result.png')[source]

Render mission current-target and radial-error histories to a plot file.

Return type:

str

Parameters:

output_path (str)

Fueling Mode Controller

Ice-pellet fueling mode via Petri-to-SNN control path.

class scpn_fusion.control.fueling_mode.FuelingSimResult(final_density, final_abs_error, rmse, steps, dt_s, history_density, history_command)[source]

Bases: object

Deterministic fueling simulation result and full command/density traces.

Parameters:
final_density: float
final_abs_error: float
rmse: float
steps: int
dt_s: float
history_density: list[float]
history_command: list[float]
class scpn_fusion.control.fueling_mode.IcePelletFuelingController(target_density=1.0)[source]

Bases: object

Hybrid Petri-to-SNN + PI fueling controller.

Parameters:

target_density (float)

step(density, k, dt_s)[source]

Return one bounded fueling command and current density error.

Return type:

tuple[float, float]

Parameters:
scpn_fusion.control.fueling_mode.simulate_iter_density_control(*, target_density=1.0, initial_density=0.82, steps=3000, dt_s=0.001)[source]

Run deterministic reduced ITER-like density control and return full traces.

Return type:

FuelingSimResult

Parameters:
scpn_fusion.control.fueling_mode.run_fueling_mode(*, target_density=1.0, initial_density=0.82, steps=3000, dt_s=0.001)[source]

Run deterministic fuelling simulation and return summary metrics.

Return type:

dict[str, Any]

Parameters:

TORAX Hybrid Loop

Synthetic TORAX-hybrid realtime control lane for NSTX-U-like scenarios.

class scpn_fusion.control.torax_hybrid_loop.ToraxPlasmaState(beta_n, q95, li3, w_thermal_mj)[source]

Bases: object

Reduced plasma state used by the synthetic TORAX-hybrid campaign.

Parameters:
beta_n: float
q95: float
li3: float
w_thermal_mj: float
class scpn_fusion.control.torax_hybrid_loop.ToraxHybridCampaignResult(episodes, steps_per_episode, disruption_avoidance_rate, torax_parity_pct, p95_loop_latency_ms, mean_risk, passes_thresholds)[source]

Bases: object

Threshold summary for the NSTX-U-like TORAX-hybrid replay campaign.

Parameters:
  • episodes (int)

  • steps_per_episode (int)

  • disruption_avoidance_rate (float)

  • torax_parity_pct (float)

  • p95_loop_latency_ms (float)

  • mean_risk (float)

  • passes_thresholds (bool)

episodes: int
steps_per_episode: int
disruption_avoidance_rate: float
torax_parity_pct: float
p95_loop_latency_ms: float
mean_risk: float
passes_thresholds: bool
scpn_fusion.control.torax_hybrid_loop.run_nstxu_torax_hybrid_campaign(*, seed=42, episodes=16, steps_per_episode=220)[source]

Run deterministic NSTX-U-like realtime hybrid control campaign.

Return type:

ToraxHybridCampaignResult

Parameters:
  • seed (int)

  • episodes (int)

  • steps_per_episode (int)

Real-Time EFIT

Realtime EFIT-like magnetic reconstruction and synthetic diagnostic response.

class scpn_fusion.control.realtime_efit.MagneticDiagnostics(flux_loops, b_probes, rogowski_radius)[source]

Bases: object

Layout of magnetic sensors.

Parameters:
flux_loops: list[tuple[float, float]]
b_probes: list[tuple[float, float, str]]
rogowski_radius: float
class scpn_fusion.control.realtime_efit.ShapeParams(R0, a, kappa, delta_upper, delta_lower, q95, beta_pol, li, Ip_reconstructed)[source]

Bases: object

Reconstructed macroscopic parameters.

Parameters:
R0: float
a: float
kappa: float
delta_upper: float
delta_lower: float
q95: float
beta_pol: float
li: float
Ip_reconstructed: float
class scpn_fusion.control.realtime_efit.ReconstructionResult(psi, p_prime_coeffs, ff_prime_coeffs, shape, chi_squared, n_iterations, wall_time_ms)[source]

Bases: object

EFIT reconstruction output: psi field, source coefficients, and shape.

Parameters:
psi: ndarray[Any, dtype[float64]]
p_prime_coeffs: ndarray[Any, dtype[float64]]
ff_prime_coeffs: ndarray[Any, dtype[float64]]
shape: ShapeParams
chi_squared: float
n_iterations: int
wall_time_ms: float
class scpn_fusion.control.realtime_efit.DiagnosticResponse(diagnostics, R_grid, Z_grid)[source]

Bases: object

Forward model: psi field → synthetic flux-loop and B-probe signals.

Parameters:
__init__(diagnostics, R_grid, Z_grid)[source]

Store the sensor layout and the R/Z reconstruction grids.

Parameters:
Return type:

None

simulate_measurements(psi, coil_currents)[source]

Generate synthetic measurements from a given psi field.

Return type:

dict[str, Any]

Parameters:
class scpn_fusion.control.realtime_efit.RealtimeEFIT(diagnostics, R_grid, Z_grid, n_p_modes=3, n_ff_modes=3)[source]

Bases: object

Real-time equilibrium reconstruction with an EFIT-compatible basis.

Parameters:
__init__(diagnostics, R_grid, Z_grid, n_p_modes=3, n_ff_modes=3)[source]

Validate the grids and diagnostics and build the forward response model.

Parameters:
Return type:

None

reconstruct(measurements)[source]

Run the EFIT reconstruction loop and return the equilibrium result.

Return type:

ReconstructionResult

Parameters:

measurements (dict[str, Any])

find_lcfs(psi)[source]

Return ordered (R,Z) points on the last closed flux surface.

Return type:

ndarray[Any, dtype[float64]]

Parameters:

psi (ndarray[Any, dtype[float64]])

find_xpoint(psi)[source]

Locate magnetic nulls (dpsi/dR=0, dpsi/dZ=0).

Return type:

tuple[float, float] | None

Parameters:

psi (ndarray[Any, dtype[float64]])

compute_shape_params(psi)[source]

Extract macroscopic shape descriptors (R0, a, kappa, delta, q95, li) from psi.

Return type:

ShapeParams

Parameters:

psi (ndarray[Any, dtype[float64]])

Plasma Shape Controller

Plasma shape-control targets, Jacobians, and Tikhonov feedback laws.

This module hosts deterministic abstractions for feed-forward shape targets, numerical Jacobian construction, and regularized controller synthesis used by shape-control smoke tests.

class scpn_fusion.control.shape_controller.ShapeTarget(isoflux_points, gap_points, gap_targets, xpoint_target=None, strike_point_targets=None)[source]

Bases: object

Desired plasma boundary: isoflux points, gaps, X-point, strike points.

Parameters:
isoflux_points: list[tuple[float, float]]
gap_points: list[tuple[float, float, float, float]]
gap_targets: list[float]
xpoint_target: tuple[float, float] | None = None
strike_point_targets: list[tuple[float, float]] | None = None
class scpn_fusion.control.shape_controller.ShapeControlResult(isoflux_error, gap_errors, min_gap, xpoint_error, strike_point_errors)[source]

Bases: object

Shape control performance: isoflux, gap, X-point, and strike-point errors.

Parameters:
isoflux_error: float
gap_errors: ndarray[Any, dtype[float64]]
min_gap: float
xpoint_error: float
strike_point_errors: ndarray[Any, dtype[float64]]
class scpn_fusion.control.shape_controller.CoilSet(n_coils=10)[source]

Bases: object

PF coil geometry and current limits.

Parameters:

n_coils (int)

__init__(n_coils=10)[source]

Initialize an idealized PF coil set with symmetric current limits.

Parameters:

n_coils (int)

class scpn_fusion.control.shape_controller.ShapeJacobian(kernel, coil_set, target)[source]

Bases: object

d(e_shape)/dI_coils sensitivity matrix with deterministic geometry basis.

Parameters:
__init__(kernel, coil_set, target)[source]

Initialize the shape-error sensitivity model for a target and coil set.

Parameters:
compute()[source]

Return the shape Jacobian matrix d(e_shape)/dI_coils.

Return type:

ndarray[Any, dtype[float64]]

update(state)[source]

Re-linearize Jacobian around a new operating point.

Supported state paths: :rtype: None

  • jacobian: explicit updated matrix with shape (n_errors, n_coils).

  • Baseline multiplicative scaling via plasma_current_ma, beta_p, coil_coupling, and error_coupling.

Parameters:

state (dict[str, Any])

Return type:

None

class scpn_fusion.control.shape_controller.PlasmaShapeController(target, coil_set, kernel)[source]

Bases: object

Real-time shape controller using Tikhonov-regularized pseudoinverse.

Parameters:
__init__(target, coil_set, kernel)[source]

Initialize target weighting, Jacobian state, and regularized shape gain.

Parameters:
step(psi, coil_currents)[source]

Compute coil current changes to correct shape errors.

Return type:

ndarray[Any, dtype[float64]]

Parameters:
evaluate_performance(psi)[source]

Decompose the shape error vector into per-category metrics.

Return type:

ShapeControlResult

Parameters:

psi (ndarray[Any, dtype[float64]])

scpn_fusion.control.shape_controller.iter_lower_single_null_target()[source]

Return an ITER-like lower-single-null plasma boundary target.

Return type:

ShapeTarget

Vertical Stabiliser (Sliding Mode)

Super-twisting sliding-mode control utilities for vertical stabilisation.

class scpn_fusion.control.sliding_mode_vertical.SuperTwistingSMC(alpha, beta, c, u_max)[source]

Bases: object

Second-order sliding mode controller (super-twisting algorithm).

Parameters:
sliding_surface(e, de_dt)[source]

S = e + c * de/dt.

Return type:

float

Parameters:
step(e, de_dt, dt)[source]

U = -alpha * abs(s)^0.5 * sign(s) + v, v_dot = -beta * sign(s).

Return type:

float

Parameters:
class scpn_fusion.control.sliding_mode_vertical.VerticalStabilizer(n_index, Ip_MA, R0, m_eff, tau_wall, smc)[source]

Bases: object

Vertical stability controller wrapper for a tokamak.

Parameters:
step(Z_meas, Z_ref, dZ_dt_meas, dt)[source]

Compute vertical stabilization command from position error and velocity.

Return type:

float

Parameters:
scpn_fusion.control.sliding_mode_vertical.lyapunov_certificate(alpha, beta, L_max)[source]

Verify gain conditions: alpha > sqrt(2 L_max), beta > L_max.

Return type:

bool

Parameters:
scpn_fusion.control.sliding_mode_vertical.estimate_convergence_time(alpha, beta, L_max, s0)[source]

Upper bound on time to reach s=0.

Return type:

float

Parameters:

Fault-Tolerant Control

Fault detection, isolation, injection, and reconfigurable control utilities.

class scpn_fusion.control.fault_tolerant_control.FaultType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Actuator and sensor fault modes for FDI classification.

STUCK_ACTUATOR = 1
OPEN_CIRCUIT_ACTUATOR = 2
SENSOR_DROPOUT = 3
SENSOR_DRIFT = 4
SENSOR_NOISE_INCREASE = 5
class scpn_fusion.control.fault_tolerant_control.FaultReport(component_index, is_sensor, fault_type, confidence, time_detected)[source]

Bases: object

Detected fault: which component, fault mode, confidence, and detection time.

Parameters:
component_index: int
is_sensor: bool
fault_type: FaultType
confidence: float
time_detected: float
class scpn_fusion.control.fault_tolerant_control.FDIMonitor(n_sensors, n_actuators, threshold_sigma=3.0, n_alert=5)[source]

Bases: object

Fault Detection and Isolation based on innovation monitoring.

Parameters:
  • n_sensors (int)

  • n_actuators (int)

  • threshold_sigma (float)

  • n_alert (int)

update(y_measured, y_predicted, t)[source]

Compare measurements to predictions; flag sensors exceeding threshold_sigma for n_alert consecutive steps.

Return type:

list[FaultReport]

Parameters:
class scpn_fusion.control.fault_tolerant_control.ReconfigurableController(base_controller, jacobian, n_coils, n_sensors)[source]

Bases: object

Adjusts control allocation based on detected faults.

Parameters:
  • base_controller (Any)

  • jacobian (FloatArray)

  • n_coils (int)

  • n_sensors (int)

handle_actuator_fault(coil_index, fault_type, stuck_val=0.0)[source]

Zero out the faulted coil column in J and recompute gain.

Return type:

None

Parameters:
handle_sensor_fault(sensor_index, fault_type)[source]

Accommodate sensor faults by reducing/removing affected measurement rows.

This controller uses a weighted least-squares allocation with W; sensor faults are represented by adapting row weights that participate in the control solve.

Return type:

None

Parameters:
step(error, dt)[source]

Compute coil current corrections, compensating stuck-actuator offsets.

Return type:

ndarray[Any, dtype[float64]]

Parameters:
controllability_check()[source]

Return True if enough healthy coils remain for minimum-rank controllability.

Return type:

bool

graceful_shutdown()[source]

Return zero-current command vector for safe ramp-down.

Return type:

ndarray[Any, dtype[float64]]

class scpn_fusion.control.fault_tolerant_control.FaultInjector(fault_time, component_index, fault_type, severity=1.0)[source]

Bases: object

Injects sensor faults (dropout or drift) at a specified time for testing.

Parameters:
inject(t, signals)[source]

Return a copied signal vector with the configured fault applied after fault_time.

Return type:

ndarray[Any, dtype[float64]]

Parameters:

Safe RL Controller

Constrained reinforcement-learning wrappers and tokamak safety costs.

The module provides a tiny constrained-RL contract used by production smoke surfaces and safety gating examples. The API is intentionally simple and deterministic to keep auditability and reproducibility in tests.

class scpn_fusion.control.safe_rl_controller.SafetyConstraint(name, cost_fn, limit)[source]

Bases: object

Named constraint: cost_fn(obs, act, next_obs) must stay below limit.

Parameters:
name: str
cost_fn: Callable[[ndarray[Any, dtype[float64]], ndarray[Any, dtype[float64]], ndarray[Any, dtype[float64]]], float]
limit: float
class scpn_fusion.control.safe_rl_controller.ConstrainedGymTokamakEnv(base_env, constraints)[source]

Bases: object

Wrapper to compute and report constraint costs.

Parameters:
__init__(base_env, constraints)[source]

Wrap a Gym-like environment with named safety constraints.

Parameters:
reset(**kwargs)[source]

Reset base environment and cache initial observation.

Return type:

tuple[ndarray[Any, dtype[float64]], dict[str, Any]]

Parameters:

kwargs (Any)

step(action)[source]

Step base env and append constraint costs to info dict.

Return type:

tuple[ndarray[Any, dtype[float64]], float, bool, bool, dict[str, Any]]

Parameters:

action (ndarray[Any, dtype[float64]])

class scpn_fusion.control.safe_rl_controller.LagrangianPPO(env, lambda_lr=0.01, gamma=0.99)[source]

Bases: object

PPO augmented with Lagrangian multipliers for constrained RL.

Parameters:
__init__(env, lambda_lr=0.01, gamma=0.99)[source]

Initialize constrained-policy state and dual update hyperparameters.

Parameters:
update_lambdas(episode_costs)[source]

Dual gradient ascent: lambda_i <- max(0, lambda_i + lr*(C_i - d_i)).

Return type:

None

Parameters:

episode_costs (list[float])

train(total_timesteps)[source]

Mock training loop — collects rollouts and updates lambdas.

Return type:

None

Parameters:

total_timesteps (int)

predict(obs)[source]

Return action for given observation. Currently samples randomly.

Return type:

ndarray[Any, dtype[float64]]

Parameters:

obs (ndarray[Any, dtype[float64]])

scpn_fusion.control.safe_rl_controller.q95_cost_fn(obs, act, next_obs)[source]

Compute a lower-bound violation cost on edge safety factor q95.

Parameters:
  • obs (ndarray[Any, dtype[float64]]) – Previous observation; unused in this contract.

  • act (ndarray[Any, dtype[float64]]) – Action taken; unused in this contract.

  • next_obs (ndarray[Any, dtype[float64]]) – Post-step observation, where index 2 is assumed to hold q95.

Return type:

float

Returns:

Positive violation amount in the same unit as q95 delta.

scpn_fusion.control.safe_rl_controller.beta_n_cost_fn(obs, act, next_obs)[source]

Compute an upper-bound violation cost on normalized beta beta_N.

Parameters:
  • obs (ndarray[Any, dtype[float64]]) – Previous observation; unused in this contract.

  • act (ndarray[Any, dtype[float64]]) – Action taken; unused in this contract.

  • next_obs (ndarray[Any, dtype[float64]]) – Post-step observation, where index 1 is assumed to hold beta_N.

Return type:

float

Returns:

Positive cost when beta_N exceeds 3.5.

scpn_fusion.control.safe_rl_controller.ip_cost_fn(obs, act, next_obs)[source]

Compute a lower-bound violation cost on plasma current.

Parameters:
  • obs (ndarray[Any, dtype[float64]]) – Previous observation; unused in this contract.

  • act (ndarray[Any, dtype[float64]]) – Action taken; unused in this contract.

  • next_obs (ndarray[Any, dtype[float64]]) – Post-step observation, where index 0 is assumed to hold Ip.

Return type:

float

Returns:

Positive violation value when Ip is non-positive.

scpn_fusion.control.safe_rl_controller.default_safety_constraints()[source]

Return the default q95, beta_N, and plasma-current constraints.

Return type:

list[SafetyConstraint]

Returns:

A list of default SafetyConstraint instances with zero limits.

Scenario Scheduler

Feedforward scenario schedules and waveform factories for control studies.

The routines in this module produce deterministic reference waveforms for feedforward controllers and lightweight offline optimisation sweeps.

class scpn_fusion.control.scenario_scheduler.ScenarioWaveform(name, times, values, interp_kind='linear')[source]

Bases: object

Piecewise-linear time-series for a single actuator channel.

Parameters:
name: str
times: ndarray[Any, dtype[float64]]
values: ndarray[Any, dtype[float64]]
interp_kind: str = 'linear'
class scpn_fusion.control.scenario_scheduler.ScenarioSchedule(waveforms)[source]

Bases: object

Collection of named waveforms defining a tokamak discharge scenario.

Parameters:

waveforms (dict[str, ScenarioWaveform])

__init__(waveforms)[source]

Initialize the schedule from actuator or reference-state waveforms.

Parameters:

waveforms (dict[str, ScenarioWaveform])

evaluate(t)[source]

Interpolate all waveforms at time t.

Return type:

dict[str, float]

Parameters:

t (float)

duration()[source]

Maximum endpoint across all waveforms [s].

Return type:

float

validate()[source]

Check monotonic time vectors and physical sign constraints.

Return type:

list[str]

class scpn_fusion.control.scenario_scheduler.FeedforwardController(schedule, feedback)[source]

Bases: object

Combines pre-computed feedforward trajectories with a feedback trim.

Parameters:
__init__(schedule, feedback)[source]

Bind a scenario schedule to a feedback correction callback.

Parameters:
step(x, t, dt)[source]

U = u_ff(t) + u_fb(x_err).

Return type:

ndarray[Any, dtype[float64]]

Parameters:
class scpn_fusion.control.scenario_scheduler.ScenarioOptimizer(plant_model, target_state, T_total, dt=0.5)[source]

Bases: object

Offline trajectory design via Nelder-Mead.

Parameters:
  • plant_model (PlantModelFn)

  • target_state (FloatArray)

  • T_total (float)

  • dt (float)

__init__(plant_model, target_state, T_total, dt=0.5)[source]

Initialize an offline waveform optimizer for a target terminal state.

Parameters:
Return type:

None

optimize(n_iter=100)[source]

Minimize state-tracking cost over T_total via Nelder-Mead on waveform knots.

Return type:

ScenarioSchedule

Parameters:

n_iter (int)

scpn_fusion.control.scenario_scheduler.iter_15ma_baseline()[source]

Return the ITER-like 15 MA baseline feedforward schedule.

Return type:

ScenarioSchedule

Returns:

ScenarioSchedule: Time/actuator profile covering ramp-up, flat-top, and current ramp-down phases.

scpn_fusion.control.scenario_scheduler.nstx_u_1ma_standard()[source]

Return the NSTX-U-like 1 MA standard feedforward schedule.

Return type:

ScenarioSchedule

Returns:

ScenarioSchedule: Compact pulse with actuator commands matched to a short disrpution-free 1 MA operating window.

Gain-Scheduled Controller

Gain-scheduled controllers and baseline discharge schedules.

class scpn_fusion.control.gain_scheduled_controller.OperatingRegime(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Discharge phase: ramp-up, L-mode, L-H transition, H-mode, ramp-down, disruption.

RAMP_UP = 1
L_MODE_FLAT = 2
LH_TRANSITION = 3
H_MODE_FLAT = 4
RAMP_DOWN = 5
DISRUPTION_MITIGATION = 6
class scpn_fusion.control.gain_scheduled_controller.RegimeController(regime, Kp, Ki, Kd, x_ref, constraints)[source]

Bases: object

PID gains and setpoint for one operating regime.

Parameters:
regime: OperatingRegime
Kp: ndarray[Any, dtype[float64]]
Ki: ndarray[Any, dtype[float64]]
Kd: ndarray[Any, dtype[float64]]
x_ref: ndarray[Any, dtype[float64]]
constraints: dict[str, Any]
class scpn_fusion.control.gain_scheduled_controller.RegimeDetector(thresholds=None)[source]

Bases: object

Hysteretic detector for tokamak operating regimes.

Parameters:

thresholds (dict[str, float] | None)

detect(state, dstate_dt, tau_E, p_disrupt)[source]

Classify regime from dIp/dt, confinement time, and disruption probability. Uses hysteresis buffer.

Return type:

OperatingRegime

Parameters:
class scpn_fusion.control.gain_scheduled_controller.GainScheduledController(controllers)[source]

Bases: object

PID controller bank with bumpless interpolation across regimes.

Parameters:

controllers (dict[OperatingRegime, RegimeController])

step(x, t, dt, detected_regime)[source]

PID step with bumpless gain interpolation during regime transitions.

Return type:

ndarray[Any, dtype[float64]]

Parameters:
class scpn_fusion.control.gain_scheduled_controller.ScenarioWaveform(name, times, values, interp_kind='linear')[source]

Bases: object

Piecewise-linear time-series for a single actuator channel.

Parameters:
  • name (str)

  • times (FloatArray)

  • values (FloatArray)

  • interp_kind (str)

class scpn_fusion.control.gain_scheduled_controller.ScenarioSchedule(waveforms)[source]

Bases: object

Collection of named waveforms defining a tokamak discharge scenario.

Parameters:

waveforms (dict[str, ScenarioWaveform])

evaluate(t)[source]

Interpolate all waveforms at time t.

Return type:

dict[str, float]

Parameters:

t (float)

duration()[source]

Maximum endpoint across all waveforms [s].

Return type:

float

validate()[source]

Check monotonic time vectors.

Return type:

list[str]

scpn_fusion.control.gain_scheduled_controller.iter_baseline_schedule()[source]

Return a reduced ITER-like baseline discharge schedule.

Return type:

ScenarioSchedule