SCPN NeuroCore Bridge API
scpn_neurocore is the canonical Python namespace for SC-NeuroCore bridge
surfaces consumed by SCPN repositories. It is intentionally separate from the
core sc_neurocore simulation package: sc_neurocore hosts the neuromorphic
engine, while scpn_neurocore hosts source-facing artifacts and datastream
packets for cross-repository SCPN workflows.
The previous unseparated namespace spelling is no longer an active tracked
package. Current consumers must import the bridge through:
```python
from scpn_neurocore.bridge import (
    load_connectome,
    load_live_stream,
    load_power_grid,
    load_tokamak_data,
)
```
Bridge-only imports are kept lightweight. Importing scpn_neurocore.bridge
does not eagerly import datastream codecs or optional engine modules. Datastream
exports from scpn_neurocore are resolved lazily when requested.
Artifact Contract
The bridge loaders return QPUBridgeArtifact objects. Each artifact carries:
- `K_nm`: finite, symmetric, non-negative coupling matrix with zero diagonal.
- `omega`: finite natural-frequency vector matching `K_nm.shape[0]`.
- `theta0`: optional finite initial phase vector.
- `layer_assignments`: per-oscillator integer layer labels.
- `source_mode`, `source_name`, `normalization`, and `extraction_method`.
- `source_timestamp` or `replay_id`.
- stable SHA-256 hashes for numerical arrays and the full artifact payload.
Publication-safe modes are recorded, replay, curated, and derived.
Smoke-test modes are synthetic, simulation, and fixture; they are useful
for interface health checks but are not publication evidence.
Datastream Contract
scpn_neurocore.datastream builds auditable bridge packets containing waveform
samples, AER spike rasters, telemetry summaries, optional QPU artifact hashes,
and optional optimiser observations. The schema version is:
```text
scpn_neurocore.datastream.v1
```
This bridge packet is distinct from the internal 16-layer
sc_neurocore.scpn.datastream JSON contract, whose schema remains
sc-neurocore.scpn.datastream.v1.
API Reference
scpn_neurocore.bridge
Auditable QPU data artifacts for quantum-control campaign bridges.
SourceDataUnavailable
Bases: FileNotFoundError
Raised when a requested publication-grade source is not bundled.
Source code in src/scpn_neurocore/bridge.py
| Python |
|---|
class SourceDataUnavailable(FileNotFoundError):
    """Signal that a requested publication-grade source is not bundled.

    Derives from ``FileNotFoundError``, so handlers written for missing files
    catch it as well.
    """
|
QPUBridgeArtifact
dataclass
Provenance-rich oscillator artifact for QPU campaign consumers.
Source code in src/scpn_neurocore/bridge.py
| Python |
|---|
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
@dataclass(frozen=True)
class QPUBridgeArtifact:
    """Frozen oscillator artifact, with provenance, for QPU campaign consumers.

    Every instance is checked in ``__post_init__``: the source mode, the
    descriptive string fields, the numerical arrays and the provenance
    identifiers are validated before the object becomes usable.
    """

    domain: str
    source_name: str
    source_mode: str
    # Coupling matrix; checked together with omega/theta0/layer_assignments.
    K_nm: np.ndarray
    # Natural-frequency vector.
    omega: np.ndarray
    # Optional initial phases; None means "not supplied".
    theta0: np.ndarray | None
    layer_assignments: list[int]
    normalization: str
    extraction_method: str
    # At least one of source_timestamp / replay_id has to be provided.
    source_timestamp: str | None = None
    replay_id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Validate the artifact; raise ``ValueError`` for bad provenance or metadata.

        The delegated helpers (``_validate_source_mode``,
        ``_require_non_empty_string``, ``_validate_artifact_arrays``) report
        their own failures.
        """
        _validate_source_mode(self.source_mode)
        for field_name in ("domain", "source_name", "normalization", "extraction_method"):
            _require_non_empty_string(getattr(self, field_name), field_name)
        _validate_artifact_arrays(self.K_nm, self.omega, self.theta0, self.layer_assignments)

        # Insertion order matters: source_timestamp is checked before replay_id.
        provenance = {
            "source_timestamp": self.source_timestamp,
            "replay_id": self.replay_id,
        }
        if all(identifier is None for identifier in provenance.values()):
            raise ValueError("source_timestamp or replay_id is required")
        for label, identifier in provenance.items():
            if identifier is not None:
                _require_non_empty_string(identifier, label)

        if not isinstance(self.metadata, dict):
            raise ValueError("metadata must be a mapping")

    @property
    def hashes(self) -> dict[str, str]:
        """Stable SHA256 digests of the numeric arrays.

        ``theta0_sha256`` is only present when ``theta0`` is set.
        """
        digests = {
            "K_nm_sha256": _hash_array(self.K_nm),
            "omega_sha256": _hash_array(self.omega),
        }
        if self.theta0 is None:
            return digests
        digests["theta0_sha256"] = _hash_array(self.theta0)
        return digests

    @property
    def artifact_sha256(self) -> str:
        """Stable SHA256 of the JSON-compatible payload, excluding this hash itself."""
        unsigned_payload = self.to_qpu_artifact_dict(include_artifact_hash=False)
        return _canonical_artifact_sha256(unsigned_payload)

    def to_qpu_artifact_dict(self, *, include_artifact_hash: bool = True) -> dict[str, Any]:
        """Build the Quantum Control artifact mapping.

        Arrays are converted to nested lists, ``layer_assignments`` and
        ``metadata`` are shallow-copied, and ``artifact_sha256`` is appended
        only when ``include_artifact_hash`` is true.
        """
        initial_phases = self.theta0.tolist() if self.theta0 is not None else None
        payload: dict[str, Any] = {
            "schema_version": QPU_ARTIFACT_SCHEMA_VERSION,
            "domain": self.domain,
            "source_name": self.source_name,
            "source_mode": self.source_mode,
            "K_nm": self.K_nm.tolist(),
            "omega": self.omega.tolist(),
            "theta0": initial_phases,
            "layer_assignments": list(self.layer_assignments),
            "normalization": self.normalization,
            "extraction_method": self.extraction_method,
            "source_timestamp": self.source_timestamp,
            "replay_id": self.replay_id,
            "metadata": dict(self.metadata),
            "hashes": self.hashes,
        }
        if not include_artifact_hash:
            return payload
        payload["artifact_sha256"] = self.artifact_sha256
        return payload
|
hashes
property
Stable SHA256 hashes for numeric payloads.
artifact_sha256
property
Stable SHA256 of the full JSON-compatible artifact payload.
to_qpu_artifact_dict(*, include_artifact_hash=True)
Return the Quantum Control artifact mapping.
Source code in src/scpn_neurocore/bridge.py
| Python |
|---|
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
def to_qpu_artifact_dict(self, *, include_artifact_hash: bool = True) -> dict[str, Any]:
    """Build the Quantum Control artifact mapping.

    Arrays are converted to nested lists, ``layer_assignments`` and
    ``metadata`` are shallow-copied, and ``artifact_sha256`` is appended
    only when ``include_artifact_hash`` is true.
    """
    initial_phases = self.theta0.tolist() if self.theta0 is not None else None
    payload: dict[str, Any] = {
        "schema_version": QPU_ARTIFACT_SCHEMA_VERSION,
        "domain": self.domain,
        "source_name": self.source_name,
        "source_mode": self.source_mode,
        "K_nm": self.K_nm.tolist(),
        "omega": self.omega.tolist(),
        "theta0": initial_phases,
        "layer_assignments": list(self.layer_assignments),
        "normalization": self.normalization,
        "extraction_method": self.extraction_method,
        "source_timestamp": self.source_timestamp,
        "replay_id": self.replay_id,
        "metadata": dict(self.metadata),
        "hashes": self.hashes,
    }
    if not include_artifact_hash:
        return payload
    payload["artifact_sha256"] = self.artifact_sha256
    return payload
|
load_connectome(name, n=None, *, source_mode=None, synthetic=False)
Load a connectome artifact for Quantum Control.
Publication-grade connectome files are not bundled in this repository.
Callers must provide a real source through a future curated/replay path,
or explicitly request a labelled synthetic smoke artifact.
Source code in src/scpn_neurocore/bridge.py
| Python |
|---|
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def load_connectome(
    name: str,
    n: int | None = None,
    *,
    source_mode: str | None = None,
    synthetic: bool = False,
) -> QPUBridgeArtifact:
    """Return a connectome artifact for Quantum Control.

    No publication-grade connectome file ships with this repository, so the
    only artifact built here is a labelled synthetic smoke fixture. Any other
    source mode is handed to ``_raise_unavailable``.

    Args:
        name: ``"c_elegans_sub"`` or ``"c_elegans"``.
        n: Oscillator count; 14 when omitted, otherwise checked by
            ``_require_positive_n``.
        source_mode: Requested source mode, resolved by ``_resolve_source_mode``.
        synthetic: Forwarded to ``_resolve_source_mode``.

    Raises:
        ValueError: If ``name`` is not a supported connectome source.
    """
    supported_sources = {"c_elegans_sub", "c_elegans"}
    if name not in supported_sources:
        raise ValueError(f"unsupported connectome source {name!r}")

    mode = _resolve_source_mode(source_mode, synthetic)
    if n is None:
        size = 14
    else:
        size = _require_positive_n(n)

    smoke_only = mode in NON_PUBLICATION_SOURCE_MODES
    if not smoke_only:
        # NOTE(review): presumably raises SourceDataUnavailable - confirm in the helper.
        _raise_unavailable(name, (size, size))

    coupling = _chain_coupling(size, nearest=0.62, next_nearest=0.22)
    frequencies = np.linspace(0.8, 1.8, size, dtype=np.float64)
    phases = np.linspace(0.0, np.pi, size, endpoint=False, dtype=np.float64)
    fixture_metadata = {
        "expected_shape": [size, size],
        "publication_safe": False,
        "reason": "synthetic smoke artifact; no bundled connectome source used",
    }
    return _artifact(
        domain="connectome",
        source_name=name,
        source_mode=mode,
        knm=coupling,
        omega=frequencies,
        theta0=phases,
        normalization="max_abs_to_unit_interval",
        extraction_method="deterministic_chain_smoke_fixture",
        metadata=fixture_metadata,
    )
|
load_tokamak_data(n=16, *, source_mode=None, synthetic=False)
Load tokamak/plasma oscillator data for Quantum Control.
Source code in src/scpn_neurocore/bridge.py
| Python |
|---|
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def load_tokamak_data(
    n: int = 16,
    *,
    source_mode: str | None = None,
    synthetic: bool = False,
) -> QPUBridgeArtifact:
    """Return tokamak/plasma oscillator data for Quantum Control.

    Only a deterministic smoke fixture is built here; any source mode outside
    ``NON_PUBLICATION_SOURCE_MODES`` is handed to ``_raise_unavailable``.

    Args:
        n: Oscillator count, checked by ``_require_positive_n``.
        source_mode: Requested source mode, resolved by ``_resolve_source_mode``.
        synthetic: Forwarded to ``_resolve_source_mode``.
    """
    mode = _resolve_source_mode(source_mode, synthetic)
    size = _require_positive_n(n)
    if mode not in NON_PUBLICATION_SOURCE_MODES:
        _raise_unavailable("tokamak", (size, size))

    coupling: np.ndarray = _banded_coupling(size, base=0.45, decay=0.32)
    # np.resize repeats the eight base values cyclically to fill `size` entries.
    base_frequencies = np.array(
        [10.0, 8.0, 3.0, 5.0, 0.5, 0.3, 0.1, 1.0], dtype=np.float64
    )
    frequencies: np.ndarray = np.resize(base_frequencies, size)
    initial_phases = np.zeros(size, dtype=np.float64)
    fixture_metadata = {
        "expected_shape": [size, size],
        "omega_units": "rad_s",
        "publication_safe": False,
    }
    return _artifact(
        domain="tokamak",
        source_name="tokamak",
        source_mode=mode,
        knm=coupling,
        omega=frequencies,
        theta0=initial_phases,
        normalization="bounded_exponential_coupling",
        extraction_method="deterministic_plasma_timescale_smoke_fixture",
        metadata=fixture_metadata,
    )
|
load_power_grid(n, name=None, *, source_mode=None, synthetic=False)
Load power-grid oscillator data for Quantum Control.
Source code in src/scpn_neurocore/bridge.py
| Python |
|---|
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def load_power_grid(
    n: int,
    name: str | None = None,
    *,
    source_mode: str | None = None,
    synthetic: bool = False,
) -> QPUBridgeArtifact:
    """Return power-grid oscillator data for Quantum Control.

    Only a deterministic ring-grid smoke fixture is built here; any source
    mode outside ``NON_PUBLICATION_SOURCE_MODES`` is handed to
    ``_raise_unavailable``.

    Args:
        n: Oscillator count, checked by ``_require_positive_n``.
        name: Source name recorded on the artifact; ``"power_grid"`` when omitted.
        source_mode: Requested source mode, resolved by ``_resolve_source_mode``.
        synthetic: Forwarded to ``_resolve_source_mode``.
    """
    mode = _resolve_source_mode(source_mode, synthetic)
    size = _require_positive_n(n)
    source_name = name if name is not None else "power_grid"
    if mode not in NON_PUBLICATION_SOURCE_MODES:
        _raise_unavailable(source_name, (size, size))

    coupling: np.ndarray = _ring_coupling(size, nearest=0.5, long_range=0.08)
    frequencies: np.ndarray = np.ones(size, dtype=np.float64)
    if size >= 4:
        # Within each group of four, the 2nd entry runs fast and the 4th slow.
        for offset, per_unit in ((1, 1.02), (3, 0.98)):
            frequencies[offset::4] = per_unit

    fixture_metadata = {
        "expected_shape": [size, size],
        "omega_units": "per_unit_frequency",
        "publication_safe": False,
    }
    return _artifact(
        domain="power_grid",
        source_name=source_name,
        source_mode=mode,
        knm=coupling,
        omega=frequencies,
        theta0=np.zeros(size, dtype=np.float64),
        normalization="per_unit_admittance_like_smoke_scaling",
        extraction_method="deterministic_ring_grid_smoke_fixture",
        metadata=fixture_metadata,
    )
|
load_live_stream(source, step, *, source_mode=None, synthetic=False)
Load one replayable live-stream artifact.
Source code in src/scpn_neurocore/bridge.py
| Python |
|---|
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def load_live_stream(
    source: str,
    step: int,
    *,
    source_mode: str | None = None,
    synthetic: bool = False,
) -> QPUBridgeArtifact:
    """Return one replayable live-stream artifact for the given step.

    The fixture has 12 oscillators; frequencies and initial phases are
    deterministic functions of ``step``, and the replay id encodes the mode,
    source and step.

    Args:
        source: Stream identifier; only ``"eeg_powergrid"`` is accepted.
        step: Non-negative replay step.
        source_mode: Requested source mode, resolved by ``_resolve_source_mode``.
        synthetic: Forwarded to ``_resolve_source_mode``.

    Raises:
        ValueError: If ``step`` is negative or ``source`` is unsupported.
    """
    if step < 0:
        raise ValueError(f"step must be >= 0, got {step}")
    if source != "eeg_powergrid":
        raise ValueError(f"unsupported live stream source {source!r}")

    mode = _resolve_source_mode(source_mode, synthetic)
    size = 12
    if mode not in NON_PUBLICATION_SOURCE_MODES:
        _raise_unavailable(source, (size, size))

    coupling = _ring_coupling(size, nearest=0.38, long_range=0.04)
    phase = float(step) * 0.1
    oscillator_index = np.arange(size, dtype=np.float64)
    frequencies = 1.0 + 0.05 * np.sin(phase + oscillator_index * 0.5)
    # Wrap the initial phases into [0, 2*pi).
    initial_phases = (phase + oscillator_index * np.pi / size) % (2.0 * np.pi)
    fixture_metadata = {
        "step": step,
        "expected_shape": [size, size],
        "publication_safe": False,
    }
    return _artifact(
        domain="live_stream",
        source_name=source,
        source_mode=mode,
        knm=coupling,
        omega=frequencies,
        theta0=initial_phases,
        normalization="bounded_live_replay_smoke_scaling",
        extraction_method="deterministic_eeg_powergrid_step_fixture",
        replay_id=f"{mode}:{source}:step:{step}",
        metadata=fixture_metadata,
    )
|
validate_qpu_artifact_payload(payload)
Validate a JSON-deserialised QPU bridge artifact payload.
Source code in src/scpn_neurocore/bridge.py
| Python |
|---|
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def validate_qpu_artifact_payload(payload: dict[str, Any]) -> None:
    """Check a JSON-deserialised QPU bridge artifact payload.

    Checks run in a fixed order: container type, schema version, descriptive
    strings, source mode, provenance identifiers, metadata, numerical arrays,
    per-array hashes, and finally the whole-payload hash.

    Raises:
        ValueError: On the first failed check raised directly here; the
            delegated helpers report their own failures.
    """
    if not isinstance(payload, dict):
        raise ValueError("QPU artifact payload must be a mapping")
    if payload.get("schema_version") != QPU_ARTIFACT_SCHEMA_VERSION:
        raise ValueError("unsupported QPU artifact schema_version")

    for text_key in ("domain", "source_name", "normalization", "extraction_method"):
        _require_non_empty_string(payload.get(text_key), text_key)
    _validate_source_mode(payload.get("source_mode"))

    # Insertion order matters: source_timestamp is checked before replay_id.
    provenance = {
        "source_timestamp": payload.get("source_timestamp"),
        "replay_id": payload.get("replay_id"),
    }
    if all(identifier is None for identifier in provenance.values()):
        raise ValueError("source_timestamp or replay_id is required")
    for label, identifier in provenance.items():
        if identifier is not None:
            _require_non_empty_string(identifier, label)

    if not isinstance(payload.get("metadata"), dict):
        raise ValueError("metadata must be a mapping")

    coupling = _array_from_payload(payload, "K_nm")
    frequencies = _array_from_payload(payload, "omega")
    if payload.get("theta0") is None:
        initial_phases = None
    else:
        initial_phases = _array_from_payload(payload, "theta0")
    layers = payload.get("layer_assignments")
    if not isinstance(layers, list):
        raise ValueError("layer_assignments must be a list")
    _validate_artifact_arrays(coupling, frequencies, initial_phases, layers)

    digests = payload.get("hashes")
    if not isinstance(digests, dict):
        raise ValueError("hashes must be present")
    _validate_payload_array_hash(digests, "K_nm_sha256", coupling)
    _validate_payload_array_hash(digests, "omega_sha256", frequencies)
    if initial_phases is not None:
        _validate_payload_array_hash(digests, "theta0_sha256", initial_phases)
    elif "theta0_sha256" in digests:
        # A digest for an array that is not there means the payload is inconsistent.
        raise ValueError("theta0_sha256 must be absent when theta0 is null")

    claimed_hash = payload.get("artifact_sha256")
    if not _is_sha256_hex(claimed_hash):
        raise ValueError("artifact_sha256 must be a SHA256 hex digest")
    if claimed_hash != _payload_sha256_without_artifact_hash(payload):
        raise ValueError("artifact_sha256 does not match payload")
|
scpn_neurocore.datastream
Build auditable SC-NeuroCore datastream packets for SCPN consumers.
DatastreamValidationError
Bases: ValueError
Raised when a datastream packet cannot be trusted by SCPN consumers.
Source code in src/scpn_neurocore/datastream.py
| Python |
|---|
class DatastreamValidationError(ValueError):
    """Signal that a datastream packet cannot be trusted by SCPN consumers.

    Derives from ``ValueError``, so generic value-error handlers catch it too.
    """
|
SCNeuroCoreDatastreamPacket
dataclass
Hash-addressed bridge packet containing waveform, AER, and telemetry evidence.
Source code in src/scpn_neurocore/datastream.py
| Python |
|---|
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@dataclass(frozen=True)
class SCNeuroCoreDatastreamPacket:
    """Hash-addressed bridge packet containing waveform, AER, and telemetry evidence.

    The packet stores codec names, SHA256 digests of the encoded bytes and
    metric summaries rather than the raw waveform or spike data.
    """

    source_name: str
    source_mode: str
    # Two-element shapes; the payload validator requires them to be equal.
    waveform_shape: tuple[int, int]
    spike_shape: tuple[int, int]
    # Class name of the waveform codec and the mode it was used in.
    waveform_codec: str
    waveform_mode: str
    # Class name of the AER spike codec.
    aer_codec: str
    waveform_bytes_sha256: str
    aer_bytes_sha256: str
    waveform_metrics: dict[str, int | float | bool]
    aer_metrics: dict[str, int | float | bool | str]
    telemetry: dict[str, Any]
    # Hash of the linked QPU artifact, if one was supplied.
    qpu_artifact_sha256: str | None = None
    optimiser_observation: dict[str, int | float | str | bool] | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def packet_sha256(self) -> str:
        """Stable hash of the JSON-compatible payload, excluding this hash itself."""
        return _canonical_payload_sha256(self.to_bridge_dict(include_packet_hash=False))

    def to_bridge_dict(self, *, include_packet_hash: bool = True) -> dict[str, Any]:
        """Return the SCPN bridge packet mapping.

        Every top-level mapping in the result is a shallow copy, so adding or
        removing keys on the returned payload cannot alter this frozen packet
        or its ``packet_sha256``. Nested values are still shared.

        Args:
            include_packet_hash: Append ``packet_sha256`` when true.
        """
        payload: dict[str, Any] = {
            "schema_version": SC_NEUROCORE_DATASTREAM_SCHEMA_VERSION,
            "source_name": self.source_name,
            "source_mode": self.source_mode,
            "waveform_shape": list(self.waveform_shape),
            "spike_shape": list(self.spike_shape),
            "waveform_codec": self.waveform_codec,
            "waveform_mode": self.waveform_mode,
            "aer_codec": self.aer_codec,
            "hashes": {
                "waveform_bytes_sha256": self.waveform_bytes_sha256,
                "aer_bytes_sha256": self.aer_bytes_sha256,
            },
            "waveform_metrics": dict(self.waveform_metrics),
            "aer_metrics": dict(self.aer_metrics),
            # Copied like the other mappings; previously the internal dict was
            # handed out by reference.
            "telemetry": dict(self.telemetry),
            "qpu_artifact_sha256": self.qpu_artifact_sha256,
            "optimiser_observation": (
                None if self.optimiser_observation is None else dict(self.optimiser_observation)
            ),
            "metadata": dict(self.metadata),
        }
        if include_packet_hash:
            payload["packet_sha256"] = self.packet_sha256
        return payload
|
packet_sha256
property
Stable hash of the JSON-compatible datastream payload.
to_bridge_dict(*, include_packet_hash=True)
Return the SCPN bridge packet mapping.
Source code in src/scpn_neurocore/datastream.py
| Python |
|---|
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def to_bridge_dict(self, *, include_packet_hash: bool = True) -> dict[str, Any]:
    """Return the SCPN bridge packet mapping.

    Every top-level mapping in the result is a shallow copy, so adding or
    removing keys on the returned payload cannot alter this frozen packet
    or its ``packet_sha256``. Nested values are still shared.

    Args:
        include_packet_hash: Append ``packet_sha256`` when true.
    """
    payload: dict[str, Any] = {
        "schema_version": SC_NEUROCORE_DATASTREAM_SCHEMA_VERSION,
        "source_name": self.source_name,
        "source_mode": self.source_mode,
        "waveform_shape": list(self.waveform_shape),
        "spike_shape": list(self.spike_shape),
        "waveform_codec": self.waveform_codec,
        "waveform_mode": self.waveform_mode,
        "aer_codec": self.aer_codec,
        "hashes": {
            "waveform_bytes_sha256": self.waveform_bytes_sha256,
            "aer_bytes_sha256": self.aer_bytes_sha256,
        },
        "waveform_metrics": dict(self.waveform_metrics),
        "aer_metrics": dict(self.aer_metrics),
        # Copied like the other mappings; previously the internal dict was
        # handed out by reference.
        "telemetry": dict(self.telemetry),
        "qpu_artifact_sha256": self.qpu_artifact_sha256,
        "optimiser_observation": (
            None if self.optimiser_observation is None else dict(self.optimiser_observation)
        ),
        "metadata": dict(self.metadata),
    }
    if include_packet_hash:
        payload["packet_sha256"] = self.packet_sha256
    return payload
|
build_datastream_packet(*, waveform, spike_raster, source_name, source_mode, layer_id='input', waveform_codec=None, aer_codec=None, qpu_artifact=None, optimiser_observation=None, metadata=None)
Build one audited datastream packet from raw waveform and AER spikes.
Source code in src/scpn_neurocore/datastream.py
| Python |
|---|
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def build_datastream_packet(
    *,
    waveform: np.ndarray,
    spike_raster: np.ndarray,
    source_name: str,
    source_mode: str,
    layer_id: str = "input",
    waveform_codec: WaveformCodec | None = None,
    aer_codec: AERSpikeCodec | None = None,
    qpu_artifact: QPUBridgeArtifact | None = None,
    optimiser_observation: BenchmarkObservation | None = None,
    metadata: dict[str, Any] | None = None,
) -> SCNeuroCoreDatastreamPacket:
    """Build one audited datastream packet from raw waveform and AER spikes.

    Both arrays are validated, compressed with their codecs, and summarised;
    the packet keeps digests and metrics of the encoded bytes, not the bytes.

    Args:
        waveform: Waveform samples; must have the same shape as ``spike_raster``.
        spike_raster: Spike raster; must have the same shape as ``waveform``.
        source_name: Name recorded on the packet.
        source_mode: Must be a member of ``SOURCE_MODES``.
        layer_id: Layer label passed to the telemetry summary.
        waveform_codec: Codec to use; ``WaveformCodec(mode="spike")`` when None.
        aer_codec: Codec to use; ``AERSpikeCodec()`` when None.
        qpu_artifact: Optional artifact whose ``artifact_sha256`` is recorded.
        optimiser_observation: Optional observation to record.
        metadata: Optional mapping, shallow-copied into the packet.

    Raises:
        DatastreamValidationError: If ``source_mode`` is unsupported or the
            two array shapes differ.
    """
    if source_mode not in SOURCE_MODES:
        raise DatastreamValidationError(f"unsupported source_mode {source_mode!r}")
    waveform_array = _validate_waveform(waveform)
    spikes = _validate_spike_raster(spike_raster)
    if waveform_array.shape != spikes.shape:
        raise DatastreamValidationError(
            f"waveform shape {waveform_array.shape} must match spike_raster shape {spikes.shape}"
        )
    # Compare against None explicitly: `codec or Default()` would discard a
    # caller-supplied codec that happens to evaluate as falsy.
    wave_codec = WaveformCodec(mode="spike") if waveform_codec is None else waveform_codec
    event_codec = AERSpikeCodec() if aer_codec is None else aer_codec
    waveform_bytes, waveform_result = wave_codec.compress(waveform_array)
    aer_bytes, aer_result = event_codec.compress(spikes)
    telemetry = _telemetry_summary(spikes, layer_id=layer_id)
    # NOTE(review): assumes the validators above guarantee 2-D arrays - confirm.
    waveform_shape = (int(waveform_array.shape[0]), int(waveform_array.shape[1]))
    spike_shape = (int(spikes.shape[0]), int(spikes.shape[1]))
    return SCNeuroCoreDatastreamPacket(
        source_name=source_name,
        source_mode=source_mode,
        waveform_shape=waveform_shape,
        spike_shape=spike_shape,
        waveform_codec=type(wave_codec).__name__,
        waveform_mode=wave_codec.mode,
        aer_codec=type(event_codec).__name__,
        waveform_bytes_sha256=_hash_bytes(waveform_bytes),
        aer_bytes_sha256=_hash_bytes(aer_bytes),
        waveform_metrics=_waveform_metrics(waveform_result),
        aer_metrics=_aer_metrics(aer_result),
        telemetry=telemetry,
        qpu_artifact_sha256=None if qpu_artifact is None else qpu_artifact.artifact_sha256,
        optimiser_observation=(
            None if optimiser_observation is None else _observation_record(optimiser_observation)
        ),
        metadata={} if metadata is None else dict(metadata),
    )
|
validate_datastream_payload(payload)
Validate the public shape of a bridge datastream payload.
Source code in src/scpn_neurocore/datastream.py
| Python |
|---|
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def validate_datastream_payload(payload: dict[str, Any]) -> None:
    """Validate the public shape of a bridge datastream payload.

    Checks the container type, schema version, identifying strings, source
    mode, byte hashes, shapes, cross-consistency of the telemetry / AER /
    waveform counters, the optional QPU artifact hash, the optimiser
    observation, metadata, and finally the packet hash.

    Raises:
        DatastreamValidationError: On the first failed check, including when
            ``payload`` is not a mapping.
    """
    # Guard first so a non-mapping yields a validation error rather than an
    # AttributeError from `.get`.
    if not isinstance(payload, dict):
        raise DatastreamValidationError("datastream payload must be a mapping")
    if payload.get("schema_version") != SC_NEUROCORE_DATASTREAM_SCHEMA_VERSION:
        raise DatastreamValidationError("unsupported datastream schema_version")
    for key in ("source_name", "waveform_codec", "waveform_mode", "aer_codec"):
        if not isinstance(payload.get(key), str) or not payload[key].strip():
            raise DatastreamValidationError(f"{key} must be a non-empty string")
    source_mode = payload.get("source_mode")
    if source_mode not in SOURCE_MODES:
        raise DatastreamValidationError("unsupported source_mode")

    hashes = payload.get("hashes")
    if not isinstance(hashes, dict):
        raise DatastreamValidationError("hashes must be present")
    for key in ("waveform_bytes_sha256", "aer_bytes_sha256"):
        if not _is_sha256_hex(hashes.get(key)):
            raise DatastreamValidationError(f"{key} must be a SHA256 hex digest")

    waveform_shape = _shape_from_payload(payload, "waveform_shape")
    spike_shape = _shape_from_payload(payload, "spike_shape")
    if waveform_shape != spike_shape:
        raise DatastreamValidationError("waveform_shape and spike_shape must match")

    waveform_metrics = _mapping_from_payload(payload, "waveform_metrics")
    aer_metrics = _mapping_from_payload(payload, "aer_metrics")
    telemetry = _mapping_from_payload(payload, "telemetry")

    # Telemetry totals must agree with their per-layer data and with the shapes.
    total_ticks = _positive_int_from_mapping(telemetry, "total_ticks", "telemetry")
    total_spikes = _nonnegative_int_from_mapping(telemetry, "total_spikes", "telemetry")
    _validate_telemetry_layers(
        telemetry,
        expected_total_ticks=total_ticks,
        expected_total_spikes=total_spikes,
    )
    if total_ticks != spike_shape[0]:
        raise DatastreamValidationError("total_ticks must match spike_shape timesteps")

    # AER metrics must agree with telemetry and the spike shape.
    aer_spikes = _nonnegative_int_from_mapping(aer_metrics, "n_spikes", "aer_metrics")
    if total_spikes != aer_spikes:
        raise DatastreamValidationError("total_spikes must match aer_metrics n_spikes")
    aer_timesteps = _positive_int_from_mapping(aer_metrics, "n_timesteps", "aer_metrics")
    if aer_timesteps != spike_shape[0]:
        raise DatastreamValidationError("n_timesteps must match spike_shape timesteps")
    aer_neurons = _positive_int_from_mapping(aer_metrics, "n_neurons", "aer_metrics")
    if aer_neurons != spike_shape[1]:
        raise DatastreamValidationError("n_neurons must match spike_shape neurons")

    # Waveform metrics must agree with the waveform shape.
    waveform_samples = _positive_int_from_mapping(waveform_metrics, "n_samples", "waveform_metrics")
    if waveform_samples != waveform_shape[0]:
        raise DatastreamValidationError("n_samples must match waveform_shape samples")
    waveform_channels = _positive_int_from_mapping(
        waveform_metrics, "n_channels", "waveform_metrics"
    )
    if waveform_channels != waveform_shape[1]:
        raise DatastreamValidationError("n_channels must match waveform_shape channels")

    qpu_artifact_sha256 = payload.get("qpu_artifact_sha256")
    if qpu_artifact_sha256 is not None and not _is_sha256_hex(qpu_artifact_sha256):
        raise DatastreamValidationError("qpu_artifact_sha256 must be a SHA256 hex digest")
    _validate_optimiser_observation(payload.get("optimiser_observation"))
    if not isinstance(payload.get("metadata"), dict):
        raise DatastreamValidationError("metadata must be a mapping")

    # The packet hash is checked last, against the payload without that hash.
    packet_hash = payload.get("packet_sha256")
    if not _is_sha256_hex(packet_hash):
        raise DatastreamValidationError("packet_sha256 must be a SHA256 hex digest")
    expected_hash = _payload_sha256_without_packet_hash(payload)
    if packet_hash != expected_hash:
        raise DatastreamValidationError("packet_sha256 does not match payload")
|