# SPDX-License-Identifier: AGPL-3.0-or-later
# Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — G-EQDSK Equilibrium File Parser
"""
Reader and writer for the G-EQDSK (EFIT) equilibrium file format.
The G-EQDSK format is the *de facto* standard for tokamak equilibrium
data exchange. It stores the poloidal flux map ψ(R,Z) on a uniform
(R,Z) grid together with 1-D profile arrays and scalar quantities.
References include Lao et al., Nucl. Fusion 25 (1985) 1611, FreeQDSK
documentation, and the ITER IMAS equilibrium IDS.
The parser expects the standard Fortran fixed-width layout: five values per
line using ``(5e16.9)``, a 48-character header description followed by idum,
``nw`` and ``nh``, the 20-scalar equilibrium block, five 1-D profile arrays,
the row-major ``psirz`` flux map, and boundary/limiter ``(R, Z)`` point lists.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
import math
from pathlib import Path
from typing import IO, Union, cast
import numpy as np
from numpy.typing import NDArray
MAX_GEQDSK_BYTES = 10 * 1024 * 1024
MAX_GEQDSK_GRID_POINTS = 1_000_000
MAX_GEQDSK_CONTOUR_POINTS = 100_000
MAX_GEQDSK_NUMERIC_TOKENS = 4_500_000
GEQDSK_SOURCE_CONVENTION_MODES = {
"raw_canonical": "raw_canonical",
"public_sparc_named_adapter": "public_sparc_named_adapter",
}
GEQDSK_PUBLIC_SPARC_SOURCE_ADAPTERS = {
"sparc_1305.eqdsk": "scaled_by_2pi",
"sparc_1310.eqdsk": "scaled_by_2pi",
"sparc_1315.eqdsk": "scaled_by_2pi",
"sparc_1349.eqdsk": "scaled_by_2pi",
}
GEQDSK_SOURCE_CONVENTION_ADAPTERS = {
"scaled_by_2pi": 2.0 * np.pi,
}
# ── Data container ────────────────────────────────────────────────────
[docs]
@dataclass
class GEqdsk:
"""Container for all data in a G-EQDSK file."""
# Header
description: str = ""
nw: int = 0 # number of horizontal (R) grid points
nh: int = 0 # number of vertical (Z) grid points
# Scalars
rdim: float = 0.0 # horizontal dimension (m)
zdim: float = 0.0 # vertical dimension (m)
rcentr: float = 0.0 # reference R for vacuum toroidal field (m)
rleft: float = 0.0 # R at left of computational domain (m)
zmid: float = 0.0 # Z at centre of computational domain (m)
rmaxis: float = 0.0 # R of magnetic axis (m)
zmaxis: float = 0.0 # Z of magnetic axis (m)
simag: float = 0.0 # ψ at magnetic axis (Wb/rad)
sibry: float = 0.0 # ψ at plasma boundary (Wb/rad)
bcentr: float = 0.0 # vacuum toroidal field at rcentr (T)
current: float = 0.0 # plasma current (A)
# 1-D profile arrays (length nw)
fpol: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64))
pres: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64))
ffprime: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64))
pprime: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64))
qpsi: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64))
# 2-D flux map (nh × nw)
psirz: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64))
# Boundary and limiter contours
rbdry: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64))
zbdry: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64))
rlim: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64))
zlim: NDArray[np.float64] = field(default_factory=lambda: np.array([], dtype=np.float64))
# Source convention metadata
source_convention: str = "raw_canonical"
source_convention_adapter: str = "not_applied"
source_convention_adapter_pass: bool = False
source_convention_metadata: dict[str, object] = field(default_factory=dict)
# ── Derived grids ─────────────────────────────────────────────────
@property
def r(self) -> NDArray[np.float64]:
"""1-D array of R grid values."""
return cast(NDArray[np.float64], np.linspace(self.rleft, self.rleft + self.rdim, self.nw))
@property
def z(self) -> NDArray[np.float64]:
"""1-D array of Z grid values."""
return cast(
NDArray[np.float64],
np.linspace(self.zmid - self.zdim / 2, self.zmid + self.zdim / 2, self.nh),
)
@property
def psi_norm(self) -> NDArray[np.float64]:
"""Normalised poloidal flux ψ_N ∈ [0, 1] (axis=0, boundary=1)."""
return cast(NDArray[np.float64], np.linspace(0.0, 1.0, self.nw))
[docs]
def psi_to_norm(self, psi: NDArray[np.float64]) -> NDArray[np.float64]:
"""Map raw ψ to normalised ψ_N."""
return (psi - self.simag) / (self.sibry - self.simag)
[docs]
def to_config(self, name: str = "eqdsk") -> dict[str, object]:
"""Convert to a FusionKernel JSON config with GEQDSK shape metadata.
GEQDSK files do not contain an external-coil set, so the returned
``coils`` list remains empty. They do contain plasma-boundary and
limiter contours; those are exported into ``free_boundary`` so native
free-boundary workflows can use the EFIT boundary as an isoflux shape
contract instead of silently discarding it.
"""
r = self.r
z = self.z
free_boundary: dict[str, object] = {
"magnetic_axis": [float(self.rmaxis), float(self.zmaxis)],
"psi_axis": float(self.simag),
"psi_boundary": float(self.sibry),
"boundary_source": "geqdsk_rbdry_zbdry",
}
if self.rbdry.size and self.zbdry.size:
if self.rbdry.shape != self.zbdry.shape:
raise ValueError("GEQDSK boundary R/Z arrays must have matching lengths.")
if not np.all(np.isfinite(self.rbdry)) or not np.all(np.isfinite(self.zbdry)):
raise ValueError("GEQDSK boundary contour must contain finite values only.")
boundary_points = np.column_stack([self.rbdry, self.zbdry]).astype(np.float64)
free_boundary["target_flux_points"] = boundary_points.tolist()
free_boundary["target_flux_values"] = np.full(
boundary_points.shape[0],
float(self.sibry),
dtype=np.float64,
).tolist()
if self.rlim.size or self.zlim.size:
if self.rlim.shape != self.zlim.shape:
raise ValueError("GEQDSK limiter R/Z arrays must have matching lengths.")
if not np.all(np.isfinite(self.rlim)) or not np.all(np.isfinite(self.zlim)):
raise ValueError("GEQDSK limiter contour must contain finite values only.")
free_boundary["limiter_points"] = (
np.column_stack([self.rlim, self.zlim]).astype(np.float64).tolist()
)
return {
"reactor_name": name,
"grid_resolution": [self.nw, self.nh],
"dimensions": {
"R_min": float(r[0]),
"R_max": float(r[-1]),
"Z_min": float(z[0]),
"Z_max": float(z[-1]),
},
"physics": {
"plasma_current_target": float(self.current / 1e6),
"vacuum_permeability": 1.0,
},
"coils": [], # not stored in GEQDSK
"free_boundary": free_boundary,
"solver": {
"max_iterations": 1000,
"convergence_threshold": 1e-4,
"relaxation_factor": 0.1,
},
}
# ── Reader ────────────────────────────────────────────────────────────
# Regex that matches Fortran-style floats: optional sign, digits,
# optional decimal, optional exponent. Handles the common case where
# two values like "2.385E+00-1.216E+01" run together (no whitespace).
_FORTRAN_FLOAT_RE = re.compile(r"[+-]?\d*\.?\d+(?:[eEdD][+-]?\d+)?")
def _split_fortran(line: str) -> list[str]:
"""Extract all Fortran-format floats from *line*."""
return _FORTRAN_FLOAT_RE.findall(line)
def _parse_header(line: str) -> tuple[str, int, int]:
"""Parse the first line: 48-char description + idum nw nh."""
parts = line.split()
if len(parts) < 3:
raise ValueError("GEQDSK header must contain idum, nw, and nh")
nh = int(parts[-1])
nw = int(parts[-2])
desc = " ".join(parts[:-3]) if len(parts) > 3 else ""
return desc, nw, nh
def _validate_dimensions(nw: int, nh: int) -> None:
if nw < 2 or nh < 2:
raise ValueError(f"GEQDSK grid dimensions must be >= 2x2, got {(nw, nh)}")
if nw * nh > MAX_GEQDSK_GRID_POINTS:
raise ValueError(
f"GEQDSK grid dimensions exceed safety limit {MAX_GEQDSK_GRID_POINTS}: got {nw}x{nh}"
)
def _validate_contour_count(name: str, value: int) -> None:
if value < 0:
raise ValueError(f"GEQDSK {name} count must be non-negative")
if value > MAX_GEQDSK_CONTOUR_POINTS:
raise ValueError(f"GEQDSK {name} count exceeds safety limit {MAX_GEQDSK_CONTOUR_POINTS}")
def _parse_finite_fortran_float(token: str, *, field_name: str) -> float:
"""Parse one GEQDSK numeric token and reject non-finite values."""
try:
value = float(token.replace("D", "E").replace("d", "e"))
except ValueError as exc:
raise ValueError(f"GEQDSK {field_name} is not a valid finite float.") from exc
if not math.isfinite(value):
raise ValueError(f"GEQDSK {field_name} must be finite.")
return value
def _require_finite_array(name: str, array: NDArray[np.float64]) -> None:
if not np.all(np.isfinite(array)):
raise ValueError(f"GEQDSK {name} must contain finite values only.")
def _validate_geqdsk_schema(eq: GEqdsk) -> None:
"""Validate parsed GEQDSK shape and finite-value invariants."""
_validate_dimensions(eq.nw, eq.nh)
if eq.rdim <= 0.0 or eq.zdim <= 0.0:
raise ValueError("GEQDSK rdim and zdim must be positive.")
if eq.rcentr <= 0.0:
raise ValueError("GEQDSK rcentr must be positive.")
if eq.sibry == eq.simag:
raise ValueError("GEQDSK psi boundary must differ from psi axis.")
for name in (
"rdim",
"zdim",
"rcentr",
"rleft",
"zmid",
"rmaxis",
"zmaxis",
"simag",
"sibry",
"bcentr",
"current",
):
value = getattr(eq, name)
if not math.isfinite(float(value)):
raise ValueError(f"GEQDSK scalar {name} must be finite.")
expected_profile_shape = (eq.nw,)
for name in ("fpol", "pres", "ffprime", "pprime", "qpsi"):
array = getattr(eq, name)
if array.shape != expected_profile_shape:
raise ValueError(
f"GEQDSK {name} shape must be {expected_profile_shape}, got {array.shape}."
)
_require_finite_array(name, array)
if eq.psirz.shape != (eq.nh, eq.nw):
raise ValueError(f"GEQDSK psirz shape must be {(eq.nh, eq.nw)}, got {eq.psirz.shape}.")
_require_finite_array("psirz", eq.psirz)
for r_name, z_name in (("rbdry", "zbdry"), ("rlim", "zlim")):
r_values = getattr(eq, r_name)
z_values = getattr(eq, z_name)
if r_values.shape != z_values.shape:
raise ValueError(f"GEQDSK {r_name}/{z_name} contours must have matching lengths.")
_require_finite_array(r_name, r_values)
_require_finite_array(z_name, z_values)
def _detect_named_source_adapter(path: Path) -> str | None:
"""Return the explicit source adapter for known public SPARC filenames."""
return GEQDSK_PUBLIC_SPARC_SOURCE_ADAPTERS.get(path.name.lower())
def _apply_source_adapter(
eq: GEqdsk,
*,
convention: str,
source_file: Path,
mode: str,
) -> GEqdsk:
"""Apply a documented source-convention adapter and record provenance."""
if convention == "canonical":
eq.source_convention = "canonical"
eq.source_convention_adapter = "not_needed"
eq.source_convention_adapter_pass = True
eq.source_convention_metadata = {
"requested_mode": mode,
"source_file": source_file.name,
"adapter": "not_applied",
"applied_scale": 1.0,
"provenance": "none",
}
return eq
multiplier = GEQDSK_SOURCE_CONVENTION_ADAPTERS.get(convention)
if multiplier is None:
eq.source_convention = "unsupported"
eq.source_convention_adapter = convention
eq.source_convention_adapter_pass = False
eq.source_convention_metadata = {
"requested_mode": mode,
"source_file": source_file.name,
"adapter": convention,
"applied_scale": float("nan"),
"provenance": "unsupported_named_adapter",
"error": "unsupported source convention adapter",
}
return eq
eq.ffprime = multiplier * eq.ffprime
eq.pprime = multiplier * eq.pprime
eq.ffprime = np.round(eq.ffprime, decimals=11)
eq.pprime = np.round(eq.pprime, decimals=11)
eq.source_convention = "canonical"
eq.source_convention_adapter = convention
eq.source_convention_adapter_pass = True
eq.source_convention_metadata = {
"requested_mode": mode,
"source_file": source_file.name,
"adapter": convention,
"applied_scale": float(multiplier),
"provenance": "public_sparc_named_adapter",
"public_case": source_file.name,
}
return eq
[docs]
def read_geqdsk(
path: Union[str, Path],
*,
source_convention_mode: str = "raw_canonical",
) -> GEqdsk:
"""
Read a G-EQDSK file and return a :class:`GEqdsk` container.
Handles both whitespace-separated and fixed-width Fortran formats,
including the common case where values run together without spaces
(e.g. ``2.385E+00-1.216E+01``).
Parameters
----------
path : str or Path
Path to the G-EQDSK file.
source_convention_mode : {"raw_canonical", "public_sparc_named_adapter"}
``raw_canonical`` keeps parsed ``ffprime`` and ``pprime`` in raw file
units (default, strict mode).
``public_sparc_named_adapter`` applies a documented, source-aware
adapter only for explicitly recognized public SPARC files.
Returns
-------
GEqdsk
Parsed equilibrium data.
"""
path = Path(path)
if source_convention_mode not in GEQDSK_SOURCE_CONVENTION_MODES:
raise ValueError(
"Unsupported source_convention_mode: "
f"{source_convention_mode}. Expected one of: "
+ ", ".join(sorted(GEQDSK_SOURCE_CONVENTION_MODES.keys()))
)
size = path.stat().st_size
if size > MAX_GEQDSK_BYTES:
raise ValueError(f"GEQDSK file too large: {size} bytes exceeds {MAX_GEQDSK_BYTES}")
with open(path, "r", encoding="utf-8") as f:
lines = f.readlines()
if not lines:
raise ValueError("GEQDSK file is empty")
# Parse header
desc, nw, nh = _parse_header(lines[0])
_validate_dimensions(nw, nh)
# Gather all numeric tokens using Fortran-aware splitting
tokens: list[str] = []
for line in lines[1:]:
tokens.extend(_split_fortran(line))
if len(tokens) > MAX_GEQDSK_NUMERIC_TOKENS:
raise ValueError(
f"GEQDSK numeric token count exceeds safety limit {MAX_GEQDSK_NUMERIC_TOKENS}"
)
idx = 0
def _next() -> float:
nonlocal idx
if idx >= len(tokens):
raise ValueError("GEQDSK file ended before all required values were present")
val = _parse_finite_fortran_float(tokens[idx], field_name=f"token[{idx}]")
idx += 1
return val
def _read_array(n: int) -> NDArray[np.float64]:
nonlocal idx
if n < 0:
raise ValueError("GEQDSK array length must be non-negative")
if idx + n > len(tokens):
raise ValueError("GEQDSK file ended before all required array values were present")
arr = np.array(
[
_parse_finite_fortran_float(tokens[idx + i], field_name=f"token[{idx + i}]")
for i in range(n)
],
dtype=np.float64,
)
idx += n
return arr
# 20 scalar values
rdim = _next()
zdim = _next()
rcentr = _next()
rleft = _next()
zmid = _next()
rmaxis = _next()
zmaxis = _next()
simag = _next()
sibry = _next()
bcentr = _next()
current = _next()
_next() # simag (duplicate)
_next() # padding
_next() # rmaxis duplicate
_next() # padding
_next() # zmaxis duplicate
_next() # padding
_next() # sibry duplicate
_next() # padding
_next() # padding
# 1-D arrays (each nw values)
fpol = _read_array(nw)
pres = _read_array(nw)
ffprime = _read_array(nw)
pprime = _read_array(nw)
# 2-D flux map: nh rows × nw columns (row-major)
psirz = _read_array(nh * nw).reshape(nh, nw)
# q profile
qpsi = _read_array(nw)
# Boundary and limiter
nbdry = int(_next())
nlim = int(_next())
_validate_contour_count("boundary", nbdry)
_validate_contour_count("limiter", nlim)
required_contour_tokens = 2 * (nbdry + nlim)
if idx + required_contour_tokens > len(tokens):
raise ValueError("GEQDSK file ended before all required contour values were present")
rbdry = np.zeros(nbdry)
zbdry = np.zeros(nbdry)
for i in range(nbdry):
rbdry[i] = _next()
zbdry[i] = _next()
rlim = np.zeros(nlim)
zlim = np.zeros(nlim)
for i in range(nlim):
rlim[i] = _next()
zlim[i] = _next()
eq = GEqdsk(
description=desc,
nw=nw,
nh=nh,
rdim=rdim,
zdim=zdim,
rcentr=rcentr,
rleft=rleft,
zmid=zmid,
rmaxis=rmaxis,
zmaxis=zmaxis,
simag=simag,
sibry=sibry,
bcentr=bcentr,
current=current,
fpol=fpol,
pres=pres,
ffprime=ffprime,
pprime=pprime,
qpsi=qpsi,
psirz=psirz,
rbdry=rbdry,
zbdry=zbdry,
rlim=rlim,
zlim=zlim,
)
_validate_geqdsk_schema(eq)
if source_convention_mode == "public_sparc_named_adapter":
adapter = _detect_named_source_adapter(path)
if adapter is None:
eq.source_convention_adapter = "no_named_adapter"
eq.source_convention_adapter_pass = False
eq.source_convention = "raw_canonical"
eq.source_convention_metadata = {
"requested_mode": source_convention_mode,
"source_file": path.name,
"adapter": "no_named_adapter",
"applied_scale": 1.0,
"provenance": "public_sparc_named_adapter_no_match",
"error": "no recognized public SPARC convention mapping",
}
else:
eq = _apply_source_adapter(
eq,
convention=adapter,
source_file=path,
mode=source_convention_mode,
)
return eq
# ── Writer ────────────────────────────────────────────────────────────
[docs]
def write_geqdsk(eq: GEqdsk, path: Union[str, Path]) -> None:
"""
Write a :class:`GEqdsk` to a G-EQDSK file.
Parameters
----------
eq : GEqdsk
Equilibrium data.
path : str or Path
Output file path.
"""
path = Path(path)
def _fmt(val: float) -> str:
return f"{float(val):24.17e}"
def _write_array(f: "IO[str]", arr: NDArray[np.float64]) -> None:
for i, v in enumerate(arr.ravel()):
f.write(_fmt(v))
if (i + 1) % 5 == 0:
f.write("\n")
if len(arr.ravel()) % 5 != 0:
f.write("\n")
with open(path, "w") as f:
# Header
desc = eq.description[:48].ljust(48)
f.write(f"{desc} 0 {eq.nw:4d} {eq.nh:4d}\n")
# Scalars (20 values, 5 per line)
scalars = [
eq.rdim,
eq.zdim,
eq.rcentr,
eq.rleft,
eq.zmid,
eq.rmaxis,
eq.zmaxis,
eq.simag,
eq.sibry,
eq.bcentr,
eq.current,
eq.simag,
0.0,
eq.rmaxis,
0.0,
eq.zmaxis,
0.0,
eq.sibry,
0.0,
0.0,
]
for i, v in enumerate(scalars):
f.write(_fmt(v))
if (i + 1) % 5 == 0:
f.write("\n")
# 1-D arrays
_write_array(f, eq.fpol)
_write_array(f, eq.pres)
_write_array(f, eq.ffprime)
_write_array(f, eq.pprime)
# 2-D flux map
_write_array(f, eq.psirz)
# q profile
_write_array(f, eq.qpsi)
# Boundary / limiter counts
nbdry = len(eq.rbdry)
nlim = len(eq.rlim)
f.write(f"{nbdry:5d}{nlim:5d}\n")
# Boundary pairs
for i in range(nbdry):
f.write(_fmt(eq.rbdry[i]))
f.write(_fmt(eq.zbdry[i]))
if ((i + 1) * 2) % 5 == 0 or i == nbdry - 1:
f.write("\n")
if nbdry > 0 and (nbdry * 2) % 5 != 0:
f.write("\n")
# Limiter pairs
for i in range(nlim):
f.write(_fmt(eq.rlim[i]))
f.write(_fmt(eq.zlim[i]))
if ((i + 1) * 2) % 5 == 0 or i == nlim - 1:
f.write("\n")
if nlim > 0 and (nlim * 2) % 5 != 0:
f.write("\n")