Source code for scpn_fusion.core.eqdsk

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — G-EQDSK Equilibrium File Parser
"""
Reader and writer for the G-EQDSK (EFIT) equilibrium file format.

The G-EQDSK format is the *de facto* standard for tokamak equilibrium
data exchange.  It stores the poloidal flux map ψ(R,Z) on a uniform
(R,Z) grid together with 1-D profile arrays and scalar quantities.

References
----------
- Lao et al., Nucl. Fusion 25 (1985) 1611
- FreeQDSK documentation: https://freeqdsk.readthedocs.io/
- ITER IMAS Data Dictionary (equilibrium IDS)

Format specification
--------------------
- Fortran fixed-width: 5 values per line, ``(5e16.9)``
- Header line: 48-char description, 3 ints (idum, nw, nh)
- Scalars block (20 values): rdim, zdim, rcentr, rleft, zmid,
  rmaxis, zmaxis, simag, sibry, bcentr, current, …
- 1-D arrays (each nw values): fpol, pres, ffprime, pprime, qpsi
- 2-D array: psirz (nh × nw values, row-major)
- Boundary & limiter point counts + (R,Z) pairs
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import IO, Union

import numpy as np
from numpy.typing import NDArray


# ── Data container ────────────────────────────────────────────────────


[docs] @dataclass class GEqdsk: """Container for all data in a G-EQDSK file.""" # Header description: str = "" nw: int = 0 # number of horizontal (R) grid points nh: int = 0 # number of vertical (Z) grid points # Scalars rdim: float = 0.0 # horizontal dimension (m) zdim: float = 0.0 # vertical dimension (m) rcentr: float = 0.0 # reference R for vacuum toroidal field (m) rleft: float = 0.0 # R at left of computational domain (m) zmid: float = 0.0 # Z at centre of computational domain (m) rmaxis: float = 0.0 # R of magnetic axis (m) zmaxis: float = 0.0 # Z of magnetic axis (m) simag: float = 0.0 # ψ at magnetic axis (Wb/rad) sibry: float = 0.0 # ψ at plasma boundary (Wb/rad) bcentr: float = 0.0 # vacuum toroidal field at rcentr (T) current: float = 0.0 # plasma current (A) # 1-D profile arrays (length nw) fpol: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64)) pres: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64)) ffprime: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64)) pprime: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64)) qpsi: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64)) # 2-D flux map (nh × nw) psirz: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64)) # Boundary and limiter contours rbdry: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64)) zbdry: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64)) rlim: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64)) zlim: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64)) # ── Derived grids ───────────────────────────────────────────────── @property def r(self) -> NDArray[np.float64]: """1-D array of R grid values.""" return np.linspace(self.rleft, self.rleft + self.rdim, self.nw) @property def z(self) -> NDArray[np.float64]: """1-D array of Z grid values.""" return np.linspace(self.zmid - self.zdim / 2, self.zmid + self.zdim / 2, self.nh) @property def psi_norm(self) -> NDArray[np.float64]: """Normalised poloidal flux ψ_N ∈ [0, 1] (axis=0, boundary=1).""" return np.linspace(0.0, 1.0, self.nw)
[docs] def psi_to_norm(self, psi: NDArray[np.float64]) -> NDArray[np.float64]: """Map raw ψ to normalised ψ_N.""" return (psi - self.simag) / (self.sibry - self.simag)
[docs] def to_config(self, name: str = "eqdsk") -> dict[str, object]: """Convert to FusionKernel JSON config dict (approximate).""" r = self.r z = self.z return { "reactor_name": name, "grid_resolution": [self.nw, self.nh], "dimensions": { "R_min": float(r[0]), "R_max": float(r[-1]), "Z_min": float(z[0]), "Z_max": float(z[-1]), }, "physics": { "plasma_current_target": float(self.current / 1e6), "vacuum_permeability": 1.0, }, "coils": [], # not stored in GEQDSK "solver": { "max_iterations": 1000, "convergence_threshold": 1e-4, "relaxation_factor": 0.1, }, }
# ── Reader ──────────────────────────────────────────────────────────── # Regex that matches Fortran-style floats: optional sign, digits, # optional decimal, optional exponent. Handles the common case where # two values like "2.385E+00-1.216E+01" run together (no whitespace). _FORTRAN_FLOAT_RE = re.compile(r"[+-]?\d*\.?\d+(?:[eEdD][+-]?\d+)?") def _split_fortran(line: str) -> list[str]: """Extract all Fortran-format floats from *line*.""" return _FORTRAN_FLOAT_RE.findall(line) def _parse_header(line: str) -> tuple[str, int, int]: """Parse the first line: 48-char description + idum nw nh.""" parts = line.split() nh = int(parts[-1]) nw = int(parts[-2]) desc = " ".join(parts[:-3]) if len(parts) > 3 else "" return desc, nw, nh
[docs] def read_geqdsk(path: Union[str, Path]) -> GEqdsk: """ Read a G-EQDSK file and return a :class:`GEqdsk` container. Handles both whitespace-separated and fixed-width Fortran formats, including the common case where values run together without spaces (e.g. ``2.385E+00-1.216E+01``). Parameters ---------- path : str or Path Path to the G-EQDSK file. Returns ------- GEqdsk Parsed equilibrium data. """ path = Path(path) with open(path, "r") as f: lines = f.readlines() # Parse header desc, nw, nh = _parse_header(lines[0]) # Gather all numeric tokens using Fortran-aware splitting tokens: list[str] = [] for line in lines[1:]: tokens.extend(_split_fortran(line)) idx = 0 def _next() -> float: nonlocal idx val = float(tokens[idx].replace("D", "E").replace("d", "e")) idx += 1 return val def _read_array(n: int) -> NDArray[np.float64]: nonlocal idx arr = np.array( [float(tokens[idx + i].replace("D", "E").replace("d", "e")) for i in range(n)] ) idx += n return arr # 20 scalar values rdim = _next() zdim = _next() rcentr = _next() rleft = _next() zmid = _next() rmaxis = _next() zmaxis = _next() simag = _next() sibry = _next() bcentr = _next() current = _next() _next() # simag (duplicate) _next() # padding _next() # rmaxis duplicate _next() # padding _next() # zmaxis duplicate _next() # padding _next() # sibry duplicate _next() # padding _next() # padding # 1-D arrays (each nw values) fpol = _read_array(nw) pres = _read_array(nw) ffprime = _read_array(nw) pprime = _read_array(nw) # 2-D flux map: nh rows × nw columns (row-major) psirz = _read_array(nh * nw).reshape(nh, nw) # q profile qpsi = _read_array(nw) # Boundary and limiter nbdry = int(_next()) nlim = int(_next()) rbdry = np.zeros(nbdry) zbdry = np.zeros(nbdry) for i in range(nbdry): rbdry[i] = _next() zbdry[i] = _next() rlim = np.zeros(nlim) zlim = np.zeros(nlim) for i in range(nlim): rlim[i] = _next() zlim[i] = _next() return GEqdsk( description=desc, nw=nw, nh=nh, rdim=rdim, zdim=zdim, rcentr=rcentr, rleft=rleft, zmid=zmid, rmaxis=rmaxis, zmaxis=zmaxis, simag=simag, sibry=sibry, bcentr=bcentr, current=current, fpol=fpol, pres=pres, ffprime=ffprime, pprime=pprime, qpsi=qpsi, psirz=psirz, rbdry=rbdry, zbdry=zbdry, rlim=rlim, zlim=zlim, )
# ── Writer ────────────────────────────────────────────────────────────
[docs] def write_geqdsk(eq: GEqdsk, path: Union[str, Path]) -> None: """ Write a :class:`GEqdsk` to a G-EQDSK file. Parameters ---------- eq : GEqdsk Equilibrium data. path : str or Path Output file path. """ path = Path(path) def _fmt(val: float) -> str: return f"{val:16.9e}" def _write_array(f: "IO[str]", arr: NDArray[np.float64]) -> None: for i, v in enumerate(arr.ravel()): f.write(_fmt(v)) if (i + 1) % 5 == 0: f.write("\n") if len(arr.ravel()) % 5 != 0: f.write("\n") with open(path, "w") as f: # Header desc = eq.description[:48].ljust(48) f.write(f"{desc} 0 {eq.nw:4d} {eq.nh:4d}\n") # Scalars (20 values, 5 per line) scalars = [ eq.rdim, eq.zdim, eq.rcentr, eq.rleft, eq.zmid, eq.rmaxis, eq.zmaxis, eq.simag, eq.sibry, eq.bcentr, eq.current, eq.simag, 0.0, eq.rmaxis, 0.0, eq.zmaxis, 0.0, eq.sibry, 0.0, 0.0, ] for i, v in enumerate(scalars): f.write(_fmt(v)) if (i + 1) % 5 == 0: f.write("\n") # 1-D arrays _write_array(f, eq.fpol) _write_array(f, eq.pres) _write_array(f, eq.ffprime) _write_array(f, eq.pprime) # 2-D flux map _write_array(f, eq.psirz) # q profile _write_array(f, eq.qpsi) # Boundary / limiter counts nbdry = len(eq.rbdry) nlim = len(eq.rlim) f.write(f"{nbdry:5d}{nlim:5d}\n") # Boundary pairs for i in range(nbdry): f.write(_fmt(eq.rbdry[i])) f.write(_fmt(eq.zbdry[i])) if ((i + 1) * 2) % 5 == 0 or i == nbdry - 1: f.write("\n") if nbdry > 0 and (nbdry * 2) % 5 != 0: f.write("\n") # Limiter pairs for i in range(nlim): f.write(_fmt(eq.rlim[i])) f.write(_fmt(eq.zlim[i])) if ((i + 1) * 2) % 5 == 0 or i == nlim - 1: f.write("\n") if nlim > 0 and (nlim * 2) % 5 != 0: f.write("\n")