Source code for scpn_fusion.io.imas_history_payloads

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — IMAS History Payloads
"""History/pulse payload helpers split from ``imas_connector`` monolith."""

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Mapping

# Top-level keys that every IDS pulse envelope must provide; the pulse
# validator reports any of these that are absent.
REQUIRED_IDS_PULSE_KEYS = ("schema", "machine", "shot", "run", "time_slices")


def _connector():
    """Return the ``imas_connector`` module, importing it on first use."""
    # Deferred to call time on purpose: a module-level import here would
    # create an import cycle with ``imas_connector``.
    from scpn_fusion.io import imas_connector as connector_module

    return connector_module


def _has_all_keys(mapping: Mapping[str, Any], keys: tuple[str, ...]) -> bool:
    return all(key in mapping for key in keys)


def validate_ids_payload_sequence(payloads: Sequence[Mapping[str, Any]]) -> None:
    """Validate an ordered series of IDS payloads.

    Every entry must be a mapping accepted by the connector's
    ``validate_ids_payload``. All entries must share the same ``machine``,
    ``shot`` and ``run`` as the first entry, and both ``time_slice.index``
    and ``time_slice.time_s`` must strictly increase from one entry to the
    next.

    Args:
        payloads: Non-empty sequence of IDS payload mappings. Text-like
            values (``str``, ``bytes``, ``bytearray``) are rejected.

    Raises:
        ValueError: If the container or any entry violates the rules above.
            The connector helpers called here may raise their own errors.
    """
    connector = _connector()

    looks_like_text = isinstance(payloads, (str, bytes, bytearray))
    if looks_like_text or not isinstance(payloads, Sequence):
        raise ValueError("payloads must be a sequence of IDS payload mappings.")
    if len(payloads) < 1:
        raise ValueError("payloads must contain at least one IDS payload.")

    # (machine, shot, run) taken from the first payload; None until then.
    reference: tuple[str, int, int] | None = None
    last_index: int | None = None
    last_time_s: float | None = None

    for position, entry in enumerate(payloads):
        label = f"payloads[{position}]"
        if isinstance(entry, bool) or not isinstance(entry, Mapping):
            raise ValueError(f"{label} must be a mapping.")
        connector.validate_ids_payload(entry)

        machine = str(entry.get("machine"))
        shot = connector._coerce_int(f"{label}.shot", entry.get("shot", 0), minimum=0)
        run = connector._coerce_int(f"{label}.run", entry.get("run", 0), minimum=0)

        slice_info = entry.get("time_slice")
        if not isinstance(slice_info, Mapping):
            raise ValueError(f"{label}.time_slice must be a mapping.")
        time_index = connector._coerce_int(
            f"{label}.time_slice.index",
            slice_info.get("index", 0),
            minimum=0,
        )
        time_s = connector._coerce_finite_real(
            f"{label}.time_slice.time_s",
            slice_info.get("time_s", 0.0),
            minimum=0.0,
        )

        if reference is None:
            # First payload defines the identity all later ones must match.
            reference = (machine, shot, run)
        else:
            ref_machine, ref_shot, ref_run = reference
            if machine != ref_machine:
                raise ValueError("All IDS payloads in sequence must share the same machine.")
            if shot != ref_shot:
                raise ValueError("All IDS payloads in sequence must share the same shot.")
            if run != ref_run:
                raise ValueError("All IDS payloads in sequence must share the same run.")

        if last_index is not None and time_index <= last_index:
            raise ValueError("IDS payload sequence requires strictly increasing time_slice.index.")
        if last_time_s is not None and time_s <= last_time_s:
            raise ValueError("IDS payload sequence requires strictly increasing time_slice.time_s.")

        last_index, last_time_s = time_index, time_s
def digital_twin_history_to_ids(
    history: Sequence[Mapping[str, Any]],
    *,
    machine: str = "ITER",
    shot: int = 0,
    run: int = 0,
) -> list[dict[str, Any]]:
    """Convert a digital-twin snapshot history into a list of IDS payloads.

    Snapshots containing every key in the connector's
    ``REQUIRED_PROFILE_1D_KEYS`` go through ``digital_twin_state_to_ids``;
    all others go through ``digital_twin_summary_to_ids``. Each payload's
    ``time_slice`` is then rewritten so that ``index`` equals the snapshot's
    position and ``time_s`` is rounded to whole milliseconds and strictly
    increasing. The finished list is checked with
    ``validate_ids_payload_sequence`` before being returned.

    Args:
        history: Non-empty sequence of snapshot mappings.
        machine: Machine name forwarded to the connector converters.
        shot: Shot number forwarded to the connector converters.
        run: Run number forwarded to the connector converters.

    Returns:
        One IDS payload per snapshot, in input order.

    Raises:
        ValueError: If ``history`` is not a non-empty sequence of mappings,
            or a converter yields a payload without a ``time_slice`` mapping.
    """
    connector = _connector()

    looks_like_text = isinstance(history, (str, bytes, bytearray))
    if looks_like_text or not isinstance(history, Sequence):
        raise ValueError("history must be a sequence of digital twin snapshots.")
    if len(history) < 1:
        raise ValueError("history must contain at least one snapshot.")

    payloads: list[dict[str, Any]] = []
    last_time_ms: int | None = None

    for position, snapshot in enumerate(history):
        if isinstance(snapshot, bool) or not isinstance(snapshot, Mapping):
            raise ValueError(f"history[{position}] must be a mapping.")

        # Full 1-D profile snapshots take the state path, the rest the summary path.
        convert = (
            connector.digital_twin_state_to_ids
            if _has_all_keys(snapshot, connector.REQUIRED_PROFILE_1D_KEYS)
            else connector.digital_twin_summary_to_ids
        )
        payload = convert(snapshot, machine=machine, shot=shot, run=run)

        slice_info = payload.get("time_slice")
        if not isinstance(slice_info, Mapping):
            raise ValueError(f"history[{position}] produced invalid IDS time_slice mapping.")

        seconds = connector._coerce_finite_real(
            f"history[{position}].time_slice.time_s",
            slice_info.get("time_s", 0.0),
            minimum=0.0,
        )
        time_ms = round(seconds * 1.0e3)
        if last_time_ms is not None:
            # Bump repeated or backwards timestamps by one millisecond so the
            # sequence stays strictly increasing.
            time_ms = max(time_ms, last_time_ms + 1)

        target_slice = payload["time_slice"]
        target_slice["index"] = position
        target_slice["time_s"] = time_ms * 1.0e-3

        last_time_ms = time_ms
        payloads.append(payload)

    validate_ids_payload_sequence(payloads)
    return payloads
def ids_to_digital_twin_history(
    payloads: Sequence[Mapping[str, Any]],
) -> list[dict[str, Any]]:
    """Convert a validated IDS payload sequence back into digital-twin snapshots.

    The sequence is first checked with ``validate_ids_payload_sequence``.
    A payload whose ``equilibrium`` entry is a mapping containing
    ``profiles_1d`` is converted with the connector's
    ``ids_to_digital_twin_state``; every other payload is converted with
    ``ids_to_digital_twin_summary``.

    Args:
        payloads: Sequence of IDS payload mappings.

    Returns:
        One converted snapshot per payload, in input order.
    """
    connector = _connector()
    validate_ids_payload_sequence(payloads)

    def _to_snapshot(payload: Mapping[str, Any]) -> dict[str, Any]:
        # Presence of 1-D profiles decides between the state and summary paths.
        equilibrium = payload.get("equilibrium", {})
        has_profiles = isinstance(equilibrium, Mapping) and "profiles_1d" in equilibrium
        if has_profiles:
            return connector.ids_to_digital_twin_state(payload)
        return connector.ids_to_digital_twin_summary(payload)

    return [_to_snapshot(payload) for payload in payloads]
def validate_ids_pulse_payload(pulse: Mapping[str, Any]) -> None:
    """Validate an ``ids_equilibrium_pulse_v1`` pulse envelope.

    Checks are applied in this order:

    1. ``pulse`` is a mapping containing every key in
       ``REQUIRED_IDS_PULSE_KEYS``.
    2. ``schema`` (stripped) equals ``"ids_equilibrium_pulse_v1"``.
    3. ``machine`` (stripped) is non-empty.
    4. ``shot`` and ``run`` pass the connector's ``_coerce_int`` with
       ``minimum=0``.
    5. ``time_slices`` is a non-empty sequence that is not text-like
       (``str``, ``bytes``, ``bytearray``).
    6. ``time_slices`` passes ``validate_ids_payload_sequence``.
    7. Each slice's ``machine``, ``shot`` and ``run`` match the envelope.

    Args:
        pulse: Pulse envelope mapping to check.

    Raises:
        ValueError: If any check above fails. The connector helpers called
            here may raise their own errors.
    """
    c = _connector()
    if not isinstance(pulse, Mapping):
        raise ValueError("IDS pulse payload must be a mapping.")

    missing = c._missing_required_keys(pulse, REQUIRED_IDS_PULSE_KEYS)
    if missing:
        raise ValueError(f"IDS pulse payload missing keys: {', '.join(missing)}")

    schema = str(pulse.get("schema", "")).strip()
    if schema != "ids_equilibrium_pulse_v1":
        raise ValueError(
            f"Unsupported IDS pulse schema '{schema}'. Expected 'ids_equilibrium_pulse_v1'.",
        )

    machine = str(pulse.get("machine", "")).strip()
    if not machine:
        raise ValueError("pulse.machine must be a non-empty string.")
    shot = c._coerce_int("pulse.shot", pulse.get("shot", 0), minimum=0)
    run = c._coerce_int("pulse.run", pulse.get("run", 0), minimum=0)

    slices = pulse.get("time_slices")
    # str/bytes/bytearray are Sequences too, so reject them here. Otherwise
    # they reach the nested validator and fail with a message about
    # "payloads" that never mentions pulse.time_slices.
    if isinstance(slices, (str, bytes, bytearray)) or not isinstance(slices, Sequence):
        raise ValueError("pulse.time_slices must be a sequence.")
    if len(slices) == 0:
        raise ValueError("pulse.time_slices must contain at least one payload.")
    validate_ids_payload_sequence(slices)

    # The sequence validator only checks that slices agree with each other;
    # this loop checks that they also agree with the envelope.
    for idx, payload in enumerate(slices):
        if str(payload.get("machine", "")).strip() != machine:
            raise ValueError(f"pulse.time_slices[{idx}].machine does not match pulse.machine.")
        slice_shot = c._coerce_int(
            f"pulse.time_slices[{idx}].shot",
            payload.get("shot", 0),
            minimum=0,
        )
        if slice_shot != shot:
            raise ValueError(f"pulse.time_slices[{idx}].shot does not match pulse.shot.")
        slice_run = c._coerce_int(
            f"pulse.time_slices[{idx}].run",
            payload.get("run", 0),
            minimum=0,
        )
        if slice_run != run:
            raise ValueError(f"pulse.time_slices[{idx}].run does not match pulse.run.")
def digital_twin_history_to_ids_pulse(
    history: Sequence[Mapping[str, Any]],
    *,
    machine: str = "ITER",
    shot: int = 0,
    run: int = 0,
) -> dict[str, Any]:
    """Wrap a digital-twin history in an ``ids_equilibrium_pulse_v1`` envelope.

    The history is converted with ``digital_twin_history_to_ids``, placed
    under ``time_slices`` next to the pulse identity fields, and the
    resulting envelope is checked with ``validate_ids_pulse_payload``.

    Args:
        history: Sequence of digital-twin snapshot mappings.
        machine: Machine name stored in the envelope and each slice.
        shot: Shot number; stored in the envelope as ``int(shot)``.
        run: Run number; stored in the envelope as ``int(run)``.

    Returns:
        The validated pulse envelope.
    """
    # Convert first so history errors surface before the envelope is built.
    time_slices = digital_twin_history_to_ids(
        history,
        machine=machine,
        shot=shot,
        run=run,
    )

    envelope: dict[str, Any] = {"schema": "ids_equilibrium_pulse_v1"}
    envelope["machine"] = machine
    envelope["shot"] = int(shot)
    envelope["run"] = int(run)
    envelope["time_slices"] = time_slices

    validate_ids_pulse_payload(envelope)
    return envelope
def ids_pulse_to_digital_twin_history(pulse: Mapping[str, Any]) -> list[dict[str, Any]]:
    """Validate a pulse envelope and convert its slices to digital-twin snapshots.

    Args:
        pulse: Pulse envelope; checked with ``validate_ids_pulse_payload``.

    Returns:
        The result of ``ids_to_digital_twin_history`` applied to
        ``pulse["time_slices"]``.

    Raises:
        ValueError: If ``time_slices`` is not a sequence.
    """
    validate_ids_pulse_payload(pulse)

    time_slices = pulse.get("time_slices")
    if isinstance(time_slices, Sequence):
        return ids_to_digital_twin_history(time_slices)
    raise ValueError("pulse.time_slices must be a sequence.")