Source code for scpn_fusion.diagnostics.forward

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Forward Diagnostics
"""Forward-model diagnostics for raw-observable comparison lanes."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Sequence

import numpy as np
from numpy.typing import NDArray

FloatArray = NDArray[np.float64]

ELECTRON_RADIUS_M = 2.8179403262e-15


[docs] @dataclass(frozen=True) class ForwardDiagnosticChannels: interferometer_phase_rad: FloatArray neutron_count_rate_hz: float thomson_scattering_voltage_v: FloatArray ece_temperature_kev: FloatArray | None = None sxr_brightness_w_m2: FloatArray | None = None bolometer_power_w_m2_sr: FloatArray | None = None cxrs_ti_kev: FloatArray | None = None cxrs_rotation_km_s: FloatArray | None = None
def _nearest_index(axis: FloatArray, value: float) -> int: idx = int(np.argmin(np.abs(axis - float(value)))) return int(np.clip(idx, 0, axis.size - 1)) def _validate_field_grid( field: FloatArray, r_grid: FloatArray, z_grid: FloatArray, *, name: str, ) -> tuple[FloatArray, FloatArray, FloatArray]: arr = np.asarray(field, dtype=np.float64) r = np.asarray(r_grid, dtype=np.float64).reshape(-1) z = np.asarray(z_grid, dtype=np.float64).reshape(-1) if r.ndim != 1 or z.ndim != 1 or r.size < 2 or z.size < 2: raise ValueError(f"{name}: r_grid and z_grid must be 1D with at least 2 points.") if arr.ndim != 2 or arr.shape != (z.size, r.size): raise ValueError( f"{name}: field shape {arr.shape} must match (len(z_grid), len(r_grid)) " f"= ({z.size}, {r.size})." ) if not (np.all(np.isfinite(arr)) and np.all(np.isfinite(r)) and np.all(np.isfinite(z))): raise ValueError(f"{name}: field and grids must be finite.") if not (np.all(np.diff(r) > 0.0) and np.all(np.diff(z) > 0.0)): raise ValueError(f"{name}: r_grid and z_grid must be strictly increasing.") return arr, r, z def _validate_chords( chords: Sequence[tuple[tuple[float, float], tuple[float, float]]], *, r_min: float | None = None, r_max: float | None = None, z_min: float | None = None, z_max: float | None = None, enforce_domain_bounds: bool = False, ) -> Sequence[tuple[tuple[float, float], tuple[float, float]]]: for i, chord in enumerate(chords): if len(chord) != 2: raise ValueError(f"chords[{i}] must be a 2-point chord.") start, end = chord if len(start) != 2 or len(end) != 2: raise ValueError(f"chords[{i}] points must be 2D (r, z) tuples.") vals = [start[0], start[1], end[0], end[1]] if not np.all(np.isfinite(np.asarray(vals, dtype=np.float64))): raise ValueError(f"chords[{i}] has non-finite coordinates.") if enforce_domain_bounds: if r_min is None or r_max is None or z_min is None or z_max is None: raise ValueError("Domain bounds must be provided when enforce_domain_bounds=True.") if ( float(start[0]) < float(r_min) or float(start[0]) > float(r_max) or float(end[0]) < float(r_min) or float(end[0]) > float(r_max) or float(start[1]) < float(z_min) or float(start[1]) > float(z_max) or float(end[1]) < float(z_min) or float(end[1]) > float(z_max) ): raise ValueError(f"chords[{i}] lies outside diagnostic domain bounds.") return chords def _validate_points( points: Sequence[tuple[float, float]], *, r_min: float | None = None, r_max: float | None = None, z_min: float | None = None, z_max: float | None = None, enforce_domain_bounds: bool = False, ) -> Sequence[tuple[float, float]]: if len(points) == 0: raise ValueError("sample_points must contain at least one point.") for i, point in enumerate(points): if len(point) != 2: raise ValueError(f"sample_points[{i}] must be a 2D (r, z) tuple.") r_point = float(point[0]) z_point = float(point[1]) if not np.isfinite(r_point) or not np.isfinite(z_point): raise ValueError(f"sample_points[{i}] has non-finite coordinates.") if enforce_domain_bounds: if r_min is None or r_max is None or z_min is None or z_max is None: raise ValueError("Domain bounds must be provided when enforce_domain_bounds=True.") if ( r_point < float(r_min) or r_point > float(r_max) or z_point < float(z_min) or z_point > float(z_max) ): raise ValueError(f"sample_points[{i}] lies outside diagnostic domain bounds.") return points def _line_integral_nearest( field: FloatArray, r_grid: FloatArray, z_grid: FloatArray, start: tuple[float, float], end: tuple[float, float], samples: int = 96, ) -> float: samples = int(samples) if samples < 8: raise ValueError("samples must be >= 8.") r_vals = np.linspace(float(start[0]), float(end[0]), samples) z_vals = np.linspace(float(start[1]), float(end[1]), samples) dl = float(np.hypot(end[0] - start[0], end[1] - start[1]) / samples) accum = 0.0 for r, z in zip(r_vals, z_vals): ir = _nearest_index(r_grid, float(r)) iz = _nearest_index(z_grid, float(z)) accum += float(field[iz, ir]) * dl return float(accum)
[docs] def interferometer_phase_shift( electron_density_m3: FloatArray, r_grid: FloatArray, z_grid: FloatArray, chords: Sequence[tuple[tuple[float, float], tuple[float, float]]], *, laser_wavelength_m: float = 1.064e-6, samples: int = 96, enforce_domain_bounds: bool = False, ) -> FloatArray: """Predict line-integrated interferometer phase shift [rad].""" wavelength = float(laser_wavelength_m) if not np.isfinite(wavelength) or wavelength <= 0.0: raise ValueError("laser_wavelength_m must be finite and > 0.") coeff = ELECTRON_RADIUS_M * wavelength ne, r, z = _validate_field_grid( np.asarray(electron_density_m3, dtype=np.float64), np.asarray(r_grid, dtype=np.float64), np.asarray(z_grid, dtype=np.float64), name="interferometer_phase_shift", ) _validate_chords( chords, r_min=float(np.min(r)), r_max=float(np.max(r)), z_min=float(np.min(z)), z_max=float(np.max(z)), enforce_domain_bounds=bool(enforce_domain_bounds), ) phases = np.zeros(len(chords), dtype=np.float64) for i, (start, end) in enumerate(chords): line_ne = _line_integral_nearest( ne, r, z, start, end, samples=samples, ) phases[i] = coeff * line_ne return phases
[docs] def neutron_count_rate( neutron_source_m3_s: FloatArray, *, volume_element_m3: float, detector_efficiency: float = 0.12, solid_angle_fraction: float = 1.0e-4, ) -> float: """Predict detector neutron count rate [Hz] from volumetric source.""" source = np.asarray(neutron_source_m3_s, dtype=np.float64) if source.size == 0 or source.ndim != 2: raise ValueError("neutron_source_m3_s must be a non-empty 2D array.") if not np.all(np.isfinite(source)): raise ValueError("neutron_source_m3_s must be finite.") vol = float(volume_element_m3) eff = float(detector_efficiency) omega = float(solid_angle_fraction) if not np.isfinite(vol) or vol <= 0.0: raise ValueError("volume_element_m3 must be finite and > 0.") if not np.isfinite(eff) or eff < 0.0 or eff > 1.0: raise ValueError("detector_efficiency must be finite and within [0, 1].") if not np.isfinite(omega) or omega < 0.0: raise ValueError("solid_angle_fraction must be finite and >= 0.") emitted_per_s = float(np.sum(np.clip(source, 0.0, None)) * vol) return float(emitted_per_s * eff * omega)
[docs] def thomson_scattering_voltage( electron_density_m3: FloatArray, electron_temp_keV: FloatArray, r_grid: FloatArray, z_grid: FloatArray, sample_points: Sequence[tuple[float, float]], *, gain_v_per_m3: float = 2.5e-24, temp_sensitivity_per_kev: float = 0.08, baseline_voltage_v: float = 0.0, enforce_domain_bounds: bool = False, ) -> FloatArray: """Predict Thomson-scattering detector voltage [V] at sample points.""" gain = float(gain_v_per_m3) temp_sensitivity = float(temp_sensitivity_per_kev) baseline = float(baseline_voltage_v) if not np.isfinite(gain) or gain <= 0.0: raise ValueError("gain_v_per_m3 must be finite and > 0.") if not np.isfinite(temp_sensitivity) or temp_sensitivity < 0.0: raise ValueError("temp_sensitivity_per_kev must be finite and >= 0.") if not np.isfinite(baseline): raise ValueError("baseline_voltage_v must be finite.") ne, r, z = _validate_field_grid( np.asarray(electron_density_m3, dtype=np.float64), np.asarray(r_grid, dtype=np.float64), np.asarray(z_grid, dtype=np.float64), name="thomson_scattering_voltage.electron_density_m3", ) te, _, _ = _validate_field_grid( np.asarray(electron_temp_keV, dtype=np.float64), r, z, name="thomson_scattering_voltage.electron_temp_keV", ) _validate_points( sample_points, r_min=float(np.min(r)), r_max=float(np.max(r)), z_min=float(np.min(z)), z_max=float(np.max(z)), enforce_domain_bounds=bool(enforce_domain_bounds), ) out = np.zeros(len(sample_points), dtype=np.float64) for i, point in enumerate(sample_points): ir = _nearest_index(r, float(point[0])) iz = _nearest_index(z, float(point[1])) ne_local = max(float(ne[iz, ir]), 0.0) te_local = max(float(te[iz, ir]), 0.0) out[i] = gain * ne_local * (1.0 + temp_sensitivity * te_local) + baseline if not np.all(np.isfinite(out)): raise ValueError("thomson_scattering_voltage produced non-finite values.") return out
[docs] def generate_forward_channels( *, electron_density_m3: FloatArray, neutron_source_m3_s: FloatArray, r_grid: FloatArray, z_grid: FloatArray, interferometer_chords: Sequence[tuple[tuple[float, float], tuple[float, float]]], volume_element_m3: float, detector_efficiency: float = 0.12, solid_angle_fraction: float = 1.0e-4, laser_wavelength_m: float = 1.064e-6, enforce_chord_domain_bounds: bool = False, electron_temp_keV: FloatArray | None = None, thomson_sample_points: Sequence[tuple[float, float]] | None = None, thomson_gain_v_per_m3: float = 2.5e-24, thomson_temp_sensitivity_per_kev: float = 0.08, thomson_baseline_voltage_v: float = 0.0, enforce_thomson_domain_bounds: bool = False, ) -> ForwardDiagnosticChannels: """Generate synthetic raw diagnostic channels from plasma state maps.""" ne = np.asarray(electron_density_m3, dtype=np.float64) r = np.asarray(r_grid, dtype=np.float64) z = np.asarray(z_grid, dtype=np.float64) temp_map = ( np.asarray(electron_temp_keV, dtype=np.float64) if electron_temp_keV is not None else np.clip(ne / 1.0e19, 0.0, None) ) if thomson_sample_points is None: r_min = float(np.min(r)) r_max = float(np.max(r)) z_mid = 0.5 * (float(np.min(z)) + float(np.max(z))) span = r_max - r_min thomson_sample_points = ( (r_min + 0.25 * span, z_mid), (r_min + 0.50 * span, z_mid), (r_min + 0.75 * span, z_mid), ) phases = interferometer_phase_shift( ne, r, z, interferometer_chords, laser_wavelength_m=laser_wavelength_m, enforce_domain_bounds=bool(enforce_chord_domain_bounds), ) rate = neutron_count_rate( np.asarray(neutron_source_m3_s, dtype=np.float64), volume_element_m3=volume_element_m3, detector_efficiency=detector_efficiency, solid_angle_fraction=solid_angle_fraction, ) thomson = thomson_scattering_voltage( ne, temp_map, r, z, thomson_sample_points, gain_v_per_m3=thomson_gain_v_per_m3, temp_sensitivity_per_kev=thomson_temp_sensitivity_per_kev, baseline_voltage_v=thomson_baseline_voltage_v, enforce_domain_bounds=bool(enforce_thomson_domain_bounds), ) return ForwardDiagnosticChannels( interferometer_phase_rad=phases, neutron_count_rate_hz=rate, thomson_scattering_voltage_v=thomson, )
# ── Extended diagnostics: ECE, SXR, Bolometry ────────────────────────
[docs] def ece_radiometer_temperature( electron_temp_keV: FloatArray, r_grid: FloatArray, z_grid: FloatArray, channel_r_positions: Sequence[float], *, z_observation: float = 0.0, optical_depth_factor: float = 1.0, ) -> FloatArray: """Predict ECE radiometer Te [keV] at specified R channel positions. Assumes optically thick plasma: signal = Te(R_ch, z_obs) * optical_depth_factor. """ if not channel_r_positions: raise ValueError("channel_r_positions must be non-empty.") odf = float(optical_depth_factor) if not np.isfinite(odf) or odf <= 0.0: raise ValueError("optical_depth_factor must be finite and > 0.") z_obs = float(z_observation) if not np.isfinite(z_obs): raise ValueError("z_observation must be finite.") te, r, z = _validate_field_grid( np.asarray(electron_temp_keV, dtype=np.float64), np.asarray(r_grid, dtype=np.float64), np.asarray(z_grid, dtype=np.float64), name="ece_radiometer_temperature", ) iz = _nearest_index(z, z_obs) out = np.zeros(len(channel_r_positions), dtype=np.float64) for i, r_ch in enumerate(channel_r_positions): r_val = float(r_ch) if not np.isfinite(r_val): raise ValueError(f"channel_r_positions[{i}] must be finite.") ir = _nearest_index(r, r_val) out[i] = max(float(te[iz, ir]), 0.0) * odf return out
[docs] def soft_xray_brightness( electron_density_m3: FloatArray, electron_temp_keV: FloatArray, r_grid: FloatArray, z_grid: FloatArray, chords: Sequence[tuple[tuple[float, float], tuple[float, float]]], *, z_eff: float = 1.5, filter_energy_kev: float = 1.0, samples: int = 96, enforce_domain_bounds: bool = False, ) -> FloatArray: """Predict soft X-ray line-integrated brightness [W/m^2]. Emissivity model: epsilon = ne^2 * sqrt(Te) * Z_eff * exp(-E_filter / Te). """ z_eff_val = float(z_eff) e_filter = float(filter_energy_kev) if not np.isfinite(z_eff_val) or z_eff_val < 1.0: raise ValueError("z_eff must be finite and >= 1.0.") if not np.isfinite(e_filter) or e_filter <= 0.0: raise ValueError("filter_energy_kev must be finite and > 0.") ne, r, z = _validate_field_grid( np.asarray(electron_density_m3, dtype=np.float64), np.asarray(r_grid, dtype=np.float64), np.asarray(z_grid, dtype=np.float64), name="soft_xray_brightness.ne", ) te, _, _ = _validate_field_grid( np.asarray(electron_temp_keV, dtype=np.float64), r, z, name="soft_xray_brightness.te", ) _validate_chords( chords, r_min=float(np.min(r)), r_max=float(np.max(r)), z_min=float(np.min(z)), z_max=float(np.max(z)), enforce_domain_bounds=bool(enforce_domain_bounds), ) # Compute 2D emissivity field te_safe = np.clip(te, 0.01, None) # avoid log(0) / div-by-zero emissivity = ne**2 * np.sqrt(te_safe) * z_eff_val * np.exp(-e_filter / te_safe) out = np.zeros(len(chords), dtype=np.float64) for i, (start, end) in enumerate(chords): out[i] = _line_integral_nearest(emissivity, r, z, start, end, samples=samples) return out
[docs] def bolometer_power_density( electron_density_m3: FloatArray, electron_temp_keV: FloatArray, r_grid: FloatArray, z_grid: FloatArray, chords: Sequence[tuple[tuple[float, float], tuple[float, float]]], *, z_eff: float = 1.5, impurity_fraction: float = 0.02, samples: int = 96, enforce_domain_bounds: bool = False, ) -> FloatArray: """Predict bolometer line-integrated radiated power [W/m^2/sr]. Radiation model: P_rad = ne^2 * L_z(Te) where L_z = C_rad * Z_eff^2 * sqrt(Te). C_rad ~ 1e-31 W m^3 (coronal equilibrium approximation). """ z_eff_val = float(z_eff) imp_frac = float(impurity_fraction) if not np.isfinite(z_eff_val) or z_eff_val < 1.0: raise ValueError("z_eff must be finite and >= 1.0.") if not np.isfinite(imp_frac) or imp_frac < 0.0: raise ValueError("impurity_fraction must be finite and >= 0.") ne, r, z = _validate_field_grid( np.asarray(electron_density_m3, dtype=np.float64), np.asarray(r_grid, dtype=np.float64), np.asarray(z_grid, dtype=np.float64), name="bolometer_power_density.ne", ) te, _, _ = _validate_field_grid( np.asarray(electron_temp_keV, dtype=np.float64), r, z, name="bolometer_power_density.te", ) _validate_chords( chords, r_min=float(np.min(r)), r_max=float(np.max(r)), z_min=float(np.min(z)), z_max=float(np.max(z)), enforce_domain_bounds=bool(enforce_domain_bounds), ) C_rad = 1.0e-31 # W m^3 (coronal equilibrium) te_safe = np.clip(te, 0.01, None) # P_rad = ne^2 * C_rad * Z_eff^2 * sqrt(Te) * (1 + impurity_fraction) p_rad = ne**2 * C_rad * z_eff_val**2 * np.sqrt(te_safe) * (1.0 + imp_frac) out = np.zeros(len(chords), dtype=np.float64) for i, (start, end) in enumerate(chords): out[i] = _line_integral_nearest(p_rad, r, z, start, end, samples=samples) return out
[docs] def cxrs_ion_diagnostics( ion_temp_keV: FloatArray, rotation_km_s: FloatArray, r_grid: FloatArray, z_grid: FloatArray, chords: Sequence[tuple[tuple[float, float], tuple[float, float]]], *, beam_r_center: float = 6.2, beam_width: float = 0.1, samples: int = 96, ) -> tuple[FloatArray, FloatArray]: """Predict CXRS ion temperature and toroidal rotation. Weights signals by a Gaussian beam-emission profile centered at beam_r_center. """ ti, r, z = _validate_field_grid( np.asarray(ion_temp_keV, dtype=np.float64), np.asarray(r_grid, dtype=np.float64), np.asarray(z_grid, dtype=np.float64), name="cxrs.ti", ) vphi, _, _ = _validate_field_grid( np.asarray(rotation_km_s, dtype=np.float64), r, z, name="cxrs.vphi", ) # Emission weight: exp(-(R - R_beam)^2 / w^2) rr_mesh, _ = np.meshgrid(r, z) weight_map = np.exp(-((rr_mesh - beam_r_center) ** 2) / (beam_width**2)) ti_out = np.zeros(len(chords), dtype=np.float64) vphi_out = np.zeros(len(chords), dtype=np.float64) for i, (start, end) in enumerate(chords): # Weighted integrals sum_w = _line_integral_nearest(weight_map, r, z, start, end, samples=samples) if sum_w > 1e-9: ti_out[i] = ( _line_integral_nearest(ti * weight_map, r, z, start, end, samples=samples) / sum_w ) vphi_out[i] = ( _line_integral_nearest(vphi * weight_map, r, z, start, end, samples=samples) / sum_w ) return ti_out, vphi_out