# Source code for scpn_fusion.io.imas_connector_omas

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — IMAS Connector OMAS Bridge
"""OMAS compatibility layer for equilibrium IDS payloads."""

from __future__ import annotations

from typing import Any, Mapping

import numpy as np

# OMAS is an optional dependency: try to import it once at module load and
# record the outcome in HAS_OMAS so the converters below can fail with a
# clear ImportError instead of a NameError.
try:
    import omas  # type: ignore[import-untyped]

    HAS_OMAS = True
except ImportError:
    # Keep the name ``omas`` bound so ``omas is None`` checks remain valid.
    omas = None
    HAS_OMAS = False


def ids_to_omas_equilibrium(ids_dict: Mapping[str, Any]) -> Any:
    """Build an OMAS ODS from an IMAS equilibrium IDS mapping.

    Args:
        ids_dict: Mapping that may contain ``"ids_properties"``, ``"time"``
            and ``"time_slice"`` entries. Every missing entry falls back to a
            default (``1`` for ``homogeneous_time``, ``""`` for ``comment``,
            ``[0.0]`` for ``time``, ``0.0`` for scalar quantities).

    Returns:
        A new ``omas.ODS`` populated under the ``equilibrium.*`` paths.

    Raises:
        ImportError: If the ``omas`` package could not be imported.
    """
    if not HAS_OMAS or omas is None:
        raise ImportError(
            "The 'omas' package is required for OMAS conversion. Install with: pip install omas"
        )

    ods = omas.ODS()

    header = ids_dict.get("ids_properties", {})
    ods["equilibrium.ids_properties.homogeneous_time"] = header.get("homogeneous_time", 1)
    ods["equilibrium.ids_properties.comment"] = header.get("comment", "")
    ods["equilibrium.time"] = np.asarray(ids_dict.get("time", [0.0]), dtype=np.float64)

    for index, slice_in in enumerate(ids_dict.get("time_slice", [])):
        base = f"equilibrium.time_slice.{index}"
        ods[f"{base}.time"] = slice_in.get("time", 0.0)

        # Scalar global quantities; each one defaults to 0.0 when absent.
        globals_in = slice_in.get("global_quantities", {})
        gq_path = f"{base}.global_quantities"
        ods[f"{gq_path}.ip"] = globals_in.get("ip", 0.0)

        axis_in = globals_in.get("magnetic_axis", {})
        for coord in ("r", "z"):
            ods[f"{gq_path}.magnetic_axis.{coord}"] = axis_in.get(coord, 0.0)

        for name in ("psi_axis", "psi_boundary"):
            ods[f"{gq_path}.{name}"] = globals_in.get(name, 0.0)

        vacuum_in = globals_in.get("vacuum_toroidal_field", {})
        for name in ("r0", "b0"):
            ods[f"{gq_path}.vacuum_toroidal_field.{name}"] = vacuum_in.get(name, 0.0)

        # 1-D profiles are written only when the slice carries a non-empty
        # ``profiles_1d`` mapping; individual missing profiles become empty
        # float64 arrays.
        profiles_in = slice_in.get("profiles_1d", {})
        if profiles_in:
            for name in ("psi", "q", "pressure", "f"):
                ods[f"{base}.profiles_1d.{name}"] = np.asarray(
                    profiles_in.get(name, []), dtype=np.float64
                )

        # Boundary outline follows the same rule as the profiles.
        outline_in = slice_in.get("boundary", {}).get("outline", {})
        if outline_in:
            for coord in ("r", "z"):
                ods[f"{base}.boundary.outline.{coord}"] = np.asarray(
                    outline_in.get(coord, []), dtype=np.float64
                )

    return ods
def _listify(value: Any) -> Any:
    """Return ``value.tolist()``-style plain data when ``value`` exposes ``tolist``, else ``value`` unchanged."""
    return np.asarray(value).tolist() if hasattr(value, "tolist") else value


def omas_equilibrium_to_ids(ods: Any) -> dict[str, Any]:
    """Convert OMAS ODS equilibrium data back to an IDS dict.

    Args:
        ods: Object addressed with dotted ``equilibrium.*`` paths through both
            ``ods.get(path, default)`` and ``ods[path]``.

    Returns:
        A dict with the keys ``"ids_properties"``, ``"time"`` (always a list
        of floats) and ``"time_slice"`` (one dict per slice found). Each slice
        holds ``"time"`` and ``"global_quantities"``; ``"profiles_1d"`` is
        added when at least one profile is present, ``"boundary"`` when both
        outline coordinates are present.

    Raises:
        ImportError: If the ``omas`` package could not be imported.
    """
    if not HAS_OMAS:
        raise ImportError("The 'omas' package is required for OMAS conversion.")

    ids_dict: dict[str, Any] = {}
    ids_dict["ids_properties"] = {
        "homogeneous_time": int(ods.get("equilibrium.ids_properties.homogeneous_time", 1)),
        "comment": str(ods.get("equilibrium.ids_properties.comment", "")),
    }

    # Normalise the time base to a list in every case. A NumPy scalar or 0-d
    # array has ``tolist`` but returns a bare float from it, and a plain
    # list/tuple has no ``tolist`` and cannot be passed to ``float``.
    time_val = ods.get("equilibrium.time", np.array([0.0]))
    ids_dict["time"] = np.atleast_1d(np.asarray(time_val, dtype=np.float64)).tolist()

    time_slices = []
    i = 0
    while True:
        prefix = f"equilibrium.time_slice.{i}"
        # Slices are probed by index; the first index whose ``time`` entry
        # cannot be read as a float marks the end of the sequence.
        try:
            t = float(ods[f"{prefix}.time"])
        except (KeyError, IndexError, TypeError):
            break

        ts: dict[str, Any] = {"time": t}

        gq_prefix = f"{prefix}.global_quantities"
        ts["global_quantities"] = {
            "ip": float(ods.get(f"{gq_prefix}.ip", 0.0)),
            "magnetic_axis": {
                "r": float(ods.get(f"{gq_prefix}.magnetic_axis.r", 0.0)),
                "z": float(ods.get(f"{gq_prefix}.magnetic_axis.z", 0.0)),
            },
            "psi_axis": float(ods.get(f"{gq_prefix}.psi_axis", 0.0)),
            "psi_boundary": float(ods.get(f"{gq_prefix}.psi_boundary", 0.0)),
            "vacuum_toroidal_field": {
                "r0": float(ods.get(f"{gq_prefix}.vacuum_toroidal_field.r0", 0.0)),
                "b0": float(ods.get(f"{gq_prefix}.vacuum_toroidal_field.b0", 0.0)),
            },
        }

        # Only profiles that are actually stored are copied over.
        p1d: dict[str, Any] = {}
        for key in ("psi", "q", "pressure", "f"):
            val = ods.get(f"{prefix}.profiles_1d.{key}", None)
            if val is not None:
                p1d[key] = _listify(val)
        if p1d:
            ts["profiles_1d"] = p1d

        # The outline is emitted only when both coordinates are available.
        bdry_r = ods.get(f"{prefix}.boundary.outline.r", None)
        bdry_z = ods.get(f"{prefix}.boundary.outline.z", None)
        if bdry_r is not None and bdry_z is not None:
            ts["boundary"] = {
                "outline": {
                    "r": _listify(bdry_r),
                    "z": _listify(bdry_z),
                }
            }

        time_slices.append(ts)
        i += 1

    ids_dict["time_slice"] = time_slices
    return ids_dict
# Public API of this module.
__all__ = [
    "HAS_OMAS",
    "ids_to_omas_equilibrium",
    "omas_equilibrium_to_ids",
]