SCPN NeuroCore Bridge API

scpn_neurocore is the canonical Python namespace for SC-NeuroCore bridge surfaces consumed by SCPN repositories. It is intentionally separate from the core sc_neurocore simulation package: sc_neurocore hosts the neuromorphic engine, while scpn_neurocore hosts source-facing artifacts and datastream packets for cross-repository SCPN workflows.

The earlier unseparated namespace spelling is no longer an actively tracked package. Current consumers must import the bridge through:

Python
from scpn_neurocore.bridge import (
    load_connectome,
    load_live_stream,
    load_power_grid,
    load_tokamak_data,
)

Bridge-only imports are kept lightweight. Importing scpn_neurocore.bridge does not eagerly import datastream codecs or optional engine modules. Datastream exports from scpn_neurocore are resolved lazily when requested.
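
Lazy export resolution of this kind is commonly implemented with a module-level __getattr__ (PEP 562). The sketch below builds a throwaway demo module to illustrate the pattern; it is not the scpn_neurocore implementation, and the module and attribute names here are invented for the example.

```python
# Illustrative sketch of lazy module exports via PEP 562 (module __getattr__).
# NOT the scpn_neurocore implementation -- just the common pattern it describes.
import sys
import types

source = '''
_LOADED = []  # records which heavy submodules were actually resolved

def __getattr__(name):
    # Resolve "datastream" only on first attribute access, so that a plain
    # import of the package stays lightweight.
    if name == "datastream":
        _LOADED.append(name)
        return object()  # stand-in for the real submodule
    raise AttributeError(name)
'''

demo = types.ModuleType("lazy_bridge_demo")
exec(source, demo.__dict__)
sys.modules["lazy_bridge_demo"] = demo

import lazy_bridge_demo

assert lazy_bridge_demo._LOADED == []       # nothing resolved at import time
_ = lazy_bridge_demo.datastream             # first access triggers resolution
assert lazy_bridge_demo._LOADED == ["datastream"]
```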

Artifact Contract

The bridge loaders return QPUBridgeArtifact objects. Each artifact carries:

  • K_nm: finite, symmetric, non-negative coupling matrix with zero diagonal.
  • omega: finite natural-frequency vector matching K_nm.shape[0].
  • theta0: optional finite initial phase vector.
  • layer_assignments: per-oscillator integer layer labels.
  • source_mode, source_name, normalization, and extraction_method.
  • source_timestamp or replay_id.
  • stable SHA-256 hashes for numerical arrays and the full artifact payload.

Publication-safe source modes are recorded, replay, curated, and derived. Smoke-test modes are synthetic, simulation, and fixture; they are useful for interface health checks but are not publication evidence.
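
The K_nm and omega constraints above can be checked mechanically. The helper below is a minimal sketch of such a check, not the package's internal _validate_artifact_arrays (whose implementation is not shown on this page):

```python
import numpy as np

def check_coupling(K, omega):
    """Sketch of the QPUBridgeArtifact coupling-matrix contract above."""
    K = np.asarray(K, dtype=np.float64)
    omega = np.asarray(omega, dtype=np.float64)
    assert np.all(np.isfinite(K)) and np.all(np.isfinite(omega))  # finite
    assert K.ndim == 2 and K.shape[0] == K.shape[1]               # square
    assert np.allclose(K, K.T)                                    # symmetric
    assert np.all(K >= 0.0)                                       # non-negative
    assert np.all(np.diag(K) == 0.0)                              # zero diagonal
    assert omega.shape == (K.shape[0],)                           # length match

# A 2-oscillator example that satisfies every constraint:
check_coupling(np.array([[0.0, 0.5], [0.5, 0.0]]), np.array([1.0, 1.1]))
```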

Datastream Contract

scpn_neurocore.datastream builds auditable bridge packets containing waveform samples, AER spike rasters, telemetry summaries, optional QPU artifact hashes, and optional optimiser observations. The schema version is:

Text Only
scpn_neurocore.datastream.v1

This bridge packet is distinct from the internal 16-layer sc_neurocore.scpn.datastream JSON contract, whose schema remains sc-neurocore.scpn.datastream.v1.
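
Because the two schema strings differ, consumers can route a decoded payload on schema_version alone. A trivial guard (the dispatch function itself is illustrative, only the schema strings come from this page):

```python
BRIDGE_SCHEMA = "scpn_neurocore.datastream.v1"
INTERNAL_SCHEMA = "sc-neurocore.scpn.datastream.v1"

def packet_kind(payload: dict) -> str:
    """Classify a decoded JSON payload by its schema_version field."""
    version = payload.get("schema_version")
    if version == BRIDGE_SCHEMA:
        return "bridge"
    if version == INTERNAL_SCHEMA:
        return "internal"
    return "unknown"

assert packet_kind({"schema_version": BRIDGE_SCHEMA}) == "bridge"
assert packet_kind({"schema_version": INTERNAL_SCHEMA}) == "internal"
```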

API Reference

scpn_neurocore.bridge

Auditable QPU data artifacts for quantum-control campaign bridges.

SourceDataUnavailable

Bases: FileNotFoundError

Raised when a requested publication-grade source is not bundled.

Source code in src/scpn_neurocore/bridge.py
Python
class SourceDataUnavailable(FileNotFoundError):
    """Raised when a requested publication-grade source is not bundled."""

QPUBridgeArtifact dataclass

Provenance-rich oscillator artifact for QPU campaign consumers.

Source code in src/scpn_neurocore/bridge.py
Python
@dataclass(frozen=True)
class QPUBridgeArtifact:
    """Provenance-rich oscillator artifact for QPU campaign consumers."""

    domain: str
    source_name: str
    source_mode: str
    K_nm: np.ndarray
    omega: np.ndarray
    theta0: np.ndarray | None
    layer_assignments: list[int]
    normalization: str
    extraction_method: str
    source_timestamp: str | None = None
    replay_id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        _validate_source_mode(self.source_mode)
        for key, value in (
            ("domain", self.domain),
            ("source_name", self.source_name),
            ("normalization", self.normalization),
            ("extraction_method", self.extraction_method),
        ):
            _require_non_empty_string(value, key)
        _validate_artifact_arrays(self.K_nm, self.omega, self.theta0, self.layer_assignments)
        if self.source_timestamp is None and self.replay_id is None:
            raise ValueError("source_timestamp or replay_id is required")
        if self.source_timestamp is not None:
            _require_non_empty_string(self.source_timestamp, "source_timestamp")
        if self.replay_id is not None:
            _require_non_empty_string(self.replay_id, "replay_id")
        if not isinstance(self.metadata, dict):
            raise ValueError("metadata must be a mapping")

    @property
    def hashes(self) -> dict[str, str]:
        """Stable SHA256 hashes for numeric payloads."""
        result = {
            "K_nm_sha256": _hash_array(self.K_nm),
            "omega_sha256": _hash_array(self.omega),
        }
        if self.theta0 is not None:
            result["theta0_sha256"] = _hash_array(self.theta0)
        return result

    @property
    def artifact_sha256(self) -> str:
        """Stable SHA256 of the full JSON-compatible artifact payload."""
        payload = self.to_qpu_artifact_dict(include_artifact_hash=False)
        return _canonical_artifact_sha256(payload)

    def to_qpu_artifact_dict(self, *, include_artifact_hash: bool = True) -> dict[str, Any]:
        """Return the Quantum Control artifact mapping."""
        payload: dict[str, Any] = {
            "schema_version": QPU_ARTIFACT_SCHEMA_VERSION,
            "domain": self.domain,
            "source_name": self.source_name,
            "source_mode": self.source_mode,
            "K_nm": self.K_nm.tolist(),
            "omega": self.omega.tolist(),
            "theta0": None if self.theta0 is None else self.theta0.tolist(),
            "layer_assignments": list(self.layer_assignments),
            "normalization": self.normalization,
            "extraction_method": self.extraction_method,
            "source_timestamp": self.source_timestamp,
            "replay_id": self.replay_id,
            "metadata": dict(self.metadata),
            "hashes": self.hashes,
        }
        if include_artifact_hash:
            payload["artifact_sha256"] = self.artifact_sha256
        return payload

hashes property

Stable SHA256 hashes for numeric payloads.

artifact_sha256 property

Stable SHA256 of the full JSON-compatible artifact payload.
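
The body of _canonical_artifact_sha256 is not shown on this page. A common canonicalization is compact, key-sorted JSON, sketched below; the package's exact scheme is an assumption here.

```python
import hashlib
import json

def canonical_sha256(payload: dict) -> str:
    """SHA-256 of a JSON payload under one canonical serialisation.

    Sorted keys plus compact separators make the digest independent of dict
    insertion order. This mirrors what a helper like _canonical_artifact_sha256
    might do; the package's actual canonicalization is not shown here.
    """
    blob = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()

a = canonical_sha256({"omega": [1.0, 2.0], "domain": "tokamak"})
b = canonical_sha256({"domain": "tokamak", "omega": [1.0, 2.0]})
assert a == b  # key order does not change the digest
```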

to_qpu_artifact_dict(*, include_artifact_hash=True)

Return the Quantum Control artifact mapping.

Source code in src/scpn_neurocore/bridge.py
Python
def to_qpu_artifact_dict(self, *, include_artifact_hash: bool = True) -> dict[str, Any]:
    """Return the Quantum Control artifact mapping."""
    payload: dict[str, Any] = {
        "schema_version": QPU_ARTIFACT_SCHEMA_VERSION,
        "domain": self.domain,
        "source_name": self.source_name,
        "source_mode": self.source_mode,
        "K_nm": self.K_nm.tolist(),
        "omega": self.omega.tolist(),
        "theta0": None if self.theta0 is None else self.theta0.tolist(),
        "layer_assignments": list(self.layer_assignments),
        "normalization": self.normalization,
        "extraction_method": self.extraction_method,
        "source_timestamp": self.source_timestamp,
        "replay_id": self.replay_id,
        "metadata": dict(self.metadata),
        "hashes": self.hashes,
    }
    if include_artifact_hash:
        payload["artifact_sha256"] = self.artifact_sha256
    return payload

load_connectome(name, n=None, *, source_mode=None, synthetic=False)

Load a connectome artifact for Quantum Control.

Publication-grade connectome files are not bundled in this repository. Callers must provide a real source through a future curated/replay path, or explicitly request a labelled synthetic smoke artifact.

Source code in src/scpn_neurocore/bridge.py
Python
def load_connectome(
    name: str,
    n: int | None = None,
    *,
    source_mode: str | None = None,
    synthetic: bool = False,
) -> QPUBridgeArtifact:
    """Load a connectome artifact for Quantum Control.

    Publication-grade connectome files are not bundled in this repository.
    Callers must provide a real source through a future curated/replay path,
    or explicitly request a labelled synthetic smoke artifact.
    """
    if name not in {"c_elegans_sub", "c_elegans"}:
        raise ValueError(f"unsupported connectome source {name!r}")
    mode = _resolve_source_mode(source_mode, synthetic)
    size = 14 if n is None else _require_positive_n(n)
    if mode not in NON_PUBLICATION_SOURCE_MODES:
        _raise_unavailable(name, (size, size))

    knm = _chain_coupling(size, nearest=0.62, next_nearest=0.22)
    omega = np.linspace(0.8, 1.8, size, dtype=np.float64)
    theta0 = np.linspace(0.0, np.pi, size, endpoint=False, dtype=np.float64)
    return _artifact(
        domain="connectome",
        source_name=name,
        source_mode=mode,
        knm=knm,
        omega=omega,
        theta0=theta0,
        normalization="max_abs_to_unit_interval",
        extraction_method="deterministic_chain_smoke_fixture",
        metadata={
            "expected_shape": [size, size],
            "publication_safe": False,
            "reason": "synthetic smoke artifact; no bundled connectome source used",
        },
    )
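
The body of _chain_coupling is not shown on this page. The sketch below is a plausible reconstruction of a nearest/next-nearest chain fixture that satisfies the artifact contract (symmetric, non-negative, zero diagonal); the helper's actual implementation may differ.

```python
import numpy as np

def chain_coupling(n: int, nearest: float, next_nearest: float) -> np.ndarray:
    """Hypothetical sketch of a chain coupling fixture: bands at |i-j| == 1
    and |i-j| == 2, symmetrised, with a zero diagonal."""
    K = np.zeros((n, n), dtype=np.float64)
    idx = np.arange(n)
    K[idx[:-1], idx[1:]] = nearest        # nearest-neighbour band
    K[idx[:-2], idx[2:]] = next_nearest   # next-nearest band
    return K + K.T                        # symmetrise; diagonal stays zero

K = chain_coupling(14, nearest=0.62, next_nearest=0.22)
assert np.allclose(K, K.T) and np.all(np.diag(K) == 0.0)
assert K[0, 1] == 0.62 and K[0, 2] == 0.22
```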

load_tokamak_data(n=16, *, source_mode=None, synthetic=False)

Load tokamak/plasma oscillator data for Quantum Control.

Source code in src/scpn_neurocore/bridge.py
Python
def load_tokamak_data(
    n: int = 16,
    *,
    source_mode: str | None = None,
    synthetic: bool = False,
) -> QPUBridgeArtifact:
    """Load tokamak/plasma oscillator data for Quantum Control."""
    mode = _resolve_source_mode(source_mode, synthetic)
    size = _require_positive_n(n)
    if mode not in NON_PUBLICATION_SOURCE_MODES:
        _raise_unavailable("tokamak", (size, size))

    knm: np.ndarray = _banded_coupling(size, base=0.45, decay=0.32)
    omega: np.ndarray = np.resize(
        np.array([10.0, 8.0, 3.0, 5.0, 0.5, 0.3, 0.1, 1.0], dtype=np.float64),
        size,
    )
    return _artifact(
        domain="tokamak",
        source_name="tokamak",
        source_mode=mode,
        knm=knm,
        omega=omega,
        theta0=np.zeros(size, dtype=np.float64),
        normalization="bounded_exponential_coupling",
        extraction_method="deterministic_plasma_timescale_smoke_fixture",
        metadata={
            "expected_shape": [size, size],
            "omega_units": "rad_s",
            "publication_safe": False,
        },
    )
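
The omega template above is extended to length n with np.resize, which repeats the 8-value plasma-timescale vector cyclically when enlarging:

```python
import numpy as np

# The 8-value timescale template from load_tokamak_data, tiled to 12 oscillators.
template = np.array([10.0, 8.0, 3.0, 5.0, 0.5, 0.3, 0.1, 1.0], dtype=np.float64)
omega = np.resize(template, 12)  # np.resize tiles cyclically when enlarging

assert omega.shape == (12,)
assert omega[8] == 10.0 and omega[11] == 5.0  # wraps back to the template start
```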

load_power_grid(n, name=None, *, source_mode=None, synthetic=False)

Load power-grid oscillator data for Quantum Control.

Source code in src/scpn_neurocore/bridge.py
Python
def load_power_grid(
    n: int,
    name: str | None = None,
    *,
    source_mode: str | None = None,
    synthetic: bool = False,
) -> QPUBridgeArtifact:
    """Load power-grid oscillator data for Quantum Control."""
    mode = _resolve_source_mode(source_mode, synthetic)
    size = _require_positive_n(n)
    source_name = "power_grid" if name is None else name
    if mode not in NON_PUBLICATION_SOURCE_MODES:
        _raise_unavailable(source_name, (size, size))

    knm: np.ndarray = _ring_coupling(size, nearest=0.5, long_range=0.08)
    omega: np.ndarray = np.ones(size, dtype=np.float64)
    if size >= 4:
        omega[1::4] = 1.02
        omega[3::4] = 0.98
    return _artifact(
        domain="power_grid",
        source_name=source_name,
        source_mode=mode,
        knm=knm,
        omega=omega,
        theta0=np.zeros(size, dtype=np.float64),
        normalization="per_unit_admittance_like_smoke_scaling",
        extraction_method="deterministic_ring_grid_smoke_fixture",
        metadata={
            "expected_shape": [size, size],
            "omega_units": "per_unit_frequency",
            "publication_safe": False,
        },
    )
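
The per-unit frequency staggering shown above yields a repeating 1.0 / 1.02 / 1.0 / 0.98 pattern once size >= 4:

```python
import numpy as np

# Reproducing the omega staggering from load_power_grid for size = 8.
size = 8
omega = np.ones(size, dtype=np.float64)
if size >= 4:
    omega[1::4] = 1.02  # every 4th oscillator, offset 1, runs slightly fast
    omega[3::4] = 0.98  # every 4th oscillator, offset 3, runs slightly slow

assert omega.tolist() == [1.0, 1.02, 1.0, 0.98, 1.0, 1.02, 1.0, 0.98]
```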

load_live_stream(source, step, *, source_mode=None, synthetic=False)

Load one replayable live-stream artifact.

Source code in src/scpn_neurocore/bridge.py
Python
def load_live_stream(
    source: str,
    step: int,
    *,
    source_mode: str | None = None,
    synthetic: bool = False,
) -> QPUBridgeArtifact:
    """Load one replayable live-stream artifact."""
    if step < 0:
        raise ValueError(f"step must be >= 0, got {step}")
    if source != "eeg_powergrid":
        raise ValueError(f"unsupported live stream source {source!r}")
    mode = _resolve_source_mode(source_mode, synthetic)
    size = 12
    if mode not in NON_PUBLICATION_SOURCE_MODES:
        _raise_unavailable(source, (size, size))

    knm = _ring_coupling(size, nearest=0.38, long_range=0.04)
    phase = float(step) * 0.1
    omega = 1.0 + 0.05 * np.sin(phase + np.arange(size, dtype=np.float64) * 0.5)
    theta0 = (phase + np.arange(size, dtype=np.float64) * np.pi / size) % (2.0 * np.pi)
    return _artifact(
        domain="live_stream",
        source_name=source,
        source_mode=mode,
        knm=knm,
        omega=omega,
        theta0=theta0,
        normalization="bounded_live_replay_smoke_scaling",
        extraction_method="deterministic_eeg_powergrid_step_fixture",
        replay_id=f"{mode}:{source}:step:{step}",
        metadata={
            "step": step,
            "expected_shape": [size, size],
            "publication_safe": False,
        },
    )
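
Because omega, theta0, and replay_id are pure functions of (source, step), a replayed step is bit-for-bit reproducible. Recomputing the step-3 values from the formulas above (the "synthetic" mode in the replay_id is just one of the accepted modes):

```python
import numpy as np

size, step = 12, 3
phase = float(step) * 0.1

# Same expressions as in load_live_stream:
omega = 1.0 + 0.05 * np.sin(phase + np.arange(size, dtype=np.float64) * 0.5)
theta0 = (phase + np.arange(size, dtype=np.float64) * np.pi / size) % (2.0 * np.pi)
replay_id = f"synthetic:eeg_powergrid:step:{step}"

omega_again = 1.0 + 0.05 * np.sin(phase + np.arange(size, dtype=np.float64) * 0.5)
assert np.array_equal(omega, omega_again)            # deterministic replay
assert np.all((omega > 0.9) & (omega < 1.1))         # bounded around 1.0
assert np.all((theta0 >= 0.0) & (theta0 < 2.0 * np.pi))
```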

validate_qpu_artifact_payload(payload)

Validate a JSON-deserialised QPU bridge artifact payload.

Source code in src/scpn_neurocore/bridge.py
Python
def validate_qpu_artifact_payload(payload: dict[str, Any]) -> None:
    """Validate a JSON-deserialised QPU bridge artifact payload."""
    if not isinstance(payload, dict):
        raise ValueError("QPU artifact payload must be a mapping")
    if payload.get("schema_version") != QPU_ARTIFACT_SCHEMA_VERSION:
        raise ValueError("unsupported QPU artifact schema_version")
    for key in ("domain", "source_name", "normalization", "extraction_method"):
        _require_non_empty_string(payload.get(key), key)
    source_mode = payload.get("source_mode")
    _validate_source_mode(source_mode)
    source_timestamp = payload.get("source_timestamp")
    replay_id = payload.get("replay_id")
    if source_timestamp is None and replay_id is None:
        raise ValueError("source_timestamp or replay_id is required")
    if source_timestamp is not None:
        _require_non_empty_string(source_timestamp, "source_timestamp")
    if replay_id is not None:
        _require_non_empty_string(replay_id, "replay_id")
    if not isinstance(payload.get("metadata"), dict):
        raise ValueError("metadata must be a mapping")

    knm = _array_from_payload(payload, "K_nm")
    omega = _array_from_payload(payload, "omega")
    theta0 = None if payload.get("theta0") is None else _array_from_payload(payload, "theta0")
    layer_assignments = payload.get("layer_assignments")
    if not isinstance(layer_assignments, list):
        raise ValueError("layer_assignments must be a list")
    _validate_artifact_arrays(knm, omega, theta0, layer_assignments)

    hashes = payload.get("hashes")
    if not isinstance(hashes, dict):
        raise ValueError("hashes must be present")
    _validate_payload_array_hash(hashes, "K_nm_sha256", knm)
    _validate_payload_array_hash(hashes, "omega_sha256", omega)
    if theta0 is None:
        if "theta0_sha256" in hashes:
            raise ValueError("theta0_sha256 must be absent when theta0 is null")
    else:
        _validate_payload_array_hash(hashes, "theta0_sha256", theta0)

    artifact_hash = payload.get("artifact_sha256")
    if not _is_sha256_hex(artifact_hash):
        raise ValueError("artifact_sha256 must be a SHA256 hex digest")
    expected_hash = _payload_sha256_without_artifact_hash(payload)
    if artifact_hash != expected_hash:
        raise ValueError("artifact_sha256 does not match payload")
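
The final check above recomputes the canonical hash with artifact_sha256 removed and compares it to the stored digest. A self-contained sketch of that tamper check follows; the compact key-sorted JSON canonicalization is an assumption, not necessarily the package's exact scheme.

```python
import hashlib
import json

def payload_sha256_without_artifact_hash(payload: dict) -> str:
    """Hash the payload with the embedded artifact_sha256 field excluded."""
    body = {k: v for k, v in payload.items() if k != "artifact_sha256"}
    blob = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()

payload = {"domain": "tokamak", "omega": [1.0, 2.0]}
payload["artifact_sha256"] = payload_sha256_without_artifact_hash(payload)

# Untouched payload verifies; a tampered field breaks the digest.
assert payload["artifact_sha256"] == payload_sha256_without_artifact_hash(payload)
payload["omega"] = [1.0, 2.5]  # tamper with a numeric field
assert payload["artifact_sha256"] != payload_sha256_without_artifact_hash(payload)
```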

scpn_neurocore.datastream

Build auditable SC-NeuroCore datastream packets for SCPN consumers.

DatastreamValidationError

Bases: ValueError

Raised when a datastream packet cannot be trusted by SCPN consumers.

Source code in src/scpn_neurocore/datastream.py
Python
class DatastreamValidationError(ValueError):
    """Raised when a datastream packet cannot be trusted by SCPN consumers."""

SCNeuroCoreDatastreamPacket dataclass

Hash-addressed bridge packet containing waveform, AER, and telemetry evidence.

Source code in src/scpn_neurocore/datastream.py
Python
@dataclass(frozen=True)
class SCNeuroCoreDatastreamPacket:
    """Hash-addressed bridge packet containing waveform, AER, and telemetry evidence."""

    source_name: str
    source_mode: str
    waveform_shape: tuple[int, int]
    spike_shape: tuple[int, int]
    waveform_codec: str
    waveform_mode: str
    aer_codec: str
    waveform_bytes_sha256: str
    aer_bytes_sha256: str
    waveform_metrics: dict[str, int | float | bool]
    aer_metrics: dict[str, int | float | bool | str]
    telemetry: dict[str, Any]
    qpu_artifact_sha256: str | None = None
    optimiser_observation: dict[str, int | float | str | bool] | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def packet_sha256(self) -> str:
        """Stable hash of the JSON-compatible datastream payload."""
        return _canonical_payload_sha256(self.to_bridge_dict(include_packet_hash=False))

    def to_bridge_dict(self, *, include_packet_hash: bool = True) -> dict[str, Any]:
        """Return the SCPN bridge packet mapping."""
        payload: dict[str, Any] = {
            "schema_version": SC_NEUROCORE_DATASTREAM_SCHEMA_VERSION,
            "source_name": self.source_name,
            "source_mode": self.source_mode,
            "waveform_shape": list(self.waveform_shape),
            "spike_shape": list(self.spike_shape),
            "waveform_codec": self.waveform_codec,
            "waveform_mode": self.waveform_mode,
            "aer_codec": self.aer_codec,
            "hashes": {
                "waveform_bytes_sha256": self.waveform_bytes_sha256,
                "aer_bytes_sha256": self.aer_bytes_sha256,
            },
            "waveform_metrics": dict(self.waveform_metrics),
            "aer_metrics": dict(self.aer_metrics),
            "telemetry": self.telemetry,
            "qpu_artifact_sha256": self.qpu_artifact_sha256,
            "optimiser_observation": (
                None if self.optimiser_observation is None else dict(self.optimiser_observation)
            ),
            "metadata": dict(self.metadata),
        }
        if include_packet_hash:
            payload["packet_sha256"] = self.packet_sha256
        return payload

packet_sha256 property

Stable hash of the JSON-compatible datastream payload.

to_bridge_dict(*, include_packet_hash=True)

Return the SCPN bridge packet mapping.

Source code in src/scpn_neurocore/datastream.py
Python
def to_bridge_dict(self, *, include_packet_hash: bool = True) -> dict[str, Any]:
    """Return the SCPN bridge packet mapping."""
    payload: dict[str, Any] = {
        "schema_version": SC_NEUROCORE_DATASTREAM_SCHEMA_VERSION,
        "source_name": self.source_name,
        "source_mode": self.source_mode,
        "waveform_shape": list(self.waveform_shape),
        "spike_shape": list(self.spike_shape),
        "waveform_codec": self.waveform_codec,
        "waveform_mode": self.waveform_mode,
        "aer_codec": self.aer_codec,
        "hashes": {
            "waveform_bytes_sha256": self.waveform_bytes_sha256,
            "aer_bytes_sha256": self.aer_bytes_sha256,
        },
        "waveform_metrics": dict(self.waveform_metrics),
        "aer_metrics": dict(self.aer_metrics),
        "telemetry": self.telemetry,
        "qpu_artifact_sha256": self.qpu_artifact_sha256,
        "optimiser_observation": (
            None if self.optimiser_observation is None else dict(self.optimiser_observation)
        ),
        "metadata": dict(self.metadata),
    }
    if include_packet_hash:
        payload["packet_sha256"] = self.packet_sha256
    return payload

build_datastream_packet(*, waveform, spike_raster, source_name, source_mode, layer_id='input', waveform_codec=None, aer_codec=None, qpu_artifact=None, optimiser_observation=None, metadata=None)

Build one audited datastream packet from raw waveform and AER spikes.

Source code in src/scpn_neurocore/datastream.py
Python
def build_datastream_packet(
    *,
    waveform: np.ndarray,
    spike_raster: np.ndarray,
    source_name: str,
    source_mode: str,
    layer_id: str = "input",
    waveform_codec: WaveformCodec | None = None,
    aer_codec: AERSpikeCodec | None = None,
    qpu_artifact: QPUBridgeArtifact | None = None,
    optimiser_observation: BenchmarkObservation | None = None,
    metadata: dict[str, Any] | None = None,
) -> SCNeuroCoreDatastreamPacket:
    """Build one audited datastream packet from raw waveform and AER spikes."""
    if source_mode not in SOURCE_MODES:
        raise DatastreamValidationError(f"unsupported source_mode {source_mode!r}")
    waveform_array = _validate_waveform(waveform)
    spikes = _validate_spike_raster(spike_raster)
    if waveform_array.shape != spikes.shape:
        raise DatastreamValidationError(
            f"waveform shape {waveform_array.shape} must match spike_raster shape {spikes.shape}"
        )

    wave_codec = waveform_codec or WaveformCodec(mode="spike")
    event_codec = aer_codec or AERSpikeCodec()
    waveform_bytes, waveform_result = wave_codec.compress(waveform_array)
    aer_bytes, aer_result = event_codec.compress(spikes)
    telemetry = _telemetry_summary(spikes, layer_id=layer_id)
    waveform_shape = (int(waveform_array.shape[0]), int(waveform_array.shape[1]))
    spike_shape = (int(spikes.shape[0]), int(spikes.shape[1]))

    return SCNeuroCoreDatastreamPacket(
        source_name=source_name,
        source_mode=source_mode,
        waveform_shape=waveform_shape,
        spike_shape=spike_shape,
        waveform_codec=type(wave_codec).__name__,
        waveform_mode=wave_codec.mode,
        aer_codec=type(event_codec).__name__,
        waveform_bytes_sha256=_hash_bytes(waveform_bytes),
        aer_bytes_sha256=_hash_bytes(aer_bytes),
        waveform_metrics=_waveform_metrics(waveform_result),
        aer_metrics=_aer_metrics(aer_result),
        telemetry=telemetry,
        qpu_artifact_sha256=None if qpu_artifact is None else qpu_artifact.artifact_sha256,
        optimiser_observation=(
            None if optimiser_observation is None else _observation_record(optimiser_observation)
        ),
        metadata={} if metadata is None else dict(metadata),
    )
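
The telemetry summary must stay consistent with the raster shape: validate_datastream_payload later requires total_ticks to match spike_shape[0] and total_spikes to match the AER n_spikes. A sketch of those invariants computed from a boolean raster (the raster here is randomly generated for illustration):

```python
import numpy as np

rng = np.random.default_rng(0)
spikes = rng.random((64, 8)) < 0.1   # (timesteps, neurons) boolean raster

total_ticks = int(spikes.shape[0])
n_neurons = int(spikes.shape[1])
total_spikes = int(spikes.sum())

# Invariants the validator enforces between telemetry, aer_metrics and shapes:
assert total_ticks == spikes.shape[0]
assert n_neurons == spikes.shape[1]
assert total_spikes == int(np.count_nonzero(spikes))
```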

validate_datastream_payload(payload)

Validate the public shape of a bridge datastream payload.

Source code in src/scpn_neurocore/datastream.py
Python
def validate_datastream_payload(payload: dict[str, Any]) -> None:
    """Validate the public shape of a bridge datastream payload."""
    if payload.get("schema_version") != SC_NEUROCORE_DATASTREAM_SCHEMA_VERSION:
        raise DatastreamValidationError("unsupported datastream schema_version")
    for key in ("source_name", "waveform_codec", "waveform_mode", "aer_codec"):
        if not isinstance(payload.get(key), str) or not payload[key].strip():
            raise DatastreamValidationError(f"{key} must be a non-empty string")
    source_mode = payload.get("source_mode")
    if source_mode not in SOURCE_MODES:
        raise DatastreamValidationError("unsupported source_mode")
    hashes = payload.get("hashes")
    if not isinstance(hashes, dict):
        raise DatastreamValidationError("hashes must be present")
    for key in ("waveform_bytes_sha256", "aer_bytes_sha256"):
        if not _is_sha256_hex(hashes.get(key)):
            raise DatastreamValidationError(f"{key} must be a SHA256 hex digest")

    waveform_shape = _shape_from_payload(payload, "waveform_shape")
    spike_shape = _shape_from_payload(payload, "spike_shape")
    if waveform_shape != spike_shape:
        raise DatastreamValidationError("waveform_shape and spike_shape must match")

    waveform_metrics = _mapping_from_payload(payload, "waveform_metrics")
    aer_metrics = _mapping_from_payload(payload, "aer_metrics")
    telemetry = _mapping_from_payload(payload, "telemetry")
    total_ticks = _positive_int_from_mapping(telemetry, "total_ticks", "telemetry")
    total_spikes = _nonnegative_int_from_mapping(telemetry, "total_spikes", "telemetry")
    _validate_telemetry_layers(
        telemetry,
        expected_total_ticks=total_ticks,
        expected_total_spikes=total_spikes,
    )
    if total_ticks != spike_shape[0]:
        raise DatastreamValidationError("total_ticks must match spike_shape timesteps")
    aer_spikes = _nonnegative_int_from_mapping(aer_metrics, "n_spikes", "aer_metrics")
    if total_spikes != aer_spikes:
        raise DatastreamValidationError("total_spikes must match aer_metrics n_spikes")
    aer_timesteps = _positive_int_from_mapping(aer_metrics, "n_timesteps", "aer_metrics")
    if aer_timesteps != spike_shape[0]:
        raise DatastreamValidationError("n_timesteps must match spike_shape timesteps")
    aer_neurons = _positive_int_from_mapping(aer_metrics, "n_neurons", "aer_metrics")
    if aer_neurons != spike_shape[1]:
        raise DatastreamValidationError("n_neurons must match spike_shape neurons")
    waveform_samples = _positive_int_from_mapping(waveform_metrics, "n_samples", "waveform_metrics")
    if waveform_samples != waveform_shape[0]:
        raise DatastreamValidationError("n_samples must match waveform_shape samples")
    waveform_channels = _positive_int_from_mapping(
        waveform_metrics, "n_channels", "waveform_metrics"
    )
    if waveform_channels != waveform_shape[1]:
        raise DatastreamValidationError("n_channels must match waveform_shape channels")

    qpu_artifact_sha256 = payload.get("qpu_artifact_sha256")
    if qpu_artifact_sha256 is not None and not _is_sha256_hex(qpu_artifact_sha256):
        raise DatastreamValidationError("qpu_artifact_sha256 must be a SHA256 hex digest")
    _validate_optimiser_observation(payload.get("optimiser_observation"))
    if not isinstance(payload.get("metadata"), dict):
        raise DatastreamValidationError("metadata must be a mapping")
    packet_hash = payload.get("packet_sha256")
    if not _is_sha256_hex(packet_hash):
        raise DatastreamValidationError("packet_sha256 must be a SHA256 hex digest")
    expected_hash = _payload_sha256_without_packet_hash(payload)
    if packet_hash != expected_hash:
        raise DatastreamValidationError("packet_sha256 does not match payload")
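
The _is_sha256_hex guard used throughout can be sketched as a 64-character lowercase-hex check; the package's exact predicate is not shown on this page, and rejecting uppercase digits is a choice made by this sketch.

```python
import string

def is_sha256_hex(value) -> bool:
    """Hypothetical sketch of _is_sha256_hex: 64 lowercase hex characters."""
    hex_lower = string.hexdigits.lower()[:16]  # "0123456789abcdef"
    return (
        isinstance(value, str)
        and len(value) == 64
        and all(c in hex_lower for c in value)
    )

assert is_sha256_hex("a" * 64)
assert not is_sha256_hex("A" * 64)   # uppercase rejected in this sketch
assert not is_sha256_hex("xyz")
```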