Source code for scpn_fusion.io.imas_connector_equilibrium

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — IMAS Connector Equilibrium
"""IMAS DD v3 equilibrium converters."""

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Mapping

import numpy as np

from scpn_fusion.core.eqdsk import GEqdsk
from scpn_fusion.io.imas_connector_common import _missing_required_keys


IMAS_DD_EQUILIBRIUM_KEYS = (
    "ids_properties",
    "time",
    "time_slice",
)


[docs] def geqdsk_to_imas_equilibrium( eq: GEqdsk, *, time_s: float = 0.0, shot: int = 0, run: int = 0, ) -> dict[str, Any]: """Convert a GEqdsk equilibrium to an IMAS Data Dictionary ``equilibrium`` IDS.""" if eq.nw < 2 or eq.nh < 2: raise ValueError("GEqdsk must have nw >= 2 and nh >= 2 for IMAS conversion.") if eq.psirz.size == 0: raise ValueError("GEqdsk psirz must be non-empty for IMAS conversion.") time_val = float(time_s) r_grid = eq.r.tolist() z_grid = eq.z.tolist() psi_norm = eq.psi_norm.tolist() midplane_idx = eq.nh // 2 psi_1d = eq.psirz[midplane_idx, :].tolist() return { "ids_properties": { "homogeneous_time": 1, "comment": f"SCPN Fusion Core IMAS export (shot={shot}, run={run})", }, "time": [time_val], "time_slice": [ { "time": time_val, "global_quantities": { "ip": float(eq.current), "magnetic_axis": { "r": float(eq.rmaxis), "z": float(eq.zmaxis), }, "psi_axis": float(eq.simag), "psi_boundary": float(eq.sibry), "vacuum_toroidal_field": { "r0": float(eq.rcentr), "b0": float(eq.bcentr), }, }, "profiles_1d": { "psi": psi_1d, "q": eq.qpsi.tolist() if eq.qpsi.size > 0 else [], "pressure": eq.pres.tolist() if eq.pres.size > 0 else [], "f": eq.fpol.tolist() if eq.fpol.size > 0 else [], "psi_norm": psi_norm, }, "profiles_2d": [ { "psi": eq.psirz.tolist(), "grid": { "dim1": r_grid, "dim2": z_grid, }, "grid_type": {"index": 1, "name": "rectangular"}, } ], "boundary": { "outline": { "r": eq.rbdry.tolist() if eq.rbdry.size > 0 else [], "z": eq.zbdry.tolist() if eq.zbdry.size > 0 else [], } }, } ], "code": { "name": "SCPN-Fusion-Core", "version": "2.0.0", }, }
[docs] def imas_equilibrium_to_geqdsk(ids: Mapping[str, Any]) -> GEqdsk: """Convert an IMAS Data Dictionary ``equilibrium`` IDS back to a GEqdsk.""" missing = _missing_required_keys(ids, IMAS_DD_EQUILIBRIUM_KEYS) if missing: raise ValueError(f"IMAS equilibrium IDS missing keys: {missing}") time_slices = ids["time_slice"] if not isinstance(time_slices, Sequence) or len(time_slices) == 0: raise ValueError("IMAS equilibrium must have at least one time_slice.") ts = time_slices[0] gq = ts.get("global_quantities", {}) p1d = ts.get("profiles_1d", {}) p2d_list = ts.get("profiles_2d", []) boundary = ts.get("boundary", {}) axis = gq.get("magnetic_axis", {}) vtf = gq.get("vacuum_toroidal_field", {}) if not p2d_list: raise ValueError("IMAS equilibrium must have at least one profiles_2d entry.") p2d = p2d_list[0] grid = p2d.get("grid", {}) r_grid = np.asarray(grid.get("dim1", []), dtype=np.float64) z_grid = np.asarray(grid.get("dim2", []), dtype=np.float64) psirz = np.asarray(p2d.get("psi", []), dtype=np.float64) nw = r_grid.size nh = z_grid.size if nw < 2 or nh < 2: raise ValueError("IMAS profiles_2d grid must have at least 2 points per dimension.") if psirz.ndim != 2 or psirz.shape != (nh, nw): raise ValueError( f"IMAS profiles_2d psi shape {psirz.shape} does not match grid ({nh}, {nw})." ) rdim = float(r_grid[-1] - r_grid[0]) zdim = float(z_grid[-1] - z_grid[0]) rleft = float(r_grid[0]) zmid = float(0.5 * (z_grid[0] + z_grid[-1])) outline = boundary.get("outline", {}) return GEqdsk( description="IMAS import", nw=nw, nh=nh, rdim=rdim, zdim=zdim, rcentr=float(vtf.get("r0", 0.0)), rleft=rleft, zmid=zmid, rmaxis=float(axis.get("r", 0.0)), zmaxis=float(axis.get("z", 0.0)), simag=float(gq.get("psi_axis", 0.0)), sibry=float(gq.get("psi_boundary", 0.0)), bcentr=float(vtf.get("b0", 0.0)), current=float(gq.get("ip", 0.0)), fpol=np.asarray(p1d.get("f", []), dtype=np.float64), pres=np.asarray(p1d.get("pressure", []), dtype=np.float64), ffprime=np.zeros(nw, dtype=np.float64), pprime=np.zeros(nw, dtype=np.float64), qpsi=np.asarray(p1d.get("q", []), dtype=np.float64), psirz=psirz, rbdry=np.asarray(outline.get("r", []), dtype=np.float64), zbdry=np.asarray(outline.get("z", []), dtype=np.float64), rlim=np.array([], dtype=np.float64), zlim=np.array([], dtype=np.float64), )
__all__ = [ "IMAS_DD_EQUILIBRIUM_KEYS", "geqdsk_to_imas_equilibrium", "imas_equilibrium_to_geqdsk", ]