scpn_fusion.control – Control

The control subpackage contains reactor control algorithms, digital twin infrastructure, disruption prediction, and mitigation systems.

Tokamak Flight Simulator

class scpn_fusion.control.tokamak_flight_sim.FirstOrderActuator(*, tau_s, dt_s, u_min=-1000000000.0, u_max=1000000000.0, rate_limit=1000000.0, sensor_noise_std=0.0, delay_steps=0, rng_seed=None)[source]

Bases: object

Discrete first-order actuator with rate limits, noise, and delay.

Models a realistic coil power supply for tokamak control:

  • First-order lag: u_applied(s) = 1/(tau*s+1) * u_cmd

  • Coil current rate limit: abs(du/dt) <= rate_limit [A/s]

  • Sensor noise: additive Gaussian on measurement

  • Measurement delay: pure transport delay on feedback signal

Parameters:
  • tau_s (float) – Actuator time constant [s].

  • dt_s (float) – Simulation timestep [s].

  • u_min (float) – Lower saturation limit.

  • u_max (float) – Upper saturation limit.

  • rate_limit (float) – Maximum current rate of change [A/s]. Default 1e6 (1 MA/s, ITER PF spec).

  • sensor_noise_std (float) – Standard deviation of additive sensor noise. Default 0.0 (disabled).

  • delay_steps (int) – Number of timesteps of measurement delay. Default 0.

  • rng_seed (int or None) – Random seed for reproducible noise (None = random).

step(command)[source]

Apply command through actuator dynamics with rate limiting.

Return type:

float

Parameters:

command (float)

get_measurement()[source]

Return delayed, noisy measurement of actuator output.

Return type:

float
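The actuator behaviour described above (first-order lag, rate limit, delayed and noisy measurement) can be sketched as follows. This is a minimal illustration, not the library implementation: the class name ActuatorSketch, the backward-Euler discretization of the lag, and the order in which saturation and rate limiting are applied are all assumptions.

```python
from collections import deque

import numpy as np


class ActuatorSketch:
    """Illustrative first-order actuator (hypothetical, not FirstOrderActuator itself)."""

    def __init__(self, tau_s, dt_s, u_min=-1e9, u_max=1e9, rate_limit=1e6,
                 sensor_noise_std=0.0, delay_steps=0, rng_seed=None):
        # Backward-Euler discretization of tau * du/dt = u_cmd - u (assumed).
        self.alpha = dt_s / (tau_s + dt_s)
        self.u_min, self.u_max = u_min, u_max
        self.max_step = rate_limit * dt_s  # largest allowed change per timestep
        self.noise_std = sensor_noise_std
        self.rng = np.random.default_rng(rng_seed)
        self.u = 0.0
        # Ring buffer: index 0 holds the output from delay_steps steps ago.
        self.buffer = deque([0.0] * (delay_steps + 1), maxlen=delay_steps + 1)

    def step(self, command):
        cmd = float(np.clip(command, self.u_min, self.u_max))
        du = self.alpha * (cmd - self.u)
        du = float(np.clip(du, -self.max_step, self.max_step))  # rate limit
        self.u = float(np.clip(self.u + du, self.u_min, self.u_max))
        self.buffer.append(self.u)
        return self.u

    def get_measurement(self):
        delayed = self.buffer[0]
        noise = self.rng.normal(0.0, self.noise_std) if self.noise_std > 0.0 else 0.0
        return delayed + noise
```

With delay_steps=2 in this sketch, get_measurement() returns the output produced two calls to step() earlier, and zero until the buffer has filled.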

class scpn_fusion.control.tokamak_flight_sim.IsoFluxController(config_file, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, verbose=True, actuator_tau_s=0.06, heating_actuator_tau_s=None, actuator_current_delta_limit=1000000000.0, heating_beta_max=5.0, control_dt_s=0.05)[source]

Bases: object

Simulates the Plasma Control System (PCS). Uses PID loops to adjust the coil currents so that the plasma shape is maintained.

Parameters:
  • config_file (str)

  • kernel_factory (Callable[[str], Any])

  • verbose (bool)

  • actuator_tau_s (float)

  • heating_actuator_tau_s (Optional[float])

  • actuator_current_delta_limit (float)

  • heating_beta_max (float)

  • control_dt_s (float)

pid_step(pid, error)[source]
Return type:

float

Parameters:
run_shot(shot_duration=30, save_plot=True, output_path='Tokamak_Flight_Report.png')[source]

Run a simulated tokamak shot.

Parameters:
  • shot_duration (int) – Number of simulation steps. Default 30.

  • save_plot (bool) – Whether to generate a summary plot.

  • output_path (str) – Filename for the plot.

Return type:

Dict[str, Any]

visualize_flight(output_path='Tokamak_Flight_Report.png')[source]
Return type:

Tuple[bool, Optional[str]]

Parameters:

output_path (str)

scpn_fusion.control.tokamak_flight_sim.run_flight_sim(config_file=None, shot_duration=50, seed=42, save_plot=True, output_path='Tokamak_Flight_Report.png', verbose=True, actuator_tau_s=0.06, heating_actuator_tau_s=None, actuator_current_delta_limit=1000000000.0, heating_beta_max=5.0, control_dt_s=0.05, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>)[source]

Run deterministic tokamak flight-sim control loop and return summary.

Return type:

Dict[str, Any]

Parameters:

Tokamak Digital Twin

class scpn_fusion.control.tokamak_digital_twin.TokamakTopoloy(size=40)[source]

Bases: object

Magnetic geometry: q-profile and island evolution via Modified Rutherford Equation.

step_island_evolution(dt=0.1)[source]

Evolve island widths via MRE with neoclassical bootstrap drive.

update_q_profile(current_drive_action)[source]

Update parabolic q(r) = q0 + (qa-q0)*r^2 with current drive modulation.
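As a worked illustration of the parabolic profile, the sketch below evaluates q(r) on normalized radius and inverts it to locate a rational surface. The function names and the values q0=1.0 and qa=3.0 are hypothetical, and the current-drive modulation is left out because its form is not documented here.

```python
import numpy as np


def parabolic_q(r, q0=1.0, qa=3.0):
    """q(r) = q0 + (qa - q0) * r^2 on normalized radius r in [0, 1]."""
    r = np.asarray(r, dtype=float)
    return q0 + (qa - q0) * r**2


def rational_surface_radius(q_target, q0=1.0, qa=3.0):
    """Radius where q(r) = q_target, or None if it lies outside the plasma."""
    x = (q_target - q0) / (qa - q0)
    return float(np.sqrt(x)) if 0.0 <= x <= 1.0 else None
```

For these illustrative values the q = 2 surface sits at r = sqrt(0.5), roughly 0.71 of the minor radius.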

get_rational_surfaces()[source]

Boolean map of rational-surface islands from current MRE widths.

class scpn_fusion.control.tokamak_digital_twin.Plasma2D(topology, gyro_surrogate=None)[source]

Bases: object

2D diffusion-reaction model on a poloidal cross-section.

step(action)[source]

Evolve plasma one timestep with current drive action in [-1, 1].

class scpn_fusion.control.tokamak_digital_twin.SimpleNeuralNet(input_size, hidden_size, output_size, *, rng)[source]

Bases: object

NumPy MLP policy network for continuous control.

Parameters:
forward(x)[source]
train_step(x, target_action, advantage)[source]

REINFORCE-like policy gradient step.

scpn_fusion.control.tokamak_digital_twin.run_digital_twin(time_steps=10000, seed=42, save_plot=True, output_path='Tokamak_Digital_Twin.png', verbose=True, gyro_surrogate=None, chaos_monkey=False, sensor_dropout_prob=0.0, sensor_noise_std=0.0, rng=None)[source]

Run digital-twin control simulation, return summary dict.

Parameters:
scpn_fusion.control.tokamak_digital_twin.run_digital_twin_ids(*, machine='ITER', shot=0, run=0, **kwargs)[source]

Run digital twin and return IDS-like equilibrium payload.

Parameters:
scpn_fusion.control.tokamak_digital_twin.run_digital_twin_ids_history(history_steps, *, machine='ITER', shot=0, run=0, seed=42, **kwargs)[source]

Run digital twin at multiple horizons and return IDS-like payload sequence.

Parameters:
scpn_fusion.control.tokamak_digital_twin.run_digital_twin_ids_pulse(history_steps, *, machine='ITER', shot=0, run=0, seed=42, **kwargs)[source]

Run digital twin at multiple horizons and return pulse-style IDS container.

Parameters:

Digital Twin Ingest

Realtime digital-twin ingestion hook with SNN scenario planning.

class scpn_fusion.control.digital_twin_ingest.TelemetryPacket(t_ms, machine, ip_ma, beta_n, q95, density_1e19)[source]

Bases: object

Parameters:
t_ms: int
machine: str
ip_ma: float
beta_n: float
q95: float
density_1e19: float
scpn_fusion.control.digital_twin_ingest.generate_emulated_stream(machine, *, samples=320, dt_ms=5, seed=42)[source]
Return type:

list[TelemetryPacket]

Parameters:
class scpn_fusion.control.digital_twin_ingest.RealtimeTwinHook(machine, *, max_buffer=512, seed=42)[source]

Bases: object

In-memory realtime ingest + SNN planning hook.

Parameters:
ingest(packet)[source]
Return type:

None

Parameters:

packet (TelemetryPacket)

scenario_plan(*, horizon=24)[source]
Return type:

dict[str, float | bool]

Parameters:

horizon (int)

scpn_fusion.control.digital_twin_ingest.run_realtime_twin_session(machine, *, seed=42, samples=320, dt_ms=5, horizon=24, plan_every=8, max_buffer=512, chaos_dropout_prob=0.0, chaos_noise_std=0.0)[source]

Run deterministic digital-twin ingest+planning session and return summary.

Return type:

dict[str, Any]

Parameters:

Traceable Runtime (JAX/TorchScript)

Optional JAX-traceable control-loop utilities with NumPy fallback.

class scpn_fusion.control.jax_traceable_runtime.TraceableRuntimeSpec(dt_s=0.001, tau_s=0.005, gain=1.0, command_limit=1.0)[source]

Bases: object

Configuration for reduced traceable first-order actuator dynamics.

Parameters:
dt_s: float = 0.001
tau_s: float = 0.005
gain: float = 1.0
command_limit: float = 1.0
class scpn_fusion.control.jax_traceable_runtime.TraceableRuntimeResult(state_history, backend_used, compiled)[source]

Bases: object

Result of a traceable control-loop rollout.

Parameters:
state_history: ndarray[Any, dtype[float64]]
backend_used: str
compiled: bool
class scpn_fusion.control.jax_traceable_runtime.TraceableRuntimeBatchResult(state_history, backend_used, compiled)[source]

Bases: object

Result of batched traceable control-loop rollout.

Parameters:
state_history: ndarray[Any, dtype[float64]]
backend_used: str
compiled: bool
class scpn_fusion.control.jax_traceable_runtime.TraceableBackendParityReport(backend, single_max_abs_err, batch_max_abs_err, single_within_tol, batch_within_tol)[source]

Bases: object

Parity metrics against NumPy reference backend.

Parameters:
  • backend (str)

  • single_max_abs_err (float)

  • batch_max_abs_err (float)

  • single_within_tol (bool)

  • batch_within_tol (bool)

backend: str
single_max_abs_err: float
batch_max_abs_err: float
single_within_tol: bool
batch_within_tol: bool
scpn_fusion.control.jax_traceable_runtime.available_traceable_backends()[source]

Return available runtime backends on this machine.

Return type:

list[str]

scpn_fusion.control.jax_traceable_runtime.run_traceable_control_loop(commands, *, initial_state=0.0, spec=None, backend='auto')[source]

Run a reduced control loop suitable for optional JAX tracing/JIT.

backend can be auto, numpy, jax, or torchscript.

Return type:

TraceableRuntimeResult

Parameters:
scpn_fusion.control.jax_traceable_runtime.run_traceable_control_batch(commands, *, initial_state=None, spec=None, backend='auto')[source]

Run batched reduced control loops with optional JAX/TorchScript backends.

commands shape: (batch, steps)

Return type:

TraceableRuntimeBatchResult

Parameters:
scpn_fusion.control.jax_traceable_runtime.validate_traceable_backend_parity(*, steps=64, batch=8, seed=42, spec=None, atol=1e-08, backends=None)[source]

Compare available compiled backends to NumPy for single and batch rollouts.

Return type:

dict[str, TraceableBackendParityReport]

Parameters:

Model-Predictive Control (Optimal)

class scpn_fusion.control.fusion_optimal_control.OptimalController(config_file, *, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, verbose=True, correction_limit=5.0, coil_current_limits=(-40.0, 40.0), current_target_limits=(5.0, 16.0))[source]

Bases: object

MIMO controller using response-matrix inversion with bounded actuation.

Parameters:
  • config_file (str)

  • kernel_factory (Callable[[str], Any])

  • verbose (bool)

  • correction_limit (float)

  • coil_current_limits (Tuple[float, float])

  • current_target_limits (Tuple[float, float])

identify_system(perturbation=0.5)[source]

Perturb each coil and measure plasma-axis response to build Jacobian.

Return type:

None

Parameters:

perturbation (float)

get_shafranov_shift()[source]

Calculate the heuristic Shafranov shift: Delta R ~ (a^2 / (2*R)) * (beta_p + li/2).

Return type:

float
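The heuristic can be checked by hand with a few lines of arithmetic. The function name and the numerical inputs below are illustrative assumptions; only the formula comes from the description above.

```python
def shafranov_shift(a_minor, r_major, beta_p, li):
    """Heuristic Delta R ~ (a^2 / (2 R)) * (beta_p + li / 2), in the units of a and R."""
    return (a_minor**2 / (2.0 * r_major)) * (beta_p + li / 2.0)


# Illustrative inputs: a = 2.0 m, R = 6.2 m, beta_p = 0.5, li = 0.8.
delta_r = shafranov_shift(2.0, 6.2, 0.5, 0.8)
```

With these inputs the shift is (4 / 12.4) * 0.9, about 0.29 m.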

get_plasma_pos()[source]

Return current magnetic-axis position [R, Z], hardened with a Shafranov-shift correction for high-beta states.

Return type:

ndarray

compute_optimal_correction(current_pos, target_pos, regularization_lambda=0.05, *, regularization_limit=None)[source]

Solve Error = J * Delta_I using Tikhonov-regularized (damped) SVD. Provides smoother control than hard-cutoff SVD near singularities.

Return type:

ndarray
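A damped SVD solve of this kind can be sketched as below. The helper name damped_svd_solve is hypothetical, and the filter s / (s^2 + lambda^2) is the textbook Tikhonov form; whether the library squares the regularization parameter, or how it applies regularization_limit, is not confirmed here.

```python
import numpy as np


def damped_svd_solve(jacobian, error, lam=0.05):
    """Solve error = J @ delta_i with Tikhonov-filtered singular values."""
    u, s, vt = np.linalg.svd(jacobian, full_matrices=False)
    # Small singular values are damped smoothly instead of being cut off.
    filt = s / (s**2 + lam**2)
    return vt.T @ (filt * (u.T @ error))


# Nearly singular illustrative Jacobian: a plain inverse would request 1e6 on coil 2.
J = np.diag([1.0, 1e-6])
delta_i = damped_svd_solve(J, np.array([1.0, 1.0]), lam=0.05)
```

The well-conditioned direction is almost unaffected, while the correction along the near-singular direction stays bounded, which is the smoother behaviour the description refers to.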

Parameters:
run_optimal_shot(shot_steps=50, target_r=6.0, target_z=0.0, gain=0.8, ip_start_ma=10.0, ip_span_ma=5.0, identify_first=False, save_plot=True, output_path='Optimal_Control_Result.png')[source]
Return type:

Dict[str, Any]

Parameters:
plot_telemetry(output_path='Optimal_Control_Result.png')[source]
Return type:

Tuple[bool, Optional[str]]

Parameters:

output_path (str)

scpn_fusion.control.fusion_optimal_control.run_optimal_control(config_file=None, shot_steps=50, target_r=6.0, target_z=0.0, seed=42, save_plot=True, output_path='Optimal_Control_Result.png', verbose=True, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, coil_current_limits=(-40.0, 40.0), current_target_limits=(5.0, 16.0))[source]

Run bounded optimal-control shot and return deterministic summary.

Return type:

Dict[str, Any]

Parameters:

State-of-the-Art MPC

class scpn_fusion.control.fusion_sota_mpc.NeuralSurrogate(n_coils, n_state, verbose=True)[source]

Bases: object

Linearized surrogate model around current operating point.

Parameters:
train_on_kernel(kernel, perturbation=1.0)[source]
Return type:

None

Parameters:
get_state(kernel)[source]
Return type:

ndarray

Parameters:

kernel (Any)

predict(current_state, action_delta)[source]
Return type:

ndarray

Parameters:
class scpn_fusion.control.fusion_sota_mpc.ModelPredictiveController(surrogate, target_state, *, prediction_horizon=10, learning_rate=0.5, iterations=20, action_limit=2.0, action_regularization=0.1)[source]

Bases: object

Gradient-based MPC planner over surrogate dynamics.

Parameters:
plan_trajectory(current_state)[source]
Return type:

ndarray

Parameters:

current_state (ndarray)

scpn_fusion.control.fusion_sota_mpc.run_sota_simulation(config_file=None, shot_length=100, prediction_horizon=10, target_vector=None, disturbance_start_step=20, disturbance_per_step_ma=0.1, current_target_bounds=(5.0, 16.0), action_limit=2.0, coil_current_limits=(-40.0, 40.0), save_plot=True, output_path='SOTA_MPC_Results.png', verbose=True, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>)[source]
Return type:

Dict[str, Any]

Parameters:

Disruption Predictor

class scpn_fusion.control.disruption_predictor.HybridAnomalyDetector(threshold=0.5, ema=0.05)[source]

Bases: object

Hybrid detector combining learned risk and deterministic smoothing.

score(signal, toroidal_observables=None)[source]
Return type:

dict[str, float | bool]

scpn_fusion.control.disruption_predictor.apply_bit_flip_fault(value, bit_index)[source]

Inject a deterministic single-bit fault into a float.
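One way to inject such a fault, assuming IEEE 754 double precision and bit 0 as the least significant mantissa bit, is to reinterpret the float as a 64-bit integer. The helper name flip_bit and the bit-numbering convention are assumptions, not the library's documented behaviour.

```python
import struct


def flip_bit(value, bit_index):
    """Flip one bit of a float64 (bit 0 = least significant mantissa bit, 63 = sign)."""
    if not 0 <= bit_index < 64:
        raise ValueError("bit_index must be in [0, 63]")
    (bits,) = struct.unpack("<Q", struct.pack("<d", float(value)))
    bits ^= 1 << bit_index
    (flipped,) = struct.unpack("<d", struct.pack("<Q", bits))
    return flipped
```

Flipping the same bit twice restores the original value, which makes this kind of fault convenient for deterministic recovery tests.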

scpn_fusion.control.disruption_predictor.apply_disruption_logit_bias(risk, bias_delta)[source]

Apply additive logit-space calibration bias to a bounded risk score.

Return type:

float
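An additive bias in logit space can be sketched as follows. The function name and the clamping constant eps are illustrative assumptions; the source only states that the bias is additive in logit space and that the risk is bounded.

```python
import math


def logit_bias_sketch(risk, bias_delta, eps=1e-9):
    """Shift a probability-like score by bias_delta in logit space."""
    p = min(max(float(risk), eps), 1.0 - eps)  # keep the logit finite
    z = math.log(p / (1.0 - p)) + bias_delta
    # Numerically stable sigmoid for either sign of z.
    if z >= 0.0:
        return 1.0 / (1.0 + math.exp(-z))
    ez = math.exp(z)
    return ez / (1.0 + ez)
```

Because the shift happens before the sigmoid, the result stays inside [0, 1] for any finite bias, unlike adding the bias to the risk directly.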

Parameters:
scpn_fusion.control.disruption_predictor.build_disruption_feature_vector(signal, toroidal_observables=None)[source]

Build a compact feature vector for control-oriented disruption scoring.

Feature layout:

[mean, std, max, slope, energy, last, toroidal_n1_amp, toroidal_n2_amp, toroidal_n3_amp, toroidal_asymmetry_index, toroidal_radial_spread]
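The 11-element layout can be illustrated with the sketch below. The definitions of slope (least-squares line fit) and energy (mean square), the dictionary keys, and the zero default for missing toroidal observables are assumptions made for illustration; the library may define them differently.

```python
import numpy as np

# Hypothetical dictionary keys, named after the feature layout.
TOROIDAL_KEYS = (
    "toroidal_n1_amp",
    "toroidal_n2_amp",
    "toroidal_n3_amp",
    "toroidal_asymmetry_index",
    "toroidal_radial_spread",
)


def feature_vector_sketch(signal, toroidal_observables=None):
    x = np.asarray(signal, dtype=float).ravel()
    if x.size == 0:
        raise ValueError("signal must not be empty")
    # Slope of a least-squares line through the samples (assumed definition).
    slope = float(np.polyfit(np.arange(x.size), x, 1)[0]) if x.size > 1 else 0.0
    base = [x.mean(), x.std(), x.max(), slope, float(np.mean(x**2)), x[-1]]
    obs = toroidal_observables or {}
    toroidal = [float(obs.get(key, 0.0)) for key in TOROIDAL_KEYS]
    return np.array(base + toroidal, dtype=float)
```

Keeping the toroidal entries at fixed positions means a downstream scorer sees the same vector length whether or not 3D diagnostics are available.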

scpn_fusion.control.disruption_predictor.predict_disruption_risk(signal, toroidal_observables=None, bias_delta=0.0)[source]

Lightweight deterministic disruption risk estimator (0..1) for control loops.

This supplements the Transformer pathway by explicitly consuming toroidal asymmetry observables from 3D diagnostics.

Parameters:

bias_delta (float)

scpn_fusion.control.disruption_predictor.run_anomaly_alarm_campaign(*, seed=0, episodes=128, window=64, threshold=0.5)[source]

Evaluate alarm true/false-positive behavior on deterministic synthetic episodes.

Return type:

dict[str, float | int | bool]

Parameters:
scpn_fusion.control.disruption_predictor.run_fault_noise_campaign(*, seed=0, episodes=64, window=64, noise_std=0.02, bit_flip_interval=7, recovery_window=4, recovery_epsilon=0.05)[source]

Run deterministic robustness campaign with injected noise and bit flips.

Return type:

dict[str, float | int | bool]

Parameters:
  • seed (int)

  • episodes (int)

  • window (int)

  • noise_std (float)

  • bit_flip_interval (int)

  • recovery_window (int)

  • recovery_epsilon (float)

class scpn_fusion.control.disruption_predictor.DisruptionTransformer(*args, **kwargs)[source]

Bases: Module

forward(src)[source]
scpn_fusion.control.disruption_predictor.train_predictor(seq_len=100, n_shots=500, epochs=50, model_path=None, seed=42, save_plot=True)[source]
scpn_fusion.control.disruption_predictor.load_or_train_predictor(model_path=None, seq_len=100, force_retrain=False, train_kwargs=None, train_if_missing=True, allow_fallback=True)[source]
scpn_fusion.control.disruption_predictor.predict_disruption_risk_safe(signal, toroidal_observables=None, *, model_path=None, seq_len=100, train_if_missing=False, allow_fallback=True)[source]

Predict disruption risk from a model checkpoint if one is available; otherwise fall back to the deterministic compatibility estimator.

Return type:

tuple[float, dict[str, Any]]

Returns:

risk, metadata – risk is always a bounded float in [0, 1]. metadata includes whether compatibility mode was used.

Parameters:

allow_fallback (bool) – If False, this API raises on missing/broken checkpoints or inference failures instead of returning compatibility risk from predict_disruption_risk.

scpn_fusion.control.disruption_predictor.evaluate_predictor(model, X_test, y_test, times_test=None, threshold=0.5)[source]

Evaluate disruption predictor on test set.

Returns dict with accuracy, precision, recall, F1, confusion matrix, and recall@T for T in [10, 20, 30, 50, 100] ms.

Return type:

dict[str, Any]
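The threshold-based part of these metrics can be sketched from a confusion matrix as below. The function name, the use of >= for the threshold comparison, and the returned key names are assumptions; the recall@T figures are omitted because they depend on how times_test is interpreted.

```python
import numpy as np


def binary_metrics_sketch(y_true, risk, threshold=0.5):
    y_true = np.asarray(y_true).astype(bool)
    y_pred = np.asarray(risk, dtype=float) >= threshold
    tp = int(np.sum(y_pred & y_true))
    fp = int(np.sum(y_pred & ~y_true))
    fn = int(np.sum(~y_pred & y_true))
    tn = int(np.sum(~y_pred & ~y_true))
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "accuracy": (tp + tn) / y_true.size,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "confusion_matrix": [[tn, fp], [fn, tp]],
    }
```

For disruption alarms, recall is usually the figure to watch first, since a missed disruption costs far more than a false alarm.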

Parameters:

Shattered Pellet Injection

class scpn_fusion.control.spi_mitigation.ShatteredPelletInjection(Plasma_Energy_MJ=300.0, Plasma_Current_MA=15.0)[source]

Bases: object

Reduced SPI mitigation model for thermal/current quench campaigns.

Parameters:
  • Plasma_Energy_MJ (float)

  • Plasma_Current_MA (float)

static estimate_z_eff(neon_quantity_mol)[source]
Return type:

float

Parameters:

neon_quantity_mol (float)

static estimate_z_eff_cocktail(*, neon_quantity_mol=0.0, argon_quantity_mol=0.0, xenon_quantity_mol=0.0)[source]
Return type:

float

Parameters:
  • neon_quantity_mol (float)

  • argon_quantity_mol (float)

  • xenon_quantity_mol (float)

static estimate_mitigation_cocktail(*, risk_score, disturbance, action_bias=0.0)[source]
Return type:

dict[str, float]

Parameters:
static estimate_tau_cq(te_keV, z_eff)[source]
Return type:

float

Parameters:
trigger_mitigation(neon_quantity_mol=0.1, argon_quantity_mol=0.0, xenon_quantity_mol=0.0, return_diagnostics=False, *, duration_s=0.05, dt_s=1e-05, verbose=True)[source]
Parameters:
scpn_fusion.control.spi_mitigation.run_spi_mitigation(*, plasma_energy_mj=300.0, plasma_current_ma=15.0, neon_quantity_mol=0.1, argon_quantity_mol=0.0, xenon_quantity_mol=0.0, duration_s=0.05, dt_s=1e-05, save_plot=True, output_path='SPI_Mitigation_Result.png', verbose=True)[source]

Run SPI mitigation simulation and return deterministic summary metrics.

Return type:

dict[str, Any]

Parameters:
scpn_fusion.control.spi_mitigation.run_spi_test()[source]
Return type:

dict[str, Any]

Integrated Control Room

class scpn_fusion.control.fusion_control_room.TokamakPhysicsEngine(size=60, *, seed=42, kernel=None)[source]

Bases: object

Reduced Grad-Shafranov geometry model with optional kernel-Psi ingestion.

Parameters:
  • size (int)

  • seed (int)

  • kernel (Optional[Any])

solve_flux_surfaces()[source]

Return (density, psi) from kernel state when available, otherwise from analytic Miller-parameterized geometry.

Return type:

tuple[ndarray, ndarray]

step_dynamics(coil_action_top, coil_action_bottom)[source]

Reduced vertical-displacement dynamics.

Return type:

float

Parameters:
  • coil_action_top (float)

  • coil_action_bottom (float)

class scpn_fusion.control.fusion_control_room.DiagnosticSystem(rng)[source]

Bases: object

Noisy vertical-position probe.

Parameters:

rng (np.random.Generator)

measure_position(true_z)[source]
Return type:

float

Parameters:

true_z (float)

class scpn_fusion.control.fusion_control_room.KalmanObserver(dt=0.1)[source]

Bases: object

Linear Kalman filter for robust plasma-position state estimation. Hardens state estimation against sensor noise and dropout.

Parameters:

dt (float)

update(measured_z, dropout=False)[source]

Predict-correct cycle. Returns filtered Z-position.

Return type:

float
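A predict-correct cycle with dropout handling can be sketched as follows. The two-state constant-velocity model, the noise covariances q and r, and the class name are assumptions chosen for illustration; the library's state model is not documented here.

```python
import numpy as np


class KalmanSketch:
    """Illustrative [z, dz/dt] Kalman filter with a position-only measurement."""

    def __init__(self, dt=0.1, q=1e-3, r=1e-2):
        self.F = np.array([[1.0, dt], [0.0, 1.0]])  # constant-velocity model (assumed)
        self.H = np.array([[1.0, 0.0]])
        self.Q = q * np.eye(2)
        self.R = r
        self.x = np.zeros(2)
        self.P = np.eye(2)

    def update(self, measured_z, dropout=False):
        # Predict.
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        # Correct, unless the sensor dropped out this step.
        if not dropout:
            innovation = measured_z - float(self.H @ self.x)
            s = float(self.H @ self.P @ self.H.T) + self.R
            k = (self.P @ self.H.T) / s  # Kalman gain, shape (2, 1)
            self.x = self.x + k[:, 0] * innovation
            self.P = (np.eye(2) - k @ self.H) @ self.P
        return float(self.x[0])
```

During a dropout the sketch returns the model prediction alone, so a missing or corrupt sample never reaches the controller.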

Parameters:
class scpn_fusion.control.fusion_control_room.NeuralController(dt=0.1)[source]

Bases: object

Hardened PID control policy for vertical stabilization.

Parameters:

dt (float)

compute_action(measured_z)[source]
Return type:

tuple[float, float]

Parameters:

measured_z (float)

scpn_fusion.control.fusion_control_room.run_control_room(sim_duration=200, *, seed=42, save_animation=True, save_report=True, output_gif='SCPN_Fusion_Control_Room.gif', output_report='SCPN_Fusion_Status_Report.png', verbose=True, kernel_factory=None, config_file=None, allow_kernel_fallback=True)[source]

Run the control-room loop and return deterministic summary metrics.

Return type:

dict[str, Any]

Parameters:

Neuro-Cybernetic Controller

class scpn_fusion.control.neuro_cybernetic_controller.SpikingControllerPool(n_neurons=20, gain=1.0, tau_window=10, use_quantum=False, *, seed=42, allow_numpy_fallback=True, dt_s=0.001, tau_mem_s=0.015, noise_std=0.02)[source]

Bases: object

Push-pull spiking control population with deterministic compatibility path.

Preferred backend is sc-neurocore. If unavailable, a reduced NumPy LIF population is used so controller workflows remain executable in CI.

Parameters:
step(error_signal)[source]
Return type:

float

Parameters:

error_signal (float)

class scpn_fusion.control.neuro_cybernetic_controller.NeuroCyberneticController(config_file, seed=42, *, shot_duration=100, kernel_factory=None)[source]

Bases: object

Replaces PID loops with push-pull spiking populations.

Parameters:
  • config_file (str)

  • seed (int)

  • shot_duration (int)

  • kernel_factory (Optional[Callable[[str], Any]])

initialize_brains(use_quantum=False)[source]
Return type:

None

Parameters:

use_quantum (bool)

run_shot(*, save_plot=True, verbose=True, output_path=None)[source]
Return type:

Dict[str, Any]

Parameters:
  • save_plot (bool)

  • verbose (bool)

  • output_path (str | None)

run_quantum_shot(*, save_plot=True, verbose=True, output_path=None)[source]
Return type:

Dict[str, Any]

Parameters:
  • save_plot (bool)

  • verbose (bool)

  • output_path (str | None)

visualize(title, *, output_path=None, verbose=True)[source]
Return type:

str

Parameters:
  • title (str)

  • output_path (str | None)

  • verbose (bool)

scpn_fusion.control.neuro_cybernetic_controller.run_neuro_cybernetic_control(*, config_file, shot_duration=100, seed=42, quantum=False, save_plot=False, verbose=False, output_path=None, kernel_factory=None)[source]

Run neuro-cybernetic control in deterministic non-interactive mode.

Return type:

Dict[str, Any]

Parameters:

SOC Fusion Learning

class scpn_fusion.control.advanced_soc_fusion_learning.CoupledSandpileReactor(size=60, *, z_crit_base=6.0, flow_generation=0.2, flow_damping=0.05, shear_efficiency=3.0, max_sub_steps=50, flow_bounds=(0.0, 5.0), energy_per_topple_mj=0.05)[source]

Bases: object

Predator-prey sandpile approximation for turbulence/flow coupling.

Parameters:
drive(amount=1.0)[source]
Return type:

None

Parameters:

amount (float)

step_physics(external_shear)[source]
Return type:

tuple[int, float, float]

Parameters:

external_shear (float)

get_profile_energy()[source]
Return type:

float

get_elm_energy_mj(topple_count)[source]

Convert a topple count into the released ELM energy in MJ.

Return type:

float

Parameters:

topple_count (int)

class scpn_fusion.control.advanced_soc_fusion_learning.FusionAIAgent(*, alpha=0.1, gamma=0.95, epsilon=0.1, n_states_turb=5, n_states_flow=5, n_actions=3, entropy_beta=0.05)[source]

Bases: object

Tabular Q-learning controller on discretized turbulence/flow states.

Parameters:
discretize_state(turb, flow)[source]
Return type:

tuple[int, int]

Parameters:
choose_action(state, rng)[source]
Return type:

int

Parameters:
learn(state, action, new_state, reward)[source]

Soft-Q learning update with entropy regularization. Q(s,a) = R + gamma * [ max Q(s’,a’) + beta * Entropy ]

Return type:

float
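One reading of this update is sketched below. It treats the stated expression as the learning target, blends it into the table with learning rate alpha, takes the entropy over a softmax of the next-state Q-values, and returns the TD error. All four choices, along with the function name, are assumptions rather than confirmed library behaviour.

```python
import numpy as np


def soft_q_update_sketch(q_table, state, action, new_state, reward,
                         alpha=0.1, gamma=0.95, entropy_beta=0.05):
    """q_table has shape (n_states_turb, n_states_flow, n_actions)."""
    q_next = q_table[tuple(new_state)]
    # Entropy of a softmax policy over next-state action values (assumed).
    logits = q_next - np.max(q_next)
    probs = np.exp(logits) / np.sum(np.exp(logits))
    entropy = -float(np.sum(probs * np.log(probs + 1e-12)))
    target = reward + gamma * (float(np.max(q_next)) + entropy_beta * entropy)
    idx = tuple(state) + (action,)
    td_error = target - float(q_table[idx])
    q_table[idx] += alpha * td_error
    return td_error
```

The entropy bonus is largest when the next-state values are nearly equal, which nudges the agent toward states where it has not yet committed to one action.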

Parameters:
class scpn_fusion.control.advanced_soc_fusion_learning.FusionAI_Agent(*, alpha=0.1, gamma=0.95, epsilon=0.1, n_states_turb=5, n_states_flow=5, n_actions=3, entropy_beta=0.05)[source]

Bases: FusionAIAgent

Backward-compatible alias for older scripts.

Parameters:
scpn_fusion.control.advanced_soc_fusion_learning.run_advanced_learning_sim(size=60, time_steps=10000, seed=42, *, epsilon=0.1, noise_probability=0.01, shear_step=0.05, shear_bounds=(0.0, 1.0), save_plot=True, output_path='Advanced_SOC_Learning.png', verbose=True)[source]

Run deterministic SOC+Q-learning control simulation and return summary metrics.

Return type:

Dict[str, Any]

Parameters:

Analytic Solver

class scpn_fusion.control.analytic_solver.AnalyticEquilibriumSolver(config_path, *, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>, verbose=True)[source]

Bases: object

Analytic vertical-field target and least-norm coil-current solve.

Parameters:
  • config_path (str)

  • kernel_factory (Callable[[str], Any])

  • verbose (bool)

calculate_required_Bv(R_geo, a_min, Ip_MA, *, beta_p=0.5, li=0.8)[source]

Shafranov radial-force balance vertical field estimate.

Return type:

float
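For reference, the standard large-aspect-ratio Shafranov expression is B_v = mu0*Ip/(4*pi*R) * (ln(8R/a) + beta_p + li/2 - 3/2). The sketch below evaluates it; that the library uses exactly this form and sign convention is an assumption, and the function name is hypothetical.

```python
import math

MU0 = 4e-7 * math.pi  # vacuum permeability [H/m]


def required_bv_sketch(r_geo, a_min, ip_ma, beta_p=0.5, li=0.8):
    """Vertical field magnitude [T] from the Shafranov radial-force balance."""
    ip_a = ip_ma * 1e6  # MA -> A
    shape_term = math.log(8.0 * r_geo / a_min) + beta_p + li / 2.0 - 1.5
    return MU0 * ip_a / (4.0 * math.pi * r_geo) * shape_term


# Illustrative ITER-like inputs: R = 6.2 m, a = 2.0 m, Ip = 15 MA.
bv = required_bv_sketch(6.2, 2.0, 15.0)
```

With these inputs the estimate is roughly 0.63 T.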

Parameters:
compute_coil_efficiencies(target_R, *, target_Z=0.0)[source]

Compute dBz/dI per coil at target location using kernel vacuum-field map.

Return type:

ndarray

Parameters:
solve_coil_currents(target_Bv, target_R, *, target_Z=0.0, ridge_lambda=0.0)[source]

Solve least-norm coil currents for desired vertical field target.

Return type:

ndarray

Parameters:
apply_currents(currents)[source]
Return type:

None

Parameters:

currents (ndarray)

apply_and_save(currents, output_path=None)[source]
Return type:

str

Parameters:
scpn_fusion.control.analytic_solver.run_analytic_solver(config_path=None, *, target_r=6.2, target_z=0.0, a_minor=2.0, ip_target_ma=15.0, beta_p=0.5, li=0.8, ridge_lambda=0.0, save_config=True, output_config_path=None, allow_validation_fallback=True, verbose=True, kernel_factory=<class 'scpn_fusion.core.fusion_kernel.FusionKernel'>)[source]

Run analytic equilibrium solve and return deterministic summary.

Return type:

Dict[str, Any]

Parameters:

Director Interface

class scpn_fusion.control.director_interface.DirectorInterface(config_path, *, allow_fallback=True, director=None, controller_factory=<class 'scpn_fusion.control.neuro_cybernetic_controller.NeuroCyberneticController'>, entropy_threshold=0.3, history_window=10)[source]

Bases: object

Interfaces the ‘Director’ (Layer 16: Coherence Oversight) with the Fusion Reactor.

Role: The Director does NOT control the coils (Layer 2 does that). The Director controls the Controller. It sets the strategy and monitors for “Backfire”.

Mechanism:

  1. Sample System State (Physics + Neural Activity).

  2. Format as a “Prompt” for the Director.

  3. Director calculates Entropy/Risk.

  4. If Safe: Director updates Target Parameters.

  5. If Unsafe: Director triggers corrective action.

Parameters:
  • config_path (str)

  • allow_fallback (bool)

  • director (Optional[Any])

  • controller_factory (Callable[[str], Any])

  • entropy_threshold (float)

  • history_window (int)

format_state_for_director(t, ip, err_r, err_z, brain_activity)[source]

Translate physical telemetry into a semantic prompt for the Director.

Return type:

str

Parameters:
run_directed_mission(duration=100, *, use_quantum=True, glitch_start_step=50, glitch_std=500.0, rng_seed=42, save_plot=True, output_path='Director_Interface_Result.png', verbose=True)[source]
Return type:

dict[str, Any]

Parameters:
visualize(output_path='Director_Interface_Result.png')[source]
Return type:

str

Parameters:

output_path (str)

Fueling Mode Controller

Ice-pellet fueling mode via Petri-to-SNN control path.

class scpn_fusion.control.fueling_mode.FuelingSimResult(final_density, final_abs_error, rmse, steps, dt_s, history_density, history_command)[source]

Bases: object

Parameters:
final_density: float
final_abs_error: float
rmse: float
steps: int
dt_s: float
history_density: list[float]
history_command: list[float]
class scpn_fusion.control.fueling_mode.IcePelletFuelingController(target_density=1.0)[source]

Bases: object

Hybrid Petri-to-SNN + PI fueling controller.

Parameters:

target_density (float)

step(density, k, dt_s)[source]
Return type:

tuple[float, float]

Parameters:
scpn_fusion.control.fueling_mode.simulate_iter_density_control(*, target_density=1.0, initial_density=0.82, steps=3000, dt_s=0.001)[source]
Return type:

FuelingSimResult

Parameters:
scpn_fusion.control.fueling_mode.run_fueling_mode(*, target_density=1.0, initial_density=0.82, steps=3000, dt_s=0.001)[source]

Run deterministic fueling simulation and return summary metrics.

Return type:

dict[str, Any]

Parameters:

TORAX Hybrid Loop

Synthetic TORAX-hybrid realtime control lane for NSTX-U-like scenarios.

class scpn_fusion.control.torax_hybrid_loop.ToraxPlasmaState(beta_n, q95, li3, w_thermal_mj)[source]

Bases: object

Parameters:
beta_n: float
q95: float
li3: float
w_thermal_mj: float
class scpn_fusion.control.torax_hybrid_loop.ToraxHybridCampaignResult(episodes, steps_per_episode, disruption_avoidance_rate, torax_parity_pct, p95_loop_latency_ms, mean_risk, passes_thresholds)[source]

Bases: object

Parameters:
  • episodes (int)

  • steps_per_episode (int)

  • disruption_avoidance_rate (float)

  • torax_parity_pct (float)

  • p95_loop_latency_ms (float)

  • mean_risk (float)

  • passes_thresholds (bool)

episodes: int
steps_per_episode: int
disruption_avoidance_rate: float
torax_parity_pct: float
p95_loop_latency_ms: float
mean_risk: float
passes_thresholds: bool
scpn_fusion.control.torax_hybrid_loop.run_nstxu_torax_hybrid_campaign(*, seed=42, episodes=16, steps_per_episode=220)[source]

Run deterministic NSTX-U-like realtime hybrid control campaign.

Return type:

ToraxHybridCampaignResult

Parameters:
  • seed (int)

  • episodes (int)

  • steps_per_episode (int)

Real-Time EFIT

class scpn_fusion.control.realtime_efit.MagneticDiagnostics(flux_loops, b_probes, rogowski_radius)[source]

Bases: object

Layout of magnetic sensors.

Parameters:
flux_loops: list[tuple[float, float]]
b_probes: list[tuple[float, float, str]]
rogowski_radius: float
class scpn_fusion.control.realtime_efit.ShapeParams(R0, a, kappa, delta_upper, delta_lower, q95, beta_pol, li, Ip_reconstructed)[source]

Bases: object

Reconstructed macroscopic parameters.

Parameters:
R0: float
a: float
kappa: float
delta_upper: float
delta_lower: float
q95: float
beta_pol: float
li: float
Ip_reconstructed: float
class scpn_fusion.control.realtime_efit.ReconstructionResult(psi, p_prime_coeffs, ff_prime_coeffs, shape, chi_squared, n_iterations, wall_time_ms)[source]

Bases: object

EFIT reconstruction output: psi field, source coefficients, and shape.

Parameters:
psi: ndarray
p_prime_coeffs: ndarray
ff_prime_coeffs: ndarray
shape: ShapeParams
chi_squared: float
n_iterations: int
wall_time_ms: float
class scpn_fusion.control.realtime_efit.DiagnosticResponse(diagnostics, R_grid, Z_grid)[source]

Bases: object

Forward model: psi field → synthetic flux-loop and B-probe signals.

Parameters:
simulate_measurements(psi, coil_currents)[source]

Generate synthetic measurements from a given psi field.

Return type:

dict[str, Any]

Parameters:
class scpn_fusion.control.realtime_efit.RealtimeEFIT(diagnostics, R_grid, Z_grid, n_p_modes=3, n_ff_modes=3)[source]

Bases: object

Simplified real-time equilibrium reconstruction (EFIT).

Parameters:
reconstruct(measurements)[source]

Main EFIT loop.

Return type:

ReconstructionResult

Parameters:

measurements (dict[str, Any])

find_lcfs(psi)[source]

Return (R,Z) points on the last closed flux surface. Stub: returns zeros.

Return type:

ndarray

Parameters:

psi (ndarray)

find_xpoint(psi)[source]

Locate magnetic nulls (dpsi/dR=0, dpsi/dZ=0).

Return type:

tuple[float, float] | None

Parameters:

psi (ndarray)

compute_shape_params(psi)[source]

Extract macroscopic shape descriptors (R0, a, kappa, delta, q95, li) from psi.

Return type:

ShapeParams

Parameters:

psi (ndarray)

Plasma Shape Controller

class scpn_fusion.control.shape_controller.ShapeTarget(isoflux_points, gap_points, gap_targets, xpoint_target=None, strike_point_targets=None)[source]

Bases: object

Desired plasma boundary: isoflux points, gaps, X-point, strike points.

Parameters:
isoflux_points: list[tuple[float, float]]
gap_points: list[tuple[float, float, float, float]]
gap_targets: list[float]
xpoint_target: tuple[float, float] | None = None
strike_point_targets: list[tuple[float, float]] | None = None
class scpn_fusion.control.shape_controller.ShapeControlResult(isoflux_error, gap_errors, min_gap, xpoint_error, strike_point_errors)[source]

Bases: object

Shape control performance: isoflux, gap, X-point, and strike-point errors.

Parameters:
isoflux_error: float
gap_errors: ndarray
min_gap: float
xpoint_error: float
strike_point_errors: ndarray
class scpn_fusion.control.shape_controller.CoilSet(n_coils=10)[source]

Bases: object

PF coil geometry and current limits.

Parameters:

n_coils (int)

class scpn_fusion.control.shape_controller.ShapeJacobian(kernel, coil_set, target)[source]

Bases: object

Shape-error Jacobian d(e_shape)/dI_coils obtained by Grad-Shafranov (GS) perturbation (mock for testing).

Parameters:
  • kernel

  • coil_set

  • target

compute()[source]

Return the shape Jacobian matrix d(e_shape)/dI_coils.

Return type:

ndarray

update(state)[source]

Re-linearize the Jacobian around a new operating point (stub).

Return type:

None

Parameters:

state (dict[str, Any])

class scpn_fusion.control.shape_controller.PlasmaShapeController(target, coil_set, kernel)[source]

Bases: object

Real-time shape controller using Tikhonov-regularized pseudoinverse.

Parameters:
  • target

  • coil_set

  • kernel

step(psi, coil_currents)[source]

Compute coil current changes to correct shape errors.

Return type:

ndarray

Parameters:
  • psi

  • coil_currents

evaluate_performance(psi)[source]

Decompose the shape error vector into per-category metrics.

Return type:

ShapeControlResult

Parameters:

psi (ndarray)
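
A Tikhonov-regularized pseudoinverse is commonly written as K = (J^T J + lambda I)^-1 J^T. The sketch below shows how such a gain could turn a shape-error vector into coil current changes. The function names, the regularization value, and the sign convention (error = measured minus target, so the correction is -K e) are assumptions for illustration, not the controller's confirmed internals.

```python
import numpy as np


def tikhonov_gain(J, lam):
    """Regularized pseudoinverse (J^T J + lam * I)^-1 J^T."""
    n_coils = J.shape[1]
    # Solving the normal equations avoids forming an explicit inverse.
    return np.linalg.solve(J.T @ J + lam * np.eye(n_coils), J.T)


def shape_step_sketch(J, error, lam=1e-3):
    """Coil current change that reduces `error`, assuming d(error) = J @ dI."""
    return -tikhonov_gain(J, lam) @ error
```

A larger lam damps the current request along poorly observed directions of J, trading tracking accuracy for smaller coil demands.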

scpn_fusion.control.shape_controller.iter_lower_single_null_target()[source]
Return type:

ShapeTarget

Vertical Stabiliser (Sliding Mode)

class scpn_fusion.control.sliding_mode_vertical.SuperTwistingSMC(alpha, beta, c, u_max)[source]

Bases: object

Second-order sliding mode controller (super-twisting algorithm).

Parameters:
  • alpha

  • beta

  • c

  • u_max

sliding_surface(e, de_dt)[source]

s = e + c * de/dt.

Return type:

float

Parameters:
  • e

  • de_dt

step(e, de_dt, dt)[source]

u = -alpha * abs(s)^0.5 * sign(s) + v, v_dot = -beta * sign(s).

Return type:

float

Parameters:
  • e

  • de_dt

  • dt

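
The two formulas above (the sliding surface and the super-twisting law) can be combined into a short discrete-time sketch. The class name, the explicit Euler update of v, the choice to update v before computing u, and the symmetric clipping at u_max are assumptions made here; the library may discretize differently.

```python
import math


class SuperTwistingSketch:
    """Illustrative super-twisting controller; not the library class."""

    def __init__(self, alpha, beta, c, u_max):
        self.alpha, self.beta, self.c, self.u_max = alpha, beta, c, u_max
        self.v = 0.0  # integral (discontinuous) term

    def sliding_surface(self, e, de_dt):
        # s = e + c * de/dt
        return e + self.c * de_dt

    def step(self, e, de_dt, dt):
        s = self.sliding_surface(e, de_dt)
        sign_s = math.copysign(1.0, s) if s != 0.0 else 0.0
        # v_dot = -beta * sign(s), integrated with explicit Euler (assumption).
        self.v += -self.beta * sign_s * dt
        # u = -alpha * |s|^0.5 * sign(s) + v
        u = -self.alpha * math.sqrt(abs(s)) * sign_s + self.v
        # Saturate at +/- u_max (assumed meaning of u_max).
        return max(-self.u_max, min(self.u_max, u))
```

With alpha=2, beta=1, c=0.5 and a first call step(e=3.0, de_dt=2.0, dt=0.1), the surface is s = 4, v becomes -0.1, and the unsaturated command is -4.1.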
class scpn_fusion.control.sliding_mode_vertical.VerticalStabilizer(n_index, Ip_MA, R0, m_eff, tau_wall, smc)[source]

Bases: object

Vertical stability controller wrapper for a tokamak.

Parameters:
  • n_index

  • Ip_MA

  • R0

  • m_eff

  • tau_wall

  • smc

step(Z_meas, Z_ref, dZ_dt_meas, dt)[source]

Compute vertical stabilization command from position error and velocity.

Return type:

float

Parameters:
  • Z_meas

  • Z_ref

  • dZ_dt_meas

  • dt

scpn_fusion.control.sliding_mode_vertical.lyapunov_certificate(alpha, beta, L_max)[source]

Verify gain conditions: alpha > sqrt(2 L_max), beta > L_max.

Return type:

bool

Parameters:
  • alpha

  • beta

  • L_max

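
The gain conditions stated above reduce to two inequalities. A minimal sketch, with a hypothetical function name and the assumption that L_max is a non-negative bound:

```python
import math


def gains_satisfy_conditions(alpha, beta, L_max):
    """True when alpha > sqrt(2 * L_max) and beta > L_max."""
    return alpha > math.sqrt(2.0 * L_max) and beta > L_max
```

Both inequalities are strict, so gains sitting exactly on the boundary are rejected.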
scpn_fusion.control.sliding_mode_vertical.estimate_convergence_time(alpha, beta, L_max, s0)[source]

Upper bound on time to reach s=0.

Return type:

float

Parameters:
  • alpha

  • beta

  • L_max

  • s0

Fault-Tolerant Control

class scpn_fusion.control.fault_tolerant_control.FaultType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Actuator and sensor fault modes for fault detection and isolation (FDI) classification.

STUCK_ACTUATOR = 1
OPEN_CIRCUIT_ACTUATOR = 2
SENSOR_DROPOUT = 3
SENSOR_DRIFT = 4
SENSOR_NOISE_INCREASE = 5
class scpn_fusion.control.fault_tolerant_control.FaultReport(component_index, is_sensor, fault_type, confidence, time_detected)[source]

Bases: object

Detected fault: which component, fault mode, confidence, and detection time.

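Parameters:
  • component_index (int)

  • is_sensor (bool)

  • fault_type (FaultType)

  • confidence (float)

  • time_detected (float)
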
component_index: int
is_sensor: bool
fault_type: FaultType
confidence: float
time_detected: float
class scpn_fusion.control.fault_tolerant_control.FDIMonitor(n_sensors, n_actuators, threshold_sigma=3.0, n_alert=5)[source]

Bases: object

Fault Detection and Isolation based on innovation monitoring.

Parameters:
  • n_sensors (int)

  • n_actuators (int)

  • threshold_sigma (float)

  • n_alert (int)

update(y_measured, y_predicted, t)[source]

Compare measurements to predictions; flag sensors exceeding threshold_sigma for n_alert consecutive steps.

Return type:

list[FaultReport]

Parameters:
  • y_measured

  • y_predicted

  • t

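
The consecutive-exceedance rule described for update can be sketched with one counter per sensor. The class name and the fixed, user-supplied per-sensor standard deviation sigma are assumptions; the library may estimate the innovation statistics online and returns FaultReport objects rather than indices.

```python
class ConsecutiveExceedanceMonitor:
    """Illustrative innovation monitor; not the library's FDIMonitor."""

    def __init__(self, n_sensors, sigma, threshold_sigma=3.0, n_alert=5):
        self.sigma = sigma  # assumed known innovation std per sensor
        self.threshold_sigma = threshold_sigma
        self.n_alert = n_alert
        self.counts = [0] * n_sensors

    def update(self, y_measured, y_predicted):
        """Return indices of sensors whose innovation exceeded the threshold
        for at least n_alert consecutive calls."""
        flagged = []
        for i, (ym, yp) in enumerate(zip(y_measured, y_predicted)):
            innovation = ym - yp
            if abs(innovation) > self.threshold_sigma * self.sigma[i]:
                self.counts[i] += 1
            else:
                self.counts[i] = 0  # any in-bounds sample resets the streak
            if self.counts[i] >= self.n_alert:
                flagged.append(i)
        return flagged
```

Requiring a streak rather than a single exceedance keeps isolated noise spikes from raising a fault.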
class scpn_fusion.control.fault_tolerant_control.ReconfigurableController(base_controller, jacobian, n_coils, n_sensors)[source]

Bases: object

Adjusts control allocation based on detected faults.

Parameters:
  • base_controller (Any)

  • jacobian (np.ndarray)

  • n_coils (int)

  • n_sensors (int)

handle_actuator_fault(coil_index, fault_type, stuck_val=0.0)[source]

Zero out the faulted coil column in J and recompute gain.

Return type:

None

Parameters:
  • coil_index

  • fault_type

  • stuck_val

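
Zeroing a coil's column removes that coil from the control allocation. A minimal sketch, assuming J has shape (n_errors, n_coils) and that the gain is a plain Moore-Penrose pseudoinverse; the function name is hypothetical and the library may use a regularized inverse instead.

```python
import numpy as np


def reconfigure_gain_sketch(J, faulted_coils):
    """Return (J_eff, K) with the faulted coil columns zeroed and K = pinv(J_eff)."""
    J_eff = J.copy()  # leave the nominal Jacobian untouched
    J_eff[:, list(faulted_coils)] = 0.0
    # A zero column in J_eff yields a zero row in the pseudoinverse,
    # so the faulted coil receives no command.
    return J_eff, np.linalg.pinv(J_eff)
```

The remaining healthy coils absorb as much of the error as their columns of J allow.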
handle_sensor_fault(sensor_index, fault_type)[source]

Placeholder for sensor fault accommodation (e.g. observer reconfiguration).

Return type:

None

Parameters:
  • sensor_index

  • fault_type

step(error, dt)[source]

Compute coil current corrections, compensating stuck-actuator offsets.

Return type:

ndarray

Parameters:
  • error

  • dt

controllability_check()[source]

True if enough healthy coils remain for minimum-rank controllability.

Return type:

bool

graceful_shutdown()[source]

Return zero-current command vector for safe ramp-down.

Return type:

ndarray

class scpn_fusion.control.fault_tolerant_control.FaultInjector(fault_time, component_index, fault_type, severity=1.0)[source]

Bases: object

Injects sensor faults (dropout or drift) at a specified time for testing.

Parameters:
  • fault_time

  • component_index

  • fault_type

  • severity

inject(t, signals)[source]
Return type:

ndarray

Parameters:
  • t

  • signals
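
Time-triggered sensor fault injection can be sketched as follows. The fault semantics are assumptions, since the reference does not define them: dropout is modeled as the signal reading zero, and drift as a ramp of severity units per second added after fault_time. FaultKind is a hypothetical stand-in for the library's FaultType.

```python
from enum import Enum


class FaultKind(Enum):  # hypothetical stand-in for FaultType
    SENSOR_DROPOUT = 3
    SENSOR_DRIFT = 4


def inject_fault_sketch(t, signals, fault_time, component_index, kind, severity=1.0):
    """Return a copy of `signals` with the fault applied once t >= fault_time."""
    out = list(signals)
    if t < fault_time:
        return out  # fault not yet active
    if kind is FaultKind.SENSOR_DROPOUT:
        out[component_index] = 0.0  # assumed dropout model: reading goes to zero
    elif kind is FaultKind.SENSOR_DRIFT:
        # Assumed drift model: linear ramp starting at fault_time.
        out[component_index] += severity * (t - fault_time)
    return out
```

Returning a copy keeps the clean signal available for comparison against the faulted one in tests.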

Safe RL Controller

class scpn_fusion.control.safe_rl_controller.SafetyConstraint(name, cost_fn, limit)[source]

Bases: object

Named constraint: cost_fn(obs, act, next_obs) must stay below limit.

Parameters:
  • name (str)

  • cost_fn (Callable[[ndarray, ndarray, ndarray], float])

  • limit (float)

name: str
cost_fn: Callable[[ndarray, ndarray, ndarray], float]
limit: float
class scpn_fusion.control.safe_rl_controller.ConstrainedGymTokamakEnv(base_env, constraints)[source]

Bases: object

Wrapper to compute and report constraint costs.

Parameters:
  • base_env

  • constraints

reset(**kwargs)[source]

Reset base environment and cache initial observation.

Return type:

tuple[ndarray, dict[str, Any]]

Parameters:

kwargs (Any)

step(action)[source]

Step base env and append constraint costs to info dict.

Return type:

tuple[ndarray, float, bool, bool, dict[str, Any]]

Parameters:

action (ndarray)
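
The wrapper pattern described by reset and step can be sketched with a toy environment. Everything named here is illustrative: Constraint stands in for SafetyConstraint, ToyEnv for the base environment, and the info key "constraint_costs" is an assumption, since the reference does not state which key the library uses.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Constraint:  # stand-in for SafetyConstraint
    name: str
    cost_fn: Callable[[Any, Any, Any], float]
    limit: float


class ToyEnv:
    """One-dimensional integrator with a Gymnasium-style 5-tuple step."""

    def reset(self, **kwargs):
        self.x = 0.0
        return [self.x], {}

    def step(self, action):
        self.x += action[0]
        return [self.x], -abs(self.x), False, False, {}


class CostReportingEnv:
    def __init__(self, base_env, constraints):
        self.base_env = base_env
        self.constraints = constraints
        self._last_obs = None

    def reset(self, **kwargs):
        obs, info = self.base_env.reset(**kwargs)
        self._last_obs = obs  # cached so cost_fn can see (obs, act, next_obs)
        return obs, info

    def step(self, action):
        next_obs, reward, terminated, truncated, info = self.base_env.step(action)
        info = dict(info)
        info["constraint_costs"] = {  # hypothetical key name
            c.name: c.cost_fn(self._last_obs, action, next_obs)
            for c in self.constraints
        }
        self._last_obs = next_obs
        return next_obs, reward, terminated, truncated, info
```

Keeping the costs in info leaves the reward untouched, so a constrained learner can weight them separately.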

class scpn_fusion.control.safe_rl_controller.LagrangianPPO(env, lambda_lr=0.01, gamma=0.99)[source]

Bases: object

PPO augmented with Lagrangian multipliers for constrained RL.

Parameters:
  • env

  • lambda_lr

  • gamma

update_lambdas(episode_costs)[source]

Dual gradient ascent: lambda_i <- max(0, lambda_i + lr*(C_i - d_i)).

Return type:

None

Parameters:

episode_costs (list[float])
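
The dual update rule above can be written as a pure function. This is a sketch under the assumption that C_i is the episode cost of constraint i and d_i its limit; the function name and the explicit limits argument are introduced here for illustration.

```python
def update_lambdas_sketch(lambdas, episode_costs, limits, lr=0.01):
    """lambda_i <- max(0, lambda_i + lr * (C_i - d_i)) for every constraint."""
    return [
        max(0.0, lam + lr * (cost - limit))
        for lam, cost, limit in zip(lambdas, episode_costs, limits)
    ]
```

A multiplier grows while its constraint is violated (C_i > d_i), shrinks otherwise, and the max keeps it non-negative.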

train(total_timesteps)[source]

Mock training loop — collects rollouts and updates lambdas.

Return type:

None

Parameters:

total_timesteps (int)

predict(obs)[source]

Return action for given observation. Currently samples randomly.

Return type:

ndarray

Parameters:

obs (ndarray)

scpn_fusion.control.safe_rl_controller.q95_cost_fn(obs, act, next_obs)[source]
Return type:

float

Parameters:
  • obs

  • act

  • next_obs

scpn_fusion.control.safe_rl_controller.beta_n_cost_fn(obs, act, next_obs)[source]
Return type:

float

Parameters:
  • obs

  • act

  • next_obs

scpn_fusion.control.safe_rl_controller.ip_cost_fn(obs, act, next_obs)[source]
Return type:

float

Parameters:
  • obs

  • act

  • next_obs

scpn_fusion.control.safe_rl_controller.default_safety_constraints()[source]
Return type:

list[SafetyConstraint]

Scenario Scheduler

class scpn_fusion.control.scenario_scheduler.ScenarioWaveform(name, times, values, interp_kind='linear')[source]

Bases: object

Piecewise-linear time-series for a single actuator channel.

Parameters:
  • name (str)

  • times (ndarray)

  • values (ndarray)

  • interp_kind (str)

name: str
times: ndarray
values: ndarray
interp_kind: str = 'linear'
class scpn_fusion.control.scenario_scheduler.ScenarioSchedule(waveforms)[source]

Bases: object

Collection of named waveforms defining a tokamak discharge scenario.

Parameters:

waveforms (dict[str, ScenarioWaveform])

evaluate(t)[source]

Interpolate all waveforms at time t.

Return type:

dict[str, float]

Parameters:

t (float)

duration()[source]

Maximum endpoint across all waveforms [s].

Return type:

float

validate()[source]

Check monotonic time vectors and physical sign constraints.

Return type:

list[str]
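
The three schedule operations (evaluate, duration, validate) can be sketched on plain lists. The function names, the channel names in the usage note, the hold-last-value behavior outside the time range, and the strict-monotonicity check are assumptions; the physical sign checks mentioned for validate are not reproduced because the reference does not specify them.

```python
from bisect import bisect_right


def interp_linear(times, values, t):
    """Piecewise-linear interpolation, holding the end values outside the range."""
    if t <= times[0]:
        return values[0]
    if t >= times[-1]:
        return values[-1]
    k = bisect_right(times, t)  # times[k - 1] <= t < times[k]
    w = (t - times[k - 1]) / (times[k] - times[k - 1])
    return values[k - 1] + w * (values[k] - values[k - 1])


def evaluate(waveforms, t):
    """waveforms maps channel name -> (times, values)."""
    return {name: interp_linear(ts, vs, t) for name, (ts, vs) in waveforms.items()}


def duration(waveforms):
    """Latest endpoint across all waveforms."""
    return max(ts[-1] for ts, _ in waveforms.values())


def validate(waveforms):
    """Return one message per waveform whose time vector is not strictly increasing."""
    errors = []
    for name, (ts, _) in waveforms.items():
        if any(b <= a for a, b in zip(ts, ts[1:])):
            errors.append(f"{name}: times are not strictly increasing")
    return errors
```

For example, with hypothetical channels "Ip_MA" on times [0, 10, 20] with values [0, 15, 15] and "P_aux_MW" on times [0, 40] with values [0, 80], evaluate at t = 5 gives 7.5 and 10.0, and duration is 40.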

class scpn_fusion.control.scenario_scheduler.FeedforwardController(schedule, feedback)[source]

Bases: object

Combines pre-computed feedforward trajectories with a feedback trim.

Parameters:
  • schedule

  • feedback

step(x, t, dt)[source]

u = u_ff(t) + u_fb(x_err).

Return type:

ndarray

Parameters:
  • x

  • t

  • dt

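
The control law u = u_ff(t) + u_fb(x_err) can be sketched with three callables. The class name and the callables (u_ff_fn, x_ref_fn, feedback_fn) are hypothetical; in particular, how the library obtains the reference used to form x_err is not documented here, so a time-dependent reference function is assumed.

```python
class FeedforwardSketch:
    """Illustrative feedforward-plus-feedback combiner; not the library class."""

    def __init__(self, u_ff_fn, x_ref_fn, feedback_fn):
        self.u_ff_fn = u_ff_fn          # t -> feedforward command (list)
        self.x_ref_fn = x_ref_fn        # t -> reference state (list), assumed
        self.feedback_fn = feedback_fn  # (x_err, dt) -> feedback trim (list)

    def step(self, x, t, dt):
        u_ff = self.u_ff_fn(t)
        x_err = [ref - xi for ref, xi in zip(self.x_ref_fn(t), x)]
        u_fb = self.feedback_fn(x_err, dt)
        # Total command is the scheduled trajectory plus the feedback correction.
        return [ff + fb for ff, fb in zip(u_ff, u_fb)]
```

The feedforward term carries the planned trajectory, so the feedback term only has to correct the residual error.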
class scpn_fusion.control.scenario_scheduler.ScenarioOptimizer(plant_model, target_state, T_total, dt=0.5)[source]

Bases: object

Offline trajectory design via Nelder-Mead.

Parameters:
  • plant_model (Callable)

  • target_state (np.ndarray)

  • T_total (float)

  • dt (float)

optimize(n_iter=100)[source]

Minimize state-tracking cost over T_total via Nelder-Mead on waveform knots.

Return type:

ScenarioSchedule

Parameters:

n_iter (int)

scpn_fusion.control.scenario_scheduler.iter_15ma_baseline()[source]
Return type:

ScenarioSchedule

scpn_fusion.control.scenario_scheduler.nstx_u_1ma_standard()[source]
Return type:

ScenarioSchedule

Gain-Scheduled Controller

class scpn_fusion.control.gain_scheduled_controller.OperatingRegime(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Discharge phase: ramp-up, L-mode, L-H transition, H-mode, ramp-down, disruption.

RAMP_UP = 1
L_MODE_FLAT = 2
LH_TRANSITION = 3
H_MODE_FLAT = 4
RAMP_DOWN = 5
DISRUPTION_MITIGATION = 6
class scpn_fusion.control.gain_scheduled_controller.RegimeController(regime, Kp, Ki, Kd, x_ref, constraints)[source]

Bases: object

PID gains and setpoint for one operating regime.

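Parameters:
  • regime (OperatingRegime)

  • Kp (ndarray)

  • Ki (ndarray)

  • Kd (ndarray)

  • x_ref (ndarray)

  • constraints (dict[str, Any])
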
regime: OperatingRegime
Kp: ndarray
Ki: ndarray
Kd: ndarray
x_ref: ndarray
constraints: dict[str, Any]
class scpn_fusion.control.gain_scheduled_controller.RegimeDetector(thresholds=None)[source]

Bases: object

Parameters:

thresholds (dict[str, float] | None)

detect(state, dstate_dt, tau_E, p_disrupt)[source]

Classify regime from dIp/dt, confinement time, and disruption probability. Uses hysteresis buffer.

Return type:

OperatingRegime

Parameters:
  • state

  • dstate_dt

  • tau_E

  • p_disrupt

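
One common way to add hysteresis to a classifier is to accept a new label only after it has been proposed for several consecutive calls. The sketch below illustrates that idea; the class name and hold_steps are hypothetical, the thresholds that produce the raw label are omitted, and the library's actual buffer mechanism may differ.

```python
class HysteresisSwitch:
    """Switch to a new label only after `hold_steps` consecutive proposals."""

    def __init__(self, initial, hold_steps=3):
        self.current = initial
        self.hold_steps = hold_steps
        self._candidate = None
        self._count = 0

    def update(self, raw_label):
        if raw_label == self.current:
            # Agreement with the active label cancels any pending switch.
            self._candidate, self._count = None, 0
        elif raw_label == self._candidate:
            self._count += 1
        else:
            self._candidate, self._count = raw_label, 1
        if self._candidate is not None and self._count >= self.hold_steps:
            self.current = self._candidate
            self._candidate, self._count = None, 0
        return self.current
```

A single stray classification therefore cannot flip the regime, which avoids chattering between gain sets near a threshold.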
class scpn_fusion.control.gain_scheduled_controller.GainScheduledController(controllers)[source]

Bases: object

Parameters:

controllers (dict[OperatingRegime, RegimeController])

step(x, t, dt, detected_regime)[source]

PID step with bumpless gain interpolation during regime transitions.

Return type:

ndarray

Parameters:
  • x

  • t

  • dt

  • detected_regime

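
Gain interpolation across a regime change can be sketched for a scalar PID loop. The class name, the linear blend over a fixed blend_time_s, and the use of a precomputed scalar error are assumptions; the library works on arrays, derives the error from x and x_ref, and may blend differently.

```python
class BlendedPID:
    """Scalar PID whose gains ramp linearly from the old to the new regime."""

    def __init__(self, gains, blend_time_s=1.0):
        self.gains = gains  # regime -> (Kp, Ki, Kd)
        self.blend_time_s = blend_time_s  # hypothetical transition duration
        self.active = None
        self.previous = None
        self.t_switch = 0.0
        self.integral = 0.0
        self.prev_error = None

    def _current_gains(self, t):
        new = self.gains[self.active]
        if self.previous is None:
            return new
        # Blend weight goes from 0 at the switch to 1 after blend_time_s.
        w = min(1.0, (t - self.t_switch) / self.blend_time_s)
        old = self.gains[self.previous]
        return tuple((1.0 - w) * o + w * n for o, n in zip(old, new))

    def step(self, error, t, dt, regime):
        if regime != self.active:
            self.previous, self.active, self.t_switch = self.active, regime, t
        kp, ki, kd = self._current_gains(t)
        self.integral += error * dt
        deriv = 0.0 if self.prev_error is None else (error - self.prev_error) / dt
        self.prev_error = error
        return kp * error + ki * self.integral + kd * deriv
```

Because the gains start at the old regime's values at the switch instant, the command does not jump when the detected regime changes. A second switch during an unfinished blend restarts from the nominal gains of the regime being left, which this sketch does not smooth.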
class scpn_fusion.control.gain_scheduled_controller.ScenarioWaveform(name, times, values, interp_kind='linear')[source]

Bases: object

Piecewise-linear time-series for a single actuator channel.

Parameters:
  • name (str)

  • times (np.ndarray)

  • values (np.ndarray)

  • interp_kind (str)

class scpn_fusion.control.gain_scheduled_controller.ScenarioSchedule(waveforms)[source]

Bases: object

Collection of named waveforms defining a tokamak discharge scenario.

Parameters:

waveforms (dict[str, ScenarioWaveform])

evaluate(t)[source]

Interpolate all waveforms at time t.

Return type:

dict[str, float]

Parameters:

t (float)

duration()[source]

Maximum endpoint across all waveforms [s].

Return type:

float

validate()[source]

Check monotonic time vectors.

Return type:

list[str]

scpn_fusion.control.gain_scheduled_controller.iter_baseline_schedule()[source]
Return type:

ScenarioSchedule