Source code for scpn_fusion.core.current_diffusion

# SPDX-License-Identifier: AGPL-3.0-or-later
# Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Current Diffusion Equation
"""One-dimensional current-diffusion and safety-factor profile evolution tools."""

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import TypeAlias, cast

import numpy as np
from numpy.typing import NDArray
from scipy.linalg import solve_banded

# Sauter, Angioni & Lin-Liu (1999/2002) neoclassical resistivity
MU_0 = 4.0 * np.pi * 1e-7
FloatArray: TypeAlias = NDArray[np.float64]
ProfileCallable: TypeAlias = Callable[[float, FloatArray], FloatArray | float]
EtaCallable: TypeAlias = Callable[[FloatArray], FloatArray | float]
ScalarCallable: TypeAlias = Callable[[float], float]
TauCallable: TypeAlias = Callable[..., FloatArray | float]


[docs] @dataclass(frozen=True) class FluxEvolutionTrajectory: """Time history for the non-adiabatic MIF/FRC flux carrier equation. Arrays use SI units. ``psi`` has shape ``(n_steps + 1, n_rho)`` and stores the evolved poloidal-flux profile. ``hall_drive`` stores ``R_null E_theta``, ``resistive_loss`` stores ``eta J_theta``, and ``damping_rate`` stores ``1 / tau_psi`` at the same time/grid points. ``source_increment`` and ``damping_decrement`` have shape ``(n_steps, n_rho)`` and close the exact discrete balance ``psi[n+1] = psi[n] - damping_decrement[n] + source_increment[n]``. """ time_s: FloatArray rho: FloatArray psi: FloatArray hall_drive: FloatArray resistive_loss: FloatArray source: FloatArray damping_rate: FloatArray source_increment: FloatArray damping_decrement: FloatArray update_residual: FloatArray dt_s: float
[docs] def solve_flux_evolution_nonadiabatic( rho: FloatArray, psi0: FloatArray, *, tau_psi_fn: TauCallable, R_null_t: ScalarCallable, E_theta_t: ProfileCallable, eta_spitzer_fn: EtaCallable, J_theta_t: ProfileCallable, dt: float, n_steps: int, ) -> FluxEvolutionTrajectory: """Evolve the pulsed non-adiabatic flux constraint. The implemented carrier is the Ono-style flattened FRC flux equation ``dpsi/dt = -psi / tau_psi + R_null E_theta - eta J_theta``. Each time step samples the source at both endpoints and advances the local linear damping term analytically using the midpoint damping rate. This is exact for constant damping/source, second-order for smooth time-dependent drive terms, and reduces exactly to constant ``psi`` when Hall drive, resistive loss, and damping are all zero. """ rho_grid = _validate_radial_grid(rho) psi_initial = _as_profile(psi0, rho_grid.size, "psi0") if not np.isfinite(dt) or dt <= 0.0: raise ValueError("dt must be positive") if n_steps <= 0: raise ValueError("n_steps must be positive") time_s = np.linspace(0.0, dt * n_steps, n_steps + 1, dtype=np.float64) psi = np.zeros((n_steps + 1, rho_grid.size), dtype=np.float64) hall_drive = np.zeros_like(psi) resistive_loss = np.zeros_like(psi) source = np.zeros_like(psi) damping_rate = np.zeros_like(psi) source_increment = np.zeros((n_steps, rho_grid.size), dtype=np.float64) damping_decrement = np.zeros_like(source_increment) update_residual = np.zeros_like(source_increment) psi[0] = psi_initial for step_index, time_value in enumerate(time_s): tau_profile = _as_profile( _call_tau(tau_psi_fn, time_value, rho_grid), rho_grid.size, "tau", allow_infinite=True, ) invalid_tau = np.isneginf(tau_profile) | ((tau_profile <= 0.0) & ~np.isposinf(tau_profile)) if np.any(invalid_tau): raise ValueError("tau_psi_fn must return positive finite values or np.inf") damping_rate[step_index] = np.where(np.isinf(tau_profile), 0.0, 1.0 / tau_profile) r_null = float(R_null_t(float(time_value))) if not np.isfinite(r_null) or r_null < 0.0: raise ValueError("R_null_t must return a finite non-negative radius") e_theta = _as_profile(E_theta_t(float(time_value), rho_grid), rho_grid.size, "E_theta_t") eta = _as_profile(eta_spitzer_fn(rho_grid), rho_grid.size, "eta_spitzer_fn") if np.any(eta < 0.0): raise ValueError("eta_spitzer_fn must return non-negative resistivity") j_theta = _as_profile(J_theta_t(float(time_value), rho_grid), rho_grid.size, "J_theta_t") hall_drive[step_index] = r_null * e_theta resistive_loss[step_index] = eta * j_theta source[step_index] = hall_drive[step_index] - resistive_loss[step_index] for step_index in range(n_steps): gamma = 0.5 * (damping_rate[step_index] + damping_rate[step_index + 1]) source_midpoint = 0.5 * (source[step_index] + source[step_index + 1]) decay = np.exp(-gamma * dt) driven_increment = np.empty(rho_grid.size, dtype=np.float64) damped = gamma > 0.0 driven_increment[damped] = source_midpoint[damped] * (1.0 - decay[damped]) / gamma[damped] driven_increment[~damped] = dt * source_midpoint[~damped] damping_drop = psi[step_index] * (1.0 - decay) expected_next = psi[step_index] - damping_drop + driven_increment psi[step_index + 1] = expected_next source_increment[step_index] = driven_increment damping_decrement[step_index] = damping_drop update_residual[step_index] = psi[step_index + 1] - expected_next return FluxEvolutionTrajectory( time_s=time_s, rho=rho_grid, psi=psi, hall_drive=hall_drive, resistive_loss=resistive_loss, source=source, damping_rate=damping_rate, source_increment=source_increment, damping_decrement=damping_decrement, update_residual=update_residual, dt_s=float(dt), )
[docs] def neoclassical_resistivity( Te_keV: float, ne_19: float, Z_eff: float, epsilon: float, q: float = 1.0, R0: float = 2.0 ) -> float: """Sauter neoclassical parallel resistivity [Ohm-m].""" Te_keV = max(Te_keV, 1e-3) ne_19 = max(ne_19, 1e-3) epsilon = max(epsilon, 1e-6) ln_Lambda = 17.0 eta_Spitzer = 1.65e-9 * Z_eff * ln_Lambda / (Te_keV**1.5) # Trapped fraction — Sauter 2002 Eq. 14 f_t = 1.0 - (1.0 - epsilon) ** 2 / (np.sqrt(1.0 - epsilon**2) * (1.0 + 1.46 * np.sqrt(epsilon))) f_t = max(0.0, min(f_t, 1.0)) e_charge = 1.602e-19 m_e = 9.109e-31 v_te = np.sqrt(2.0 * Te_keV * 1e3 * e_charge / m_e) nu_ei = ( (ne_19 * 1e19) * Z_eff * e_charge**4 * ln_Lambda / (12.0 * np.pi**1.5 * (8.854e-12) ** 2 * np.sqrt(m_e) * (Te_keV * 1e3 * e_charge) ** 1.5) ) # Sauter 2002 Eq. 13 — electron collisionality (needed for banana/plateau regime selection) nu_star_e = nu_ei * max(q, 0.5) * R0 / (epsilon**1.5 * v_te) # noqa: F841 # Neoclassical correction — Sauter 2002 Eq. 15 C_R = 1.0 - (1.0 + 0.36 / Z_eff) * f_t + (0.59 / Z_eff) * f_t**2 eta_neo = eta_Spitzer / (1.0 - f_t) * C_R return float(max(eta_neo, eta_Spitzer))
[docs] def q_from_psi(rho: FloatArray, psi: FloatArray, R0: float, a: float, B0: float) -> FloatArray: """q(rho) = -rho a^2 B0 / (R0 dpsi/drho), L'Hopital at axis.""" nr = len(rho) q = np.zeros(nr) drho = rho[1] - rho[0] dpsi_drho = np.gradient(psi, drho) for i in range(1, nr): denom = R0 * dpsi_drho[i] if abs(denom) < 1e-12: q[i] = q[i - 1] if i > 1 else 1.0 else: q[i] = -rho[i] * a**2 * B0 / denom # L'Hopital at rho=0: q(0) = -a^2 B0 / (R0 d^2psi/drho^2) d2psi = (psi[2] - 2 * psi[1] + psi[0]) / (drho**2) if abs(d2psi) > 1e-12: q[0] = -(a**2) * B0 / (R0 * d2psi) else: q[0] = q[1] return np.abs(q)
[docs] def resistive_diffusion_time(a: float, eta: float) -> float: """tau_R = mu0 a^2 / eta [seconds].""" return MU_0 * a**2 / max(eta, 1e-12)
def _validate_radial_grid(rho: FloatArray) -> FloatArray: rho_grid = cast(FloatArray, np.asarray(rho, dtype=np.float64)) if rho_grid.ndim != 1: raise ValueError("rho must be one-dimensional") if rho_grid.size < 2: raise ValueError("rho must contain at least two points") if not np.all(np.isfinite(rho_grid)): raise ValueError("rho must contain finite values") if not np.all(np.diff(rho_grid) > 0.0): raise ValueError("rho must be strictly increasing") return rho_grid def _as_profile( values: FloatArray | float, expected_size: int, name: str, *, allow_infinite: bool = False, ) -> FloatArray: profile = np.asarray(values, dtype=np.float64) if profile.ndim == 0: profile = np.full(expected_size, float(profile), dtype=np.float64) if profile.shape != (expected_size,): raise ValueError(f"{name} must be scalar or shape ({expected_size},)") valid_values = ( np.isfinite(profile) | np.isposinf(profile) if allow_infinite else np.isfinite(profile) ) if not np.all(valid_values): suffix = "finite values or np.inf" if allow_infinite else "finite values" raise ValueError(f"{name} must contain {suffix}") return profile def _call_tau(tau_psi_fn: TauCallable, time_s: float, rho: FloatArray) -> FloatArray | float: try: return tau_psi_fn(time_s, rho) except TypeError: return tau_psi_fn(time_s)
[docs] class CurrentDiffusionSolver: """Implicit Crank-Nicolson solver for 1D poloidal flux diffusion.""" def __init__(self, rho: FloatArray, R0: float, a: float, B0: float): self.rho = rho self.R0 = R0 self.a = a self.B0 = B0 self.nr = len(rho) self.drho = rho[1] - rho[0] self.psi = np.zeros(self.nr) for i in range(1, self.nr): r = self.rho[i] q_r = 1.0 + 2.0 * r**2 dpsi = -r * self.a**2 * self.B0 / (self.R0 * q_r) self.psi[i] = self.psi[i - 1] + dpsi * self.drho self.psi -= self.psi[-1]
[docs] def step( self, dt: float, Te: FloatArray, ne: FloatArray, Z_eff: float, j_bs: FloatArray, j_cd: FloatArray, j_ext: FloatArray | None = None, ) -> FloatArray: """Advance poloidal flux psi by one timestep dt.""" if j_ext is None: j_ext = np.zeros(self.nr) j_tot_source = j_bs + j_cd + j_ext q_prof = q_from_psi(self.rho, self.psi, self.R0, self.a, self.B0) eta_prof = np.zeros(self.nr) for i in range(self.nr): epsilon = self.rho[i] * self.a / self.R0 eta_prof[i] = neoclassical_resistivity( float(Te[i]), float(ne[i]), Z_eff, epsilon, float(q_prof[i]), self.R0, ) D = eta_prof / (MU_0 * self.a**2) alpha = dt / 2.0 drho2 = self.drho**2 sub = np.zeros(self.nr) diag = np.zeros(self.nr) sup = np.zeros(self.nr) rhs = np.zeros(self.nr) # Axis BC: L(psi)_0 = 4 D_0 (psi_1 - psi_0) / drho^2 diag[0] = 1.0 + alpha * 4.0 * D[0] / drho2 sup[0] = -alpha * 4.0 * D[0] / drho2 rhs[0] = ( self.psi[0] + alpha * 4.0 * D[0] * (self.psi[1] - self.psi[0]) / drho2 + dt * self.R0 * eta_prof[0] * j_tot_source[0] ) for i in range(1, self.nr - 1): r = self.rho[i] coeff_prev = D[i] * (1.0 / drho2 - 1.0 / (2.0 * r * self.drho)) coeff_curr = D[i] * (-2.0 / drho2) coeff_next = D[i] * (1.0 / drho2 + 1.0 / (2.0 * r * self.drho)) sub[i] = -alpha * coeff_prev diag[i] = 1.0 - alpha * coeff_curr sup[i] = -alpha * coeff_next L_psi_n = ( coeff_prev * self.psi[i - 1] + coeff_curr * self.psi[i] + coeff_next * self.psi[i + 1] ) rhs[i] = self.psi[i] + alpha * L_psi_n + dt * self.R0 * eta_prof[i] * j_tot_source[i] # Edge Dirichlet BC diag[-1] = 1.0 sub[-1] = 0.0 rhs[-1] = self.psi[-1] ab = np.zeros((3, self.nr)) ab[0, 1:] = sup[:-1] ab[1, :] = diag ab[2, :-1] = sub[1:] self.psi = cast(FloatArray, solve_banded((1, 1), ab, rhs)) return self.psi