BCI Studio

Brain-computer interface closed-loop control. Real-time neural decoding, closed-loop stimulation, and charge density safety limits.

Quick Start

Python
from sc_neurocore.bci_studio.bci_primitives import BCIClosedLoopEngine
from sc_neurocore.bci_studio.bci_studio import BCIStudio
from sc_neurocore.interfaces import (
    build_bci_hil_reference_manifest,
    create_bci_hil_template,
)

HIL Reference Path

For deterministic hardware-in-the-loop prototyping, use the interface-layer template rather than the legacy studio loop:

Python
manifest = build_bci_hil_reference_manifest("pynq_shd")
template = create_bci_hil_template("pynq_shd")

The reference path wires raw probe-like waveform windows through WaveformCodec, AERSpikeCodec, rate decoding, feedback emission, and DeviceTelemetry. The default sink is an implant emulator; physical PYNQ feedback requires an explicit sink adapter and external bitstream artefacts.

Primitives

The primitive layer is the deterministic, auditable closed-loop path for research/HIL work:

Python
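import numpy as np

from sc_neurocore.bci_studio.bci_primitives import (
    BCIClosedLoopPrimitive,
    BCIFrame,
    BCIPrimitiveConfig,
)

primitive = BCIClosedLoopPrimitive(
    BCIPrimitiveConfig(
        channels=256,
        sampling_rate_hz=30_000,
        latency_budget_ms=10.0,
        command_threshold_hz=75.0,
    )
)

# Placeholder input: a 10 ms window shaped (samples, channels); substitute real data.
window = np.zeros((300, 256), dtype=np.float32)
result = primitive.process_frame(BCIFrame(samples=window, reward=0.0, timestamp_us=1000))
packet = result.feedback_packet  # 24-byte little-endian feedback record
trace = result.trace.as_dict()  # audit trace as a plain dict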

The trace records schema version, frame id, input shape, spike count, active channels, score, command, latency, latency-budget status, adaptation status, and whether the optional native learning bridge was used. Feedback packets are fixed 24-byte little-endian records suitable for deterministic sink adapters.

Operational boundaries:

  • This is research/HIL infrastructure, not medical-device control software.
  • Physical feedback requires an explicit sink adapter and external safety case.
  • The command amplitude is clipped to [0, max_feedback_amplitude], and the safety_limited flag reports when clipping was applied.
  • BCIClosedLoopEngine remains as a compatibility wrapper for older examples.
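
As a rough illustration of the amplitude bound listed above, the standalone sketch below mirrors the rate-based branch of `_build_command` shown further down this page. The function name `clip_feedback` and its defaults are illustrative assumptions, not part of the library API, and the legacy active-fraction branch is left out.

```python
def clip_feedback(score_hz, threshold_hz=75.0, gain=1.0, max_amplitude=1.0):
    """Hypothetical helper: return (command, amplitude, safety_limited)."""
    # Below the threshold no stimulation command is issued.
    if score_hz < threshold_hz:
        return 0, 0.0, False
    # The requested amplitude scales with the score relative to the threshold.
    raw = (score_hz / max(threshold_hz, 1e-9)) * gain
    clipped = min(max(raw, 0.0), max_amplitude)
    # The flag records that the requested amplitude had to be reduced.
    return 1, clipped, raw != clipped


below = clip_feedback(60.0)      # (0, 0.0, False)
at_limit = clip_feedback(75.0)   # (1, 1.0, False)
clipped = clip_feedback(150.0)   # (1, 1.0, True)
```

With the default gain and bound, any score above the threshold requests more than the maximum amplitude, so the flag is set for most stimulation commands unless `feedback_gain` is lowered or `max_feedback_amplitude` is raised.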

sc_neurocore.bci_studio.bci_primitives

Deterministic BCI closed-loop primitives for HIL prototyping.

This module provides bounded raw-signal processing, reward-modulated adaptation, feedback packetisation, and an audit trace for each frame. It is research/HIL infrastructure, not medical-device control software.

BCIPrimitiveConfig dataclass

Configuration for the deterministic closed-loop primitive.

Source code in src/sc_neurocore/bci_studio/bci_primitives.py
Python
@dataclass(frozen=True)
class BCIPrimitiveConfig:
    """Configuration for the deterministic closed-loop primitive."""

    channels: int = 1024
    sampling_rate_hz: int = 30_000
    threshold_sigma: float = 4.5
    legacy_derivative_threshold: float = 0.5
    refractory_samples: int = 16
    command_threshold_hz: float = 75.0
    legacy_active_fraction_threshold: float = 0.10
    learning_rate: float = 0.01
    weight_decay: float = 0.999
    min_weight: float = 0.01
    max_weight: float = 10.0
    feedback_gain: float = 1.0
    max_feedback_amplitude: float = 1.0
    latency_budget_ms: float = 10.0
    enable_native_learning: bool = True

    def __post_init__(self) -> None:
        if self.channels <= 0:
            raise ValueError("channels must be positive")
        if self.sampling_rate_hz <= 0:
            raise ValueError("sampling_rate_hz must be positive")
        if self.threshold_sigma <= 0:
            raise ValueError("threshold_sigma must be positive")
        if self.legacy_derivative_threshold <= 0:
            raise ValueError("legacy_derivative_threshold must be positive")
        if self.refractory_samples < 1:
            raise ValueError("refractory_samples must be >= 1")
        if self.command_threshold_hz < 0:
            raise ValueError("command_threshold_hz must be non-negative")
        if not 0.0 <= self.legacy_active_fraction_threshold <= 1.0:
            raise ValueError("legacy_active_fraction_threshold must be in [0, 1]")
        if self.learning_rate < 0:
            raise ValueError("learning_rate must be non-negative")
        if not 0.0 < self.weight_decay <= 1.0:
            raise ValueError("weight_decay must be in (0, 1]")
        if not 0.0 < self.min_weight <= self.max_weight:
            raise ValueError("min_weight must be positive and <= max_weight")
        if self.max_feedback_amplitude <= 0:
            raise ValueError("max_feedback_amplitude must be positive")
        if self.latency_budget_ms <= 0:
            raise ValueError("latency_budget_ms must be positive")

BCIFrame dataclass

One raw neural signal frame.

Source code in src/sc_neurocore/bci_studio/bci_primitives.py
Python
@dataclass(frozen=True)
class BCIFrame:
    """One raw neural signal frame."""

    samples: np.ndarray[Any, Any]
    reward: float = 0.0
    timestamp_us: int = 0
    frame_id: int | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

BCIFeedbackCommand dataclass

Feedback command emitted by the primitive.

Packet layout is 24 bytes: [schema:u16, command:u8, flags:u8, channel:u16, reserved:u16, amplitude:f32, timestamp_us:u64, score:f32].

Source code in src/sc_neurocore/bci_studio/bci_primitives.py
Python
@dataclass(frozen=True)
class BCIFeedbackCommand:
    """Feedback command emitted by the primitive.

    Packet layout is 24 bytes: `[schema:u16, command:u8, flags:u8,
    channel:u16, reserved:u16, amplitude:f32, timestamp_us:u64, score:f32]`.
    """

    COMMAND_NOP = 0
    COMMAND_STIM = 1

    command: int
    channel: int
    amplitude: float
    timestamp_us: int
    score: float
    safety_limited: bool = False

    def to_packet(self) -> bytes:
        flags = 1 if self.safety_limited else 0
        return struct.pack(
            "<HBBHHfQf",
            1,
            self.command,
            flags,
            self.channel,
            0,
            self.amplitude,
            self.timestamp_us,
            self.score,
        )

    @classmethod
    def from_packet(cls, packet: bytes) -> "BCIFeedbackCommand":
        if len(packet) < 24:
            raise ValueError("BCI feedback packet must be at least 24 bytes")
        schema, command, flags, channel, _reserved, amplitude, timestamp_us, score = struct.unpack(
            "<HBBHHfQf", packet[:24]
        )
        if schema != 1:
            raise ValueError(f"unsupported BCI feedback packet schema {schema}")
        return cls(
            command=command,
            channel=channel,
            amplitude=float(amplitude),
            timestamp_us=int(timestamp_us),
            score=float(score),
            safety_limited=bool(flags & 1),
        )
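
The 24-byte layout can be checked with the standard `struct` module alone. The sketch below reuses the format string from the source above; the helper name `pack_feedback` and the sample values are assumptions made for illustration.

```python
import struct

# Little-endian, standard sizes, no implicit padding:
# schema:u16, command:u8, flags:u8, channel:u16, reserved:u16,
# amplitude:f32, timestamp_us:u64, score:f32
PACKET_FORMAT = "<HBBHHfQf"


def pack_feedback(command, channel, amplitude, timestamp_us, score, safety_limited=False):
    """Hypothetical stand-in for BCIFeedbackCommand.to_packet."""
    flags = 1 if safety_limited else 0
    return struct.pack(
        PACKET_FORMAT, 1, command, flags, channel, 0, amplitude, timestamp_us, score
    )


packet = pack_feedback(1, 0, 0.5, 1000, 80.0, safety_limited=True)
fields = struct.unpack(PACKET_FORMAT, packet)

print(struct.calcsize(PACKET_FORMAT))  # 24
print(fields)  # (1, 1, 1, 0, 0, 0.5, 1000, 80.0)
```

Amplitude and score travel as 32-bit floats, so values that are not exactly representable in single precision come back slightly rounded after a round trip.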

BCIClosedLoopTrace dataclass

Audit trace for one processed frame.

Source code in src/sc_neurocore/bci_studio/bci_primitives.py
Python
@dataclass(frozen=True)
class BCIClosedLoopTrace:
    """Audit trace for one processed frame."""

    schema_version: str
    frame_id: int
    input_shape: tuple[int, ...]
    spike_count: int
    active_channels: int
    score: float
    command: int
    latency_ms: float
    latency_budget_ms: float
    latency_budget_met: bool
    adaptation_applied: bool
    ffi_accelerated: bool
    notes: tuple[str, ...] = ()

    def as_dict(self) -> dict[str, Any]:
        return {
            "schema_version": self.schema_version,
            "frame_id": self.frame_id,
            "input_shape": list(self.input_shape),
            "spike_count": self.spike_count,
            "active_channels": self.active_channels,
            "score": self.score,
            "command": self.command,
            "latency_ms": self.latency_ms,
            "latency_budget_ms": self.latency_budget_ms,
            "latency_budget_met": self.latency_budget_met,
            "adaptation_applied": self.adaptation_applied,
            "ffi_accelerated": self.ffi_accelerated,
            "notes": list(self.notes),
        }

BCIPrimitiveResult dataclass

Result from one closed-loop primitive step.

Source code in src/sc_neurocore/bci_studio/bci_primitives.py
Python
@dataclass(frozen=True)
class BCIPrimitiveResult:
    """Result from one closed-loop primitive step."""

    command: BCIFeedbackCommand
    feedback_packet: bytes
    spikes: np.ndarray[Any, Any]
    channel_spike_counts: np.ndarray[Any, Any]
    score: float
    latency_ms: float
    trace: BCIClosedLoopTrace

    def as_legacy_dict(self) -> dict[str, Any]:
        return {
            "command": self.command.command,
            "latency_ms": self.latency_ms,
            "spikes": int(self.spikes.sum()),
            "active_channels": int(np.count_nonzero(self.channel_spike_counts)),
            "score": self.score,
            "feedback_bytes": len(self.feedback_packet),
            "latency_budget_met": self.trace.latency_budget_met,
            "trace": self.trace.as_dict(),
        }

BCIClosedLoopPrimitive

Deterministic raw-signal to feedback primitive with audit trace.

Source code in src/sc_neurocore/bci_studio/bci_primitives.py
Python
class BCIClosedLoopPrimitive:
    """Deterministic raw-signal to feedback primitive with audit trace."""

    def __init__(
        self,
        config: BCIPrimitiveConfig | None = None,
        *,
        initial_weights: np.ndarray[Any, Any] | None = None,
    ) -> None:
        self.config = config or BCIPrimitiveConfig()
        self.weights: npt.NDArray[np.float32]
        if initial_weights is None:
            self.weights = np.ones(self.config.channels, dtype=np.float32)
        else:
            weights = np.asarray(initial_weights, dtype=np.float32)
            if weights.shape != (self.config.channels,):
                raise ValueError(
                    f"initial_weights shape {weights.shape} does not match "
                    f"({self.config.channels},)"
                )
            self.weights = weights.copy()
        self._frame_counter = 0

        if FFI_ENABLED and self.config.enable_native_learning:
            self.layer: RustRuleLayer | None = RustRuleLayer(
                self.config.channels,
                RULE_ELIGENT,
                weight=1.0,
                param_a=self.config.learning_rate,
                param_b=1.0,
            )
        else:
            self.layer = None

    def process_frame(self, frame: BCIFrame) -> BCIPrimitiveResult:
        start_time = time.perf_counter()
        samples, notes = self._validate_samples(frame.samples)
        spikes, channel_counts = self._extract_spikes(samples)
        score = self._score(channel_counts, samples.shape[0])
        command = self._build_command(score, frame.timestamp_us)
        adaptation = self._adapt(channel_counts, command.command, frame.reward)
        latency_ms = (time.perf_counter() - start_time) * 1000.0
        frame_id = self._next_frame_id(frame.frame_id)

        trace = BCIClosedLoopTrace(
            schema_version=SCHEMA_VERSION,
            frame_id=frame_id,
            input_shape=tuple(int(v) for v in samples.shape),
            spike_count=int(spikes.sum()),
            active_channels=int(np.count_nonzero(channel_counts)),
            score=score,
            command=command.command,
            latency_ms=latency_ms,
            latency_budget_ms=self.config.latency_budget_ms,
            latency_budget_met=latency_ms <= self.config.latency_budget_ms,
            adaptation_applied=adaptation,
            ffi_accelerated=self.layer is not None,
            notes=tuple(notes),
        )
        return BCIPrimitiveResult(
            command=command,
            feedback_packet=command.to_packet(),
            spikes=spikes,
            channel_spike_counts=channel_counts,
            score=score,
            latency_ms=latency_ms,
            trace=trace,
        )

    def _next_frame_id(self, explicit: int | None) -> int:
        if explicit is not None:
            self._frame_counter = max(self._frame_counter, explicit + 1)
            return explicit
        frame_id = self._frame_counter
        self._frame_counter += 1
        return frame_id

    def _validate_samples(
        self, samples: np.ndarray[Any, Any]
    ) -> tuple[np.ndarray[Any, Any], list[str]]:
        data = np.asarray(samples, dtype=np.float32)
        if not np.all(np.isfinite(data)):
            raise ValueError("BCI frame contains non-finite values")
        notes: list[str] = []
        if data.ndim == 1:
            if data.shape[0] != self.config.channels:
                raise ValueError(
                    f"1D BCI frame has {data.shape[0]} channels, expected {self.config.channels}"
                )
            notes.append("legacy_vector_frame")
            return data.reshape(1, self.config.channels), notes
        if data.ndim != 2:
            raise ValueError("BCI frame samples must have shape (channels,) or (samples, channels)")
        if data.shape[0] == 0:
            raise ValueError("BCI frame must contain at least one sample")
        if data.shape[1] != self.config.channels:
            raise ValueError(
                f"BCI frame has {data.shape[1]} channels, expected {self.config.channels}"
            )
        return data, notes

    def _extract_spikes(
        self, samples: np.ndarray[Any, Any]
    ) -> tuple[np.ndarray[Any, Any], np.ndarray[Any, Any]]:
        if samples.shape[0] == 1:
            vector = samples[0]
            diffs = np.abs(np.diff(vector, prepend=0.0))
            spikes = (diffs > self.config.legacy_derivative_threshold).reshape(
                1, self.config.channels
            )
            return spikes.astype(np.int8), spikes.sum(axis=0).astype(np.float32)

        noise_sigma = np.median(np.abs(samples), axis=0) / 0.6745
        noise_sigma = np.maximum(noise_sigma, 1e-6)
        thresholds = -self.config.threshold_sigma * noise_sigma
        spikes = np.zeros(samples.shape, dtype=np.int8)
        for channel in range(self.config.channels):
            last_spike = -self.config.refractory_samples - 1
            for sample_idx in range(1, samples.shape[0]):
                if (
                    samples[sample_idx, channel] < thresholds[channel]
                    and samples[sample_idx, channel] < samples[sample_idx - 1, channel]
                    and (sample_idx - last_spike) > self.config.refractory_samples
                ):
                    spikes[sample_idx, channel] = 1
                    last_spike = sample_idx
        return spikes, spikes.sum(axis=0).astype(np.float32)

    def _score(self, channel_counts: np.ndarray[Any, Any], n_samples: int) -> float:
        if n_samples <= 1:
            return float(np.dot(channel_counts, self.weights) / self.config.channels)
        duration_s = n_samples / self.config.sampling_rate_hz
        rates_hz = channel_counts / max(duration_s, 1e-9)
        return float(np.dot(rates_hz, self.weights) / self.config.channels)

    def _build_command(self, score: float, timestamp_us: int) -> BCIFeedbackCommand:
        if score <= 0.0:
            return BCIFeedbackCommand(
                command=BCIFeedbackCommand.COMMAND_NOP,
                channel=0,
                amplitude=0.0,
                timestamp_us=timestamp_us,
                score=score,
            )
        if score <= 1.0:
            threshold = self.config.legacy_active_fraction_threshold
            command = int(score > threshold)
            raw_amplitude = score / max(threshold, 1e-9) if command else 0.0
        else:
            command = int(score >= self.config.command_threshold_hz)
            raw_amplitude = score / max(self.config.command_threshold_hz, 1e-9) if command else 0.0
        amplitude = raw_amplitude * self.config.feedback_gain
        clipped = float(np.clip(amplitude, 0.0, self.config.max_feedback_amplitude))
        return BCIFeedbackCommand(
            command=command,
            channel=0,
            amplitude=clipped,
            timestamp_us=timestamp_us,
            score=score,
            safety_limited=amplitude != clipped,
        )

    def _adapt(self, channel_counts: np.ndarray[Any, Any], command: int, reward: float) -> bool:
        if reward == 0.0 and self.config.weight_decay == 1.0:
            return False
        spike_mask = channel_counts > 0
        if self.layer is not None:
            pre_spikes = spike_mask.astype(np.bool_)
            post_spikes = np.full(self.config.channels, command > 0, dtype=np.bool_)
            rewards = np.full(self.config.channels, reward, dtype=np.float32)
            self.layer.step(pre_spikes, post_spikes, rewards)
            self.weights = self.layer.get_weights().astype(np.float32)
            return True

        old = self.weights.copy()
        self.weights *= self.config.weight_decay
        if reward != 0.0:
            self.weights[spike_mask] += self.config.learning_rate * reward
            self.weights[~spike_mask] -= self.config.learning_rate * reward * 0.1
        self.weights = np.clip(self.weights, self.config.min_weight, self.config.max_weight)
        return bool(np.any(np.abs(self.weights - old) > 1e-9))
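
The multi-sample branch of `_extract_spikes` combines a robust noise estimate (median absolute value divided by 0.6745), a negative threshold, a falling-edge check, and a refractory period. A single-channel, pure-Python sketch of that rule follows; the function name `detect_spikes` and the synthetic trace are assumptions for illustration only.

```python
from statistics import median


def detect_spikes(samples, threshold_sigma=4.5, refractory_samples=16):
    """Hypothetical single-channel version of the detection rule."""
    # Robust noise estimate, floored to avoid a zero threshold on flat input.
    noise_sigma = max(median(abs(x) for x in samples) / 0.6745, 1e-6)
    threshold = -threshold_sigma * noise_sigma
    spike_indices = []
    last_spike = -refractory_samples - 1
    for i in range(1, len(samples)):
        # A spike must be below the negative threshold and still falling.
        crossing = samples[i] < threshold and samples[i] < samples[i - 1]
        if crossing and (i - last_spike) > refractory_samples:
            spike_indices.append(i)
            last_spike = i
    return spike_indices


trace = [0.1, -0.1] * 50          # low-amplitude background
for index in (20, 25, 60):
    trace[index] = -5.0           # three large negative deflections

print(detect_spikes(trace))  # [20, 60]
```

The deflection at index 25 is dropped because it falls within 16 samples of the spike at index 20.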

BCIClosedLoopEngine

Backward-compatible wrapper around BCIClosedLoopPrimitive.

Source code in src/sc_neurocore/bci_studio/bci_primitives.py
Python
class BCIClosedLoopEngine:
    """Backward-compatible wrapper around :class:`BCIClosedLoopPrimitive`."""

    def __init__(self, channels: int = 1024):
        self.channels = channels
        self.primitive = BCIClosedLoopPrimitive(BCIPrimitiveConfig(channels=channels))

    @property
    def weights(self) -> np.ndarray[Any, Any]:
        return self.primitive.weights

    def process_bci_frame(self, raw_ephys: np.ndarray[Any, Any], reward: float) -> dict[str, Any]:
        result = self.primitive.process_frame(BCIFrame(samples=raw_ephys, reward=reward))
        return result.as_legacy_dict()

Studio

sc_neurocore.bci_studio.bci_studio

BCI Studio orchestrator for real-time closed-loop brain-computer interfaces.

Pipeline: raw_ephys → codec → spike_extract → SC_decode → learner → feedback.

Includes SC-domain lossy compression, online STDP learning, FPGA feedback serialization, and latency profiling.

SpikeCodec

SC-domain lossy compression for neural data streams.

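Uses run-length encoding on spike trains: each run is stored as a (value, count) byte pair, so the count acts as a delta-time between transitions, and runs longer than 255 samples are split across several pairs.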

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
class SpikeCodec:
    """SC-domain lossy compression for neural data streams.

    Uses run-length encoding on spike trains with delta-time encoding.
    """

    def encode(self, spikes: np.ndarray) -> bytes:
        """Compress boolean spike array to RLE byte stream.

        Format: [total_len:u32_le] + N × [value:u8, count:u8]
        """
        if len(spikes) == 0:
            return b""
        runs: List[Tuple[int, int]] = []
        current = int(spikes[0])
        count = 1
        for i in range(1, len(spikes)):
            if int(spikes[i]) == current and count < 255:
                count += 1
            else:
                runs.append((current, count))
                current = int(spikes[i])
                count = 1
        runs.append((current, count))

        data = bytearray()
        data.extend(struct.pack("<I", len(spikes)))
        for val, cnt in runs:
            data.append(val & 0x01)
            data.append(cnt & 0xFF)
        return bytes(data)

    def decode(self, data: bytes) -> np.ndarray:
        """Decompress RLE byte stream back to spike array."""
        if len(data) < 4:
            return np.array([], dtype=np.uint8)
        total_len = struct.unpack("<I", data[:4])[0]
        spikes = []
        i = 4
        while i + 1 < len(data) and len(spikes) < total_len:
            val = data[i]
            cnt = data[i + 1]
            spikes.extend([val] * cnt)
            i += 2
        return np.array(spikes[:total_len], dtype=np.uint8)

    def compression_ratio(self, original: np.ndarray) -> float:
        """Return compression ratio (original_bytes / compressed_bytes)."""
        compressed = self.encode(original)
        if len(compressed) == 0:
            return 1.0
        return len(original) / len(compressed)
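
To make the byte format concrete, here is a dependency-free sketch of the same run-length scheme operating on plain Python lists. The names `rle_encode` and `rle_decode` are assumptions for this example and are not the library's API.

```python
import struct


def rle_encode(bits):
    """Encode a list of 0/1 values as [total_len:u32_le] + (value, count) pairs."""
    if not bits:
        return b""
    out = bytearray(struct.pack("<I", len(bits)))
    current, count = bits[0] & 1, 1
    for bit in bits[1:]:
        bit &= 1
        if bit == current and count < 255:
            count += 1
        else:
            out += bytes((current, count))  # close the run, start a new one
            current, count = bit, 1
    out += bytes((current, count))
    return bytes(out)


def rle_decode(data):
    """Expand the (value, count) pairs and trim to the stored length."""
    if len(data) < 4:
        return []
    (total,) = struct.unpack("<I", data[:4])
    bits = []
    for i in range(4, len(data) - 1, 2):
        bits.extend([data[i]] * data[i + 1])
    return bits[:total]


spikes = [0, 0, 0, 1, 1, 0]
encoded = rle_encode(spikes)
print(encoded.hex())  # 06000000000301020001
```

The first four bytes hold the length (6), followed by the runs (0 × 3), (1 × 2), and (0 × 1).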

encode(spikes)

Compress boolean spike array to RLE byte stream.

Format: [total_len:u32_le] + N × [value:u8, count:u8]

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
def encode(self, spikes: np.ndarray) -> bytes:
    """Compress boolean spike array to RLE byte stream.

    Format: [total_len:u32_le] + N × [value:u8, count:u8]
    """
    if len(spikes) == 0:
        return b""
    runs: List[Tuple[int, int]] = []
    current = int(spikes[0])
    count = 1
    for i in range(1, len(spikes)):
        if int(spikes[i]) == current and count < 255:
            count += 1
        else:
            runs.append((current, count))
            current = int(spikes[i])
            count = 1
    runs.append((current, count))

    data = bytearray()
    data.extend(struct.pack("<I", len(spikes)))
    for val, cnt in runs:
        data.append(val & 0x01)
        data.append(cnt & 0xFF)
    return bytes(data)

decode(data)

Decompress RLE byte stream back to spike array.

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
def decode(self, data: bytes) -> np.ndarray:
    """Decompress RLE byte stream back to spike array."""
    if len(data) < 4:
        return np.array([], dtype=np.uint8)
    total_len = struct.unpack("<I", data[:4])[0]
    spikes = []
    i = 4
    while i + 1 < len(data) and len(spikes) < total_len:
        val = data[i]
        cnt = data[i + 1]
        spikes.extend([val] * cnt)
        i += 2
    return np.array(spikes[:total_len], dtype=np.uint8)

compression_ratio(original)

Return compression ratio (original_bytes / compressed_bytes).

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
def compression_ratio(self, original: np.ndarray) -> float:
    """Return compression ratio (original_bytes / compressed_bytes)."""
    compressed = self.encode(original)
    if len(compressed) == 0:
        return 1.0
    return len(original) / len(compressed)
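
The ratio depends heavily on how often the spike train changes value. The arithmetic below works through the two extremes under the format described above, assuming one byte per original sample; both helper names are hypothetical.

```python
import math


def silent_frame_ratio(n_samples):
    """Best case: an all-zero frame, split only by the 255-sample run cap."""
    runs = math.ceil(n_samples / 255)
    compressed_bytes = 4 + 2 * runs  # u32 length header + (value, count) pairs
    return n_samples / compressed_bytes


def alternating_frame_ratio(n_samples):
    """Worst case: every sample starts a new run of length one."""
    return n_samples / (4 + 2 * n_samples)


best = silent_frame_ratio(1024)        # 1024 / 14, about 73.1
worst = alternating_frame_ratio(1024)  # 1024 / 2052, just under 0.5
```

A ratio below 1 means the encoded stream is larger than the input, which can happen for dense or rapidly alternating spike trains.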

OnlineLearner

Local STDP-inspired weight update rule (pure Python fallback).

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
class OnlineLearner:
    """Local STDP-inspired weight update rule (pure Python fallback)."""

    def __init__(
        self,
        num_weights: int,
        lr: float = 0.01,
        decay: float = 0.999,
    ):
        self.weights = np.ones(num_weights, dtype=np.float32)
        self.lr = lr
        self.decay = decay
        self.updates = 0

    def step(
        self,
        spikes: np.ndarray,
        reward: float,
    ) -> np.ndarray:
        """Apply reward-modulated STDP update.

        Spikes that contributed to a positive reward get potentiated;
        non-spiking channels get depressed toward baseline.
        """
        self.weights *= self.decay

        spike_mask = spikes.astype(bool)
        self.weights[spike_mask] += self.lr * reward
        self.weights[~spike_mask] -= self.lr * reward * 0.1

        self.weights = np.clip(self.weights, 0.01, 10.0)
        self.updates += 1
        return self.weights
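
The update rule is small enough to trace by hand. The sketch below restates it for plain Python lists so the effect of one step is easy to see; the function name `reward_update` is an assumption for this example.

```python
def reward_update(weights, spikes, reward, lr=0.01, decay=0.999, lo=0.01, hi=10.0):
    """Hypothetical list-based restatement of one learner step."""
    updated = []
    for weight, spiked in zip(weights, spikes):
        weight *= decay  # multiplicative decay is applied on every step
        # Spiking channels move with the reward, silent ones slightly against it.
        weight += lr * reward if spiked else -0.1 * lr * reward
        updated.append(min(max(weight, lo), hi))
    return updated


# One spiking and one silent channel, positive reward:
potentiated, depressed = reward_update([1.0, 1.0], spikes=[1, 0], reward=1.0)
# potentiated is about 1.009 (0.999 + 0.01); depressed is about 0.998 (0.999 - 0.001)
```

With a reward of zero only the decay term acts, so every weight shrinks by the same factor until it reaches the lower clip.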

step(spikes, reward)

Apply reward-modulated STDP update.

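Each call first decays all weights multiplicatively. Channels that spiked are then shifted by lr × reward and silent channels by −0.1 × lr × reward, so a positive reward potentiates spiking channels and mildly depresses silent ones (a negative reward does the opposite). Weights are finally clipped to [0.01, 10.0].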

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
def step(
    self,
    spikes: np.ndarray,
    reward: float,
) -> np.ndarray:
    """Apply reward-modulated STDP update.

    Spikes that contributed to a positive reward get potentiated;
    non-spiking channels get depressed toward baseline.
    """
    self.weights *= self.decay

    spike_mask = spikes.astype(bool)
    self.weights[spike_mask] += self.lr * reward
    self.weights[~spike_mask] -= self.lr * reward * 0.1

    self.weights = np.clip(self.weights, 0.01, 10.0)
    self.updates += 1
    return self.weights

FPGAFeedbackController

Serializes BCI commands for DMA push to FPGA feedback register.

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
class FPGAFeedbackController:
    """Serializes BCI commands for DMA push to FPGA feedback register."""

    COMMAND_NOP = 0
    COMMAND_STIM = 1
    COMMAND_INHIBIT = 2

    def serialize(
        self,
        command: int,
        channel: int = 0,
        amplitude: float = 1.0,
        timestamp_us: float = 0.0,
    ) -> bytes:
        """Pack a feedback command into a 16-byte DMA-aligned struct.

        Layout: [cmd:u8, chan:u16, amp:f32, ts:f64, pad:1]
        """
        return struct.pack("<BHfdx", command, channel, amplitude, timestamp_us)

    def deserialize(self, data: bytes) -> Dict:
        """Unpack a feedback command."""
        cmd, chan, amp, ts = struct.unpack("<BHfdx", data[:16])
        return {"command": cmd, "channel": chan, "amplitude": amp, "timestamp_us": ts}

serialize(command, channel=0, amplitude=1.0, timestamp_us=0.0)

Pack a feedback command into a 16-byte DMA-aligned struct.

Layout: [cmd:u8, chan:u16, amp:f32, ts:f64, pad:1]

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
def serialize(
    self,
    command: int,
    channel: int = 0,
    amplitude: float = 1.0,
    timestamp_us: float = 0.0,
) -> bytes:
    """Pack a feedback command into a 16-byte DMA-aligned struct.

    Layout: [cmd:u8, chan:u16, amp:f32, ts:f64, pad:1]
    """
    return struct.pack("<BHfdx", command, channel, amplitude, timestamp_us)

deserialize(data)

Unpack a feedback command.

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
def deserialize(self, data: bytes) -> Dict:
    """Unpack a feedback command."""
    cmd, chan, amp, ts = struct.unpack("<BHfdx", data[:16])
    return {"command": cmd, "channel": chan, "amplitude": amp, "timestamp_us": ts}
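
The legacy 16-byte record can be inspected with `struct` directly. This sketch uses the same format string as the source above; the sample values are arbitrary.

```python
import struct

FORMAT = "<BHfdx"  # cmd:u8, chan:u16, amp:f32, ts:f64, one trailing pad byte

packet = struct.pack(FORMAT, 1, 7, 0.5, 1000.0)
size = struct.calcsize(FORMAT)  # 16

cmd, chan, amp, ts = struct.unpack(FORMAT, packet)

# With "<" there is no implicit alignment, so the fields sit at byte
# offsets 0 (cmd), 1 (chan), 3 (amp), and 7 (ts), with the pad byte at 15.
amp_at_offset_3 = struct.unpack_from("<f", packet, 3)[0]
ts_at_offset_7 = struct.unpack_from("<d", packet, 7)[0]
```

Because the float fields are not naturally aligned inside the record, a consumer on the hardware side would presumably need to parse the same packed layout rather than a C struct with default padding.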

LatencyProfiler

Rolling window latency tracker with percentile reporting.

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
class LatencyProfiler:
    """Rolling window latency tracker with percentile reporting."""

    def __init__(self, window_size: int = 1000):
        self.window: deque[float] = deque(maxlen=window_size)

    def record(self, latency_ms: float) -> None:
        self.window.append(latency_ms)

    @property
    def mean(self) -> float:
        return float(np.mean(list(self.window))) if self.window else 0.0

    @property
    def p50(self) -> float:
        return float(np.percentile(list(self.window), 50)) if self.window else 0.0

    @property
    def p95(self) -> float:
        return float(np.percentile(list(self.window), 95)) if self.window else 0.0

    @property
    def p99(self) -> float:
        return float(np.percentile(list(self.window), 99)) if self.window else 0.0

    @property
    def budget_met(self) -> bool:
        """True if p95 latency is under 10 ms BCI hard real-time target."""
        return self.p95 < 10.0

budget_met property

True if p95 latency is under 10 ms BCI hard real-time target.
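
A short sketch of how a bounded window and a p95 check interact is given below. It uses only the standard library; the `percentile` helper is an assumption written for this example and follows linear interpolation, which is the default method of `numpy.percentile`.

```python
from collections import deque


def percentile(values, q):
    """Linear-interpolation percentile over a small collection."""
    ordered = sorted(values)
    if not ordered:
        return 0.0
    rank = (len(ordered) - 1) * q / 100.0
    lower = int(rank)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (rank - lower)


window = deque(maxlen=5)
for latency_ms in [1.0, 2.0, 3.0, 4.0, 5.0, 50.0]:
    window.append(latency_ms)  # the oldest sample (1.0) is evicted

p95 = percentile(window, 95)   # roughly 41.0 for [2, 3, 4, 5, 50]
budget_met = p95 < 10.0        # False: one outlier dominates a small window
```

An empty window yields 0.0, so a budget check of this form reports success before any latency has been recorded.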

BCIStudio

End-to-end BCI closed-loop orchestrator.

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
class BCIStudio:
    """End-to-end BCI closed-loop orchestrator."""

    def __init__(
        self,
        channels: int = 1024,
        lr: float = 0.01,
    ):
        self.channels = channels
        self.codec = SpikeCodec()
        self.learner = OnlineLearner(channels, lr=lr)
        self.feedback = FPGAFeedbackController()
        self.profiler = LatencyProfiler()
        self.metrics = SessionMetrics()
        self._running = False

    def start_session(self) -> None:
        self._running = True
        self.metrics = SessionMetrics()

    def stop_session(self) -> SessionMetrics:
        self._running = False
        return self.metrics

    def process_frame(
        self,
        raw_ephys: np.ndarray,
        reward: float = 0.0,
    ) -> Dict:
        """Process a single BCI frame through the full pipeline."""
        t0 = time.perf_counter()

        # Spike extraction (threshold on diff)
        spikes = (np.abs(np.diff(raw_ephys, prepend=0)) > 0.5).astype(np.uint8)

        # Compression (for telemetry/logging)
        compressed = self.codec.encode(spikes)
        comp_ratio = len(raw_ephys) / max(1, len(compressed))

        # SC decode: weighted vote
        total_voltage = float(np.dot(spikes, self.learner.weights))

        # Online learning
        old_weights = self.learner.weights.copy()
        self.learner.step(spikes, reward)
        weight_delta = float(np.sum(np.abs(self.learner.weights - old_weights)))
        if weight_delta > 0.01 * self.channels:
            self.metrics.adaptation_events += 1

        # Command decision
        command = (
            FPGAFeedbackController.COMMAND_STIM
            if total_voltage > self.channels * 0.1
            else FPGAFeedbackController.COMMAND_NOP
        )

        # Feedback serialization
        feedback_packet = self.feedback.serialize(
            command, channel=0, amplitude=min(total_voltage / self.channels, 1.0)
        )

        latency_ms = (time.perf_counter() - t0) * 1000.0
        self.profiler.record(latency_ms)

        # Update session metrics
        n_spikes = int(np.sum(spikes))
        self.metrics.total_frames += 1
        self.metrics.total_spikes += n_spikes
        self.metrics.latency_history.append(latency_ms)

        return {
            "command": command,
            "latency_ms": latency_ms,
            "spikes": n_spikes,
            "compression_ratio": comp_ratio,
            "weight_delta": weight_delta,
            "feedback_bytes": len(feedback_packet),
        }

process_frame(raw_ephys, reward=0.0)

Process a single BCI frame through the full pipeline.

Source code in src/sc_neurocore/bci_studio/bci_studio.py
Python
def process_frame(
    self,
    raw_ephys: np.ndarray,
    reward: float = 0.0,
) -> Dict:
    """Process a single BCI frame through the full pipeline."""
    t0 = time.perf_counter()

    # Spike extraction (threshold on diff)
    spikes = (np.abs(np.diff(raw_ephys, prepend=0)) > 0.5).astype(np.uint8)

    # Compression (for telemetry/logging)
    compressed = self.codec.encode(spikes)
    comp_ratio = len(raw_ephys) / max(1, len(compressed))

    # SC decode: weighted vote
    total_voltage = float(np.dot(spikes, self.learner.weights))

    # Online learning
    old_weights = self.learner.weights.copy()
    self.learner.step(spikes, reward)
    weight_delta = float(np.sum(np.abs(self.learner.weights - old_weights)))
    if weight_delta > 0.01 * self.channels:
        self.metrics.adaptation_events += 1

    # Command decision
    command = (
        FPGAFeedbackController.COMMAND_STIM
        if total_voltage > self.channels * 0.1
        else FPGAFeedbackController.COMMAND_NOP
    )

    # Feedback serialization
    feedback_packet = self.feedback.serialize(
        command, channel=0, amplitude=min(total_voltage / self.channels, 1.0)
    )

    latency_ms = (time.perf_counter() - t0) * 1000.0
    self.profiler.record(latency_ms)

    # Update session metrics
    n_spikes = int(np.sum(spikes))
    self.metrics.total_frames += 1
    self.metrics.total_spikes += n_spikes
    self.metrics.latency_history.append(latency_ms)

    return {
        "command": command,
        "latency_ms": latency_ms,
        "spikes": n_spikes,
        "compression_ratio": comp_ratio,
        "weight_delta": weight_delta,
        "feedback_bytes": len(feedback_packet),
    }
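
The first two stages of this legacy loop, derivative thresholding and the weighted vote, can be sketched without NumPy as follows. The helper names `diff_spikes` and `decide` are assumptions for illustration; the thresholds (0.5 and 10% of the frame length) are taken from the source above.

```python
def diff_spikes(frame, threshold=0.5):
    """Mark entries whose jump from the previous entry exceeds the threshold."""
    spikes = []
    previous = 0.0  # mirrors prepend=0 in the source
    for value in frame:
        spikes.append(1 if abs(value - previous) > threshold else 0)
        previous = value
    return spikes


def decide(spikes, weights, fraction=0.1):
    """Return 1 (stimulate) when the weighted vote exceeds fraction * length."""
    total = sum(s * w for s, w in zip(spikes, weights))
    return 1 if total > len(spikes) * fraction else 0


frame = [0.0, 1.0, 1.2, 0.1]
spikes = diff_spikes(frame)          # [0, 1, 0, 1]
command = decide(spikes, [1.0] * 4)  # 2.0 > 0.4, so 1
```

Note that the difference is taken between adjacent entries of the frame, and the first entry is compared against zero, so a first value above 0.5 registers as a spike.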