Spike Codec Library

Five codecs for neural data compression — BCI telemetry, neural probes, neuromorphic routing, real-time streaming, and general-purpose archival.

All codecs share compress(spikes) → (bytes, result) and decompress(bytes, T, N) → spikes.

Registry

sc_neurocore.spike_codec.registry

Codec registry: lookup by name, recommend by data characteristics.

Five codecs for different use cases:

isi         — Baseline ISI + varint. Simple, general-purpose.
predictive  — EMA predictor + XOR errors. Best for BCI implants.
delta       — Inter-channel XOR. Best for correlated probe arrays.
streaming   — Fixed-latency frames. Best for real-time decoding.
aer         — Event list. Best for neuromorphic inter-chip routing.

All share the same API: compress(spikes) → (bytes, result), decompress(bytes, T, N) → spikes.
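To make the shared API shape concrete, here is a hypothetical toy codec with the same `compress`/`decompress` signatures. It is purely illustrative — it stores raw `(t, n)` coordinates as 32-bit pairs, far looser than any of the library's actual encodings — but the round-trip contract is the same:

```python
import numpy as np

class ToySpikeCodec:
    """Illustrative codec with the shared API shape (not from the library)."""

    def compress(self, spikes):
        T, N = spikes.shape
        # Store each spike as a (t, n) pair of uint32 — deliberately naive.
        coords = np.argwhere(spikes > 0).astype(np.uint32)
        data = coords.tobytes()
        result = {
            "original_bits": T * N,          # 1 bit per raster cell
            "compressed_bits": len(data) * 8,
            "n_spikes": len(coords),
        }
        return data, result

    def decompress(self, data, T, N):
        coords = np.frombuffer(data, dtype=np.uint32).reshape(-1, 2)
        spikes = np.zeros((T, N), dtype=np.int8)
        spikes[coords[:, 0], coords[:, 1]] = 1
        return spikes

rng = np.random.default_rng(0)
raster = (rng.random((100, 8)) < 0.02).astype(np.int8)

codec = ToySpikeCodec()
blob, info = codec.compress(raster)
restored = codec.decompress(blob, 100, 8)
assert np.array_equal(restored, raster)
```

Every codec in the registry honors this same round-trip contract, so codecs can be swapped without touching calling code.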

get_codec(name, **kwargs)

Get a codec by name.

Parameters

name : str
    One of: 'isi', 'predictive', 'delta', 'streaming', 'aer'.
**kwargs
    Passed to the codec constructor.

Returns

Codec instance with compress/decompress methods.

Source code in src/sc_neurocore/spike_codec/registry.py
def get_codec(name: str, **kwargs):
    """Get a codec by name.

    Parameters
    ----------
    name : str
        One of: 'isi', 'predictive', 'delta', 'streaming', 'aer'.
    **kwargs
        Passed to the codec constructor.

    Returns
    -------
    Codec instance with compress/decompress methods.
    """
    cls = CODEC_REGISTRY.get(name)
    if cls is None:
        available = ", ".join(sorted(CODEC_REGISTRY))
        raise ValueError(f"Unknown codec {name!r}. Available: {available}")
    return cls(**kwargs)

list_codecs()

List available codec names.

Source code in src/sc_neurocore/spike_codec/registry.py
def list_codecs() -> list[str]:
    """List available codec names."""
    return sorted(CODEC_REGISTRY)

recommend_codec(n_channels, firing_rate, latency_ms=10.0, correlated=False, neuromorphic=False)

Recommend a codec based on data characteristics.

Parameters

n_channels : int
    Number of recording channels.
firing_rate : float
    Mean firing rate in Hz (per neuron).
latency_ms : float
    Maximum acceptable latency in milliseconds.
correlated : bool
    True if nearby channels are spatially correlated.
neuromorphic : bool
    True if target is neuromorphic hardware (Loihi, SpiNNaker).

Returns

str — codec name

Source code in src/sc_neurocore/spike_codec/registry.py
def recommend_codec(
    n_channels: int,
    firing_rate: float,
    latency_ms: float = 10.0,
    correlated: bool = False,
    neuromorphic: bool = False,
) -> str:
    """Recommend a codec based on data characteristics.

    Parameters
    ----------
    n_channels : int
        Number of recording channels.
    firing_rate : float
        Mean firing rate in Hz (per neuron).
    latency_ms : float
        Maximum acceptable latency in milliseconds.
    correlated : bool
        True if nearby channels are spatially correlated.
    neuromorphic : bool
        True if target is neuromorphic hardware (Loihi, SpiNNaker).

    Returns
    -------
    str — codec name
    """
    if neuromorphic:
        return "aer"

    if latency_ms <= 1.0:
        return "streaming"

    if correlated and n_channels >= 16:
        return "delta"

    # Predictive works best when temporal structure exists
    # (periodic bursting, oscillations, drift)
    if n_channels >= 64:
        return "predictive"

    return "isi"
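The decision cascade is easy to exercise standalone. A minimal mirror of the logic shown above, for illustration only (the real function lives in `sc_neurocore.spike_codec.registry`):

```python
def recommend_codec(n_channels, firing_rate, latency_ms=10.0,
                    correlated=False, neuromorphic=False):
    # Mirrors the source above: checks are ordered by priority.
    if neuromorphic:
        return "aer"            # event lists route natively on Loihi/SpiNNaker
    if latency_ms <= 1.0:
        return "streaming"      # fixed-latency frames for sub-ms budgets
    if correlated and n_channels >= 16:
        return "delta"          # inter-channel XOR exploits spatial correlation
    if n_channels >= 64:
        return "predictive"     # temporal prediction pays off at scale
    return "isi"                # simple general-purpose baseline

assert recommend_codec(1024, 2.0, neuromorphic=True) == "aer"
assert recommend_codec(32, 2.0, latency_ms=0.5) == "streaming"
assert recommend_codec(96, 2.0, correlated=True) == "delta"
assert recommend_codec(128, 2.0) == "predictive"
assert recommend_codec(8, 2.0) == "isi"
```

Note that the checks are ordered: a neuromorphic target always wins, and latency beats correlation, so flag combinations resolve deterministically.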

ISI Codec (Baseline)

Inter-spike interval encoding with LEB128 variable-length integers. 50-200x compression on typical cortical firing rates.
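The core transform is first-differences on spike times, inverted by a cumulative sum. A minimal NumPy sketch:

```python
import numpy as np

# Spike times for one neuron (timestep indices into a raster of length T).
times = np.array([3, 7, 8, 20, 55])

# Forward: keep the first spike time, then inter-spike intervals.
isis = np.diff(times)                       # [4, 1, 12, 35]
encoded = np.concatenate(([times[0]], isis))

# Inverse: a cumulative sum recovers the absolute times exactly.
decoded = np.cumsum(encoded)
assert np.array_equal(decoded, times)

# At cortical firing rates most ISIs are small, so they fit in a
# single LEB128 byte (values below 128) — the source of the big ratios.
assert all(v < 128 for v in encoded)
```

The losslessness of the pipeline rests on this pair of inverse operations; the varint layer only changes how the integers are serialized.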

sc_neurocore.spike_codec.codec

ISI spike train compression with configurable entropy backend.

Per-neuron inter-spike interval encoding. Two backends:

'varint': LEB128 variable-length integers. Simple, fast.
'huffman': Adaptive Huffman coding on the ISI distribution. 30-60% smaller than varint on medium-to-dense data because frequent short ISIs get 2-4 bit codes.

The constructor default, 'auto', picks between them by spike density.

For better compression on structured data, see PredictiveSpikeCodec (temporal prediction), DeltaSpikeCodec (inter-channel correlation), or AERSpikeCodec (event encoding).

SpikeCodec

ISI spike train codec with configurable entropy backend.

Compression strategy:

1. Extract per-neuron spike times from binary raster
2. Compute inter-spike intervals (ISIs) per neuron
3. Encode ISIs with chosen backend:
   'varint': LEB128 variable-length integers (fast, simple)
   'huffman': Adaptive Huffman (30-60% smaller on dense data)

Each neuron is encoded independently. No inter-channel modeling. For inter-channel compression, use DeltaSpikeCodec.

Parameters

mode : str
    'lossless' (exact reconstruction) or 'lossy' (preserve rates only).
timing_precision : int
    For lossy mode: quantize spike times to this resolution.
entropy : str
    'varint', 'huffman', or 'auto' (default; picks by spike density).

Source code in src/sc_neurocore/spike_codec/codec.py
class SpikeCodec:
    """ISI spike train codec with configurable entropy backend.

    Compression strategy:
    1. Extract per-neuron spike times from binary raster
    2. Compute inter-spike intervals (ISIs) per neuron
    3. Encode ISIs with chosen backend:
       'varint': LEB128 variable-length integers (fast, simple)
       'huffman': Adaptive Huffman (30-60% smaller on dense data)

    Each neuron is encoded independently. No inter-channel modeling.
    For inter-channel compression, use DeltaSpikeCodec.

    Parameters
    ----------
    mode : str
        'lossless' (exact reconstruction) or 'lossy' (preserve rates only).
    timing_precision : int
        For lossy mode: quantize spike times to this resolution.
    entropy : str
        'varint', 'huffman', or 'auto' (default; picks by spike density).
    """

    def __init__(self, mode: str = "lossless", timing_precision: int = 1, entropy: str = "auto"):
        self.mode = mode
        self.timing_precision = timing_precision
        self.entropy = entropy
        self._huffman = HuffmanEncoder()

    def compress(self, spikes: np.ndarray) -> tuple[bytes, CompressionResult]:
        """Compress a spike raster.

        Parameters
        ----------
        spikes : ndarray of shape (T, N), binary (int8 or bool)

        Returns
        -------
        (compressed_bytes, CompressionResult)
        """
        T, N = spikes.shape
        original_bits = T * N

        if self.mode == "lossy":
            spikes = self._quantize_timing(spikes)

        # Extract per-neuron spike times
        events = []
        for n in range(N):
            times = np.where(spikes[:, n] > 0)[0]
            events.append(times)

        # Encode: ISIs per neuron + variable-length integers
        encoded = self._encode_events(events, T, N)

        compressed_bits = len(encoded) * 8
        ratio = original_bits / max(compressed_bits, 1)
        n_spikes = sum(len(e) for e in events)

        result = CompressionResult(
            original_bits=original_bits,
            compressed_bits=compressed_bits,
            compression_ratio=ratio,
            n_spikes=n_spikes,
            n_neurons=N,
            n_timesteps=T,
            lossless=self.mode == "lossless",
        )
        return encoded, result

    def decompress(self, data: bytes, T: int, N: int) -> np.ndarray:
        """Decompress to spike raster.

        Parameters
        ----------
        data : bytes
        T, N : int
            Original dimensions.

        Returns
        -------
        ndarray of shape (T, N), int8
        """
        events = self._decode_events(data, N)
        spikes = np.zeros((T, N), dtype=np.int8)
        for n, times in enumerate(events):
            for t in times:
                if 0 <= t < T:
                    spikes[t, n] = 1
        return spikes

    def _quantize_timing(self, spikes: np.ndarray) -> np.ndarray:
        if self.timing_precision <= 1:  # pragma: no cover
            return spikes
        T, N = spikes.shape
        new_T = T // self.timing_precision
        quantized = np.zeros((new_T, N), dtype=np.int8)
        for i in range(new_T):
            block = spikes[i * self.timing_precision : (i + 1) * self.timing_precision]
            quantized[i] = (block.sum(axis=0) > 0).astype(np.int8)
        return quantized

    def _pick_entropy(self, n_spikes: int, total_bins: int) -> str:
        """Auto-select entropy backend based on data density."""
        if self.entropy in ("varint", "huffman"):
            return self.entropy
        # auto: huffman for dense data (>3% spikes), varint for sparse
        density = n_spikes / max(total_bins, 1)
        return "huffman" if density > 0.03 else "varint"

    def _encode_events(self, events: list[np.ndarray], T: int, N: int) -> bytes:
        """Encode spike events using ISI + auto-selected entropy backend."""
        n_spikes = sum(len(e) for e in events)
        backend = self._pick_entropy(n_spikes, T * N)
        if backend == "huffman":
            return self._encode_events_huffman(events, T, N)

        parts = []
        # Header: T, N as 4-byte big-endian + entropy flag
        parts.append(T.to_bytes(4, "big"))
        parts.append(N.to_bytes(4, "big"))

        for times in events:
            n_spikes = len(times)
            parts.append(self._encode_varint(n_spikes))

            if n_spikes == 0:
                continue

            parts.append(self._encode_varint(int(times[0])))

            for i in range(1, n_spikes):
                isi = int(times[i] - times[i - 1])
                parts.append(self._encode_varint(isi))

        return b"".join(parts)

    def _encode_events_huffman(self, events: list[np.ndarray], T: int, N: int) -> bytes:
        """Encode events using Huffman-coded ISIs."""
        # Collect all ISI values first (for building Huffman table)
        all_isis = []
        spike_counts = []
        first_times = []

        for times in events:
            n_spikes = len(times)
            spike_counts.append(n_spikes)
            if n_spikes == 0:
                continue
            first_times.append(int(times[0]))
            for i in range(1, n_spikes):
                all_isis.append(int(times[i] - times[i - 1]))

        # Header: magic(1) + T(4) + N(4)
        header = b"\x01"  # entropy=huffman flag
        header += T.to_bytes(4, "big") + N.to_bytes(4, "big")

        # Spike counts + first times as varint (small overhead)
        count_parts = []
        for n_spikes in spike_counts:
            count_parts.append(self._encode_varint(n_spikes))
        first_parts = []
        for ft in first_times:
            first_parts.append(self._encode_varint(ft))

        count_data = b"".join(count_parts)
        first_data = b"".join(first_parts)

        # Huffman-encode all ISIs as one stream
        assert self._huffman is not None
        huff_data = self._huffman.encode(all_isis)

        # Pack: header + count_data_len(4) + count_data + first_data_len(4) + first_data + huff_data
        import struct

        return (
            header
            + struct.pack("!I", len(count_data))
            + count_data
            + struct.pack("!I", len(first_data))
            + first_data
            + huff_data
        )

    def _decode_events(self, data: bytes, N: int) -> list[np.ndarray]:
        """Decode ISI-encoded spike events (auto-detects entropy backend)."""
        if data[0:1] == b"\x01":
            return self._decode_events_huffman(data, N)

        pos = 0
        pos += 8  # skip header (T, N)

        events = []
        for n in range(N):
            n_spikes, pos = self._decode_varint(data, pos)
            if n_spikes == 0:
                events.append(np.array([], dtype=np.int64))
                continue

            times = np.zeros(n_spikes, dtype=np.int64)
            first, pos = self._decode_varint(data, pos)
            times[0] = first

            for i in range(1, n_spikes):
                isi, pos = self._decode_varint(data, pos)
                times[i] = times[i - 1] + isi

            events.append(times)
        return events

    def _decode_events_huffman(self, data: bytes, N: int) -> list[np.ndarray]:
        """Decode Huffman-coded ISI events."""
        import struct

        pos = 1  # skip magic byte
        pos += 8  # skip T, N (already known from outer header)

        # Read spike counts
        count_len = struct.unpack("!I", data[pos : pos + 4])[0]
        pos += 4
        count_data = data[pos : pos + count_len]
        pos += count_len

        spike_counts = []
        cpos = 0
        for _ in range(N):
            n, cpos = self._decode_varint(count_data, cpos)
            spike_counts.append(n)

        # Read first times
        first_len = struct.unpack("!I", data[pos : pos + 4])[0]
        pos += 4
        first_data = data[pos : pos + first_len]
        pos += first_len

        first_times = []
        fpos = 0
        for sc in spike_counts:
            if sc > 0:
                ft, fpos = self._decode_varint(first_data, fpos)
                first_times.append(ft)

        # Decode Huffman ISIs
        total_isis = sum(max(0, sc - 1) for sc in spike_counts)
        huff = HuffmanEncoder()
        isis, _ = huff.decode(data[pos:], total_isis)

        # Reconstruct events
        events = []
        isi_idx = 0
        ft_idx = 0
        for sc in spike_counts:
            if sc == 0:
                events.append(np.array([], dtype=np.int64))
                continue
            times = np.zeros(sc, dtype=np.int64)
            times[0] = first_times[ft_idx]
            ft_idx += 1
            for i in range(1, sc):
                times[i] = times[i - 1] + isis[isi_idx]
                isi_idx += 1
            events.append(times)
        return events

    @staticmethod
    def _encode_varint(value: int) -> bytes:
        """Encode integer using variable-length encoding (LEB128-style)."""
        result = bytearray()
        while value >= 0x80:
            result.append((value & 0x7F) | 0x80)
            value >>= 7
        result.append(value & 0x7F)
        return bytes(result)

    @staticmethod
    def _decode_varint(data: bytes, pos: int) -> tuple[int, int]:
        """Decode variable-length integer, return (value, new_position)."""
        value = 0
        shift = 0
        while pos < len(data):
            byte = data[pos]
            pos += 1
            value |= (byte & 0x7F) << shift
            if not (byte & 0x80):
                break
            shift += 7
        return value, pos
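The two static helpers above implement standard LEB128. A standalone copy of the same algorithm, outside the class, for experimentation:

```python
def encode_varint(value: int) -> bytes:
    # 7 data bits per byte; continuation bit (MSB) set on all but the last.
    out = bytearray()
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value & 0x7F)
    return bytes(out)

def decode_varint(data: bytes, pos: int = 0):
    # Accumulate 7-bit groups until a byte with a clear MSB terminates.
    value, shift = 0, 0
    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            break
        shift += 7
    return value, pos

assert encode_varint(5) == b"\x05"        # values < 128 take one byte
assert encode_varint(300) == b"\xac\x02"  # 300 = 0b10_0101100 -> two bytes
for v in (0, 1, 127, 128, 300, 1_000_000):
    enc = encode_varint(v)
    assert decode_varint(enc) == (v, len(enc))
```

Because typical ISIs are small, the one-byte case dominates in practice, which is exactly why the varint backend is competitive despite its simplicity.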

compress(spikes)

Compress a spike raster.

Parameters

spikes : ndarray of shape (T, N), binary (int8 or bool)

Returns

(compressed_bytes, CompressionResult)

Source code in src/sc_neurocore/spike_codec/codec.py
def compress(self, spikes: np.ndarray) -> tuple[bytes, CompressionResult]:
    """Compress a spike raster.

    Parameters
    ----------
    spikes : ndarray of shape (T, N), binary (int8 or bool)

    Returns
    -------
    (compressed_bytes, CompressionResult)
    """
    T, N = spikes.shape
    original_bits = T * N

    if self.mode == "lossy":
        spikes = self._quantize_timing(spikes)

    # Extract per-neuron spike times
    events = []
    for n in range(N):
        times = np.where(spikes[:, n] > 0)[0]
        events.append(times)

    # Encode: ISIs per neuron + variable-length integers
    encoded = self._encode_events(events, T, N)

    compressed_bits = len(encoded) * 8
    ratio = original_bits / max(compressed_bits, 1)
    n_spikes = sum(len(e) for e in events)

    result = CompressionResult(
        original_bits=original_bits,
        compressed_bits=compressed_bits,
        compression_ratio=ratio,
        n_spikes=n_spikes,
        n_neurons=N,
        n_timesteps=T,
        lossless=self.mode == "lossless",
    )
    return encoded, result

decompress(data, T, N)

Decompress to spike raster.

Parameters

data : bytes
T, N : int
    Original dimensions.

Returns

ndarray of shape (T, N), int8

Source code in src/sc_neurocore/spike_codec/codec.py
def decompress(self, data: bytes, T: int, N: int) -> np.ndarray:
    """Decompress to spike raster.

    Parameters
    ----------
    data : bytes
    T, N : int
        Original dimensions.

    Returns
    -------
    ndarray of shape (T, N), int8
    """
    events = self._decode_events(data, N)
    spikes = np.zeros((T, N), dtype=np.int8)
    for n, times in enumerate(events):
        for t in times:
            if 0 <= t < T:
                spikes[t, n] = 1
    return spikes

CompressionResult dataclass

Result of spike train compression.

Source code in src/sc_neurocore/spike_codec/codec.py
@dataclass
class CompressionResult:
    """Result of spike train compression."""

    original_bits: int
    compressed_bits: int
    compression_ratio: float
    n_spikes: int
    n_neurons: int
    n_timesteps: int
    lossless: bool

    def summary(self) -> str:
        mode = "lossless" if self.lossless else "lossy"
        return (
            f"SpikeCodec ({mode}): {self.compression_ratio:.1f}x compression, "
            f"{self.original_bits} -> {self.compressed_bits} bits, "
            f"{self.n_spikes} spikes across {self.n_neurons} neurons x {self.n_timesteps} steps"
        )
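The ratio field follows directly from the two bit counts. A worked example under assumed numbers (1000 timesteps of a 64-channel raster, compressed to a hypothetical 200 bytes — not a measured benchmark):

```python
# Assumed, illustrative numbers.
T, N = 1000, 64                 # timesteps x neurons
original_bits = T * N           # 1 bit per (t, n) cell in the raw raster
compressed_bits = 200 * 8       # suppose the codec emitted 200 bytes
ratio = original_bits / compressed_bits

assert original_bits == 64_000
assert ratio == 40.0            # 64000 / 1600 -> 40x compression
```

Note the raw size is counted as one bit per raster cell, so the ratio measures gain over a dense binary raster, not over an int8 array in memory (which would be 8x larger).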

Predictive Codec (BCI Implants)

EMA predictor + XOR error coding. Only transmit surprises. Encoder and decoder share identical deterministic predictor state.

sc_neurocore.spike_codec.predictive_codec

Predictive spike compression: XOR-based prediction error coding.

Architecture
  1. Maintain per-channel firing rate predictor (exponential moving average)
  2. At each timestep, predict spike pattern from learned rates
  3. XOR actual vs predicted → prediction error (surprise) bits
  4. ISI-compress only the error bits (sparser than raw spikes)
  5. Decoder runs identical predictor → lossless reconstruction

The predictor is deterministic given the same seed, so encoder and decoder stay synchronized without transmitting predictor state.

Neuralink context: 1024+ channels at 20 kHz produce ~200 Mbps raw. Typical cortical neurons fire at 0.5-5 Hz → >99.9% of bits are zeros. ISI coding alone gives 50-200x. Predictive coding removes the remaining structured correlations (bursts, oscillations, drift) for additional 2-5x on top of ISI baseline.
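The bandwidth figures above are straightforward arithmetic. A back-of-envelope check, assuming 10-bit ADC samples for the "raw" figure (the sample width is an assumption, not stated by the library):

```python
channels = 1024
sample_rate = 20_000            # Hz
adc_bits = 10                   # assumed ADC sample width

# Raw analog telemetry: every channel, every sample, full width.
raw_bps = channels * sample_rate * adc_bits
assert raw_bps == 204_800_000   # ~200 Mbps, matching the stated figure

# After spike detection: 1 bit per channel per 50 us bin.
event_bps = channels * sample_rate
assert event_bps == 20_480_000  # ~20 Mb/s of binary raster

# At 2 Hz firing in 20 kHz bins, only 0.01% of raster bits are ones.
density = 2.0 / sample_rate
assert density == 1e-4          # >99.9% zeros, as claimed
```

That extreme sparsity is the headroom the ISI baseline exploits; predictive coding then targets the residual temporal structure among the ones.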

PredictiveSpikeCodec

Predictive spike codec: compress prediction errors, not raw spikes.

Four predictor modes

'ema' (default): float EMA rate tracking + threshold comparison.
'lfsr': Q8.8 fixed-point rate + LFSR comparator. Bit-true with sc_bitstream_encoder.v — maps directly to Verilog RTL.
'context': Markov context predictor. Hashes last K spike states per channel, predicts from accumulated statistics.
'world_model': Learnable autoregressive predictor (LMS-trained). Predicts spike[t] from spike[t-K:t] via linear model with sigmoid activation. Learns cross-channel correlations.

Compression pipeline
  1. For each timestep t:
     a. predicted[t] = predictor.predict()
     b. error[t] = actual[t] XOR predicted[t]
     c. predictor.update(actual[t])
  2. ISI-compress the error matrix (sparser than raw spikes)
  3. Pack with header (predictor params for decoder sync)
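The pipeline above can be illustrated with a simplified per-channel EMA predictor. This is a sketch of the predict → XOR → update loop and the decoder's mirror pass only; the library's actual predictors add the LFSR, context, and world-model variants:

```python
import numpy as np

def ema_errors(spikes, alpha=0.05, threshold=0.5):
    # Encoder pass: predict, XOR against actual, then update — in that order.
    T, N = spikes.shape
    rate = np.zeros(N)
    errors = np.zeros_like(spikes)
    for t in range(T):
        predicted = (rate > threshold).astype(spikes.dtype)
        errors[t] = spikes[t] ^ predicted
        rate = (1 - alpha) * rate + alpha * spikes[t]
    return errors

def ema_recover(errors, alpha=0.05, threshold=0.5):
    # Decoder runs the identical predictor; XOR is its own inverse,
    # and updating with the recovered spikes keeps both sides in sync.
    T, N = errors.shape
    rate = np.zeros(N)
    spikes = np.zeros_like(errors)
    for t in range(T):
        predicted = (rate > threshold).astype(errors.dtype)
        spikes[t] = errors[t] ^ predicted
        rate = (1 - alpha) * rate + alpha * spikes[t]
    return spikes

rng = np.random.default_rng(1)
raster = (rng.random((200, 4)) < 0.05).astype(np.int8)
errs = ema_errors(raster)
assert np.array_equal(ema_recover(errs), raster)
```

Because the decoder feeds the recovered spikes back into the same update rule, its predictor state tracks the encoder's exactly, with no state transmitted — the property step 3's small header relies on.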
Parameters

alpha : float
    EMA smoothing factor (ema mode); learning rate (world_model mode). Ignored in lfsr/context modes.
threshold : float
    Spike prediction threshold (ema mode). Ignored in lfsr/context modes.
predictor : str
    'ema', 'lfsr', 'context', or 'world_model'.
alpha_q8 : int
    Q8.8 smoothing factor for lfsr mode. 1 = 1/256 ≈ 0.004.
seed : int
    LFSR seed for lfsr mode (non-zero, 16-bit).
context_bits : int
    Context history length for context/world_model modes (default 8 = last 8 spikes).
base_mode : str
    'lossless' or 'lossy' for the underlying ISI codec.
timing_precision : int
    For lossy mode: quantize timing resolution.

Source code in src/sc_neurocore/spike_codec/predictive_codec.py
class PredictiveSpikeCodec:
    """Predictive spike codec: compress prediction errors, not raw spikes.

    Four predictor modes:
        'ema' (default): float EMA rate tracking + threshold comparison.
        'lfsr': Q8.8 fixed-point rate + LFSR comparator. Bit-true with
                sc_bitstream_encoder.v — maps directly to Verilog RTL.
        'context': Markov context predictor. Hashes last K spike states
                   per channel, predicts from accumulated statistics.
        'world_model': Learnable autoregressive predictor (LMS-trained).
                   Predicts spike[t] from spike[t-K:t] via linear model
                   with sigmoid activation. Learns cross-channel correlations.

    Compression pipeline:
        1. For each timestep t:
           a. predicted[t] = predictor.predict()
           b. error[t] = actual[t] XOR predicted[t]
           c. predictor.update(actual[t])
        2. ISI-compress the error matrix (sparser than raw spikes)
        3. Pack with header (predictor params for decoder sync)

    Parameters
    ----------
    alpha : float
        EMA smoothing factor (ema mode). Ignored in lfsr/context mode.
    threshold : float
        Spike prediction threshold (ema mode). Ignored in lfsr/context mode.
    predictor : str
        'ema', 'lfsr', 'context', or 'world_model'.
    alpha_q8 : int
        Q8.8 smoothing factor for lfsr mode. 1 = 1/256 ≈ 0.004.
    seed : int
        LFSR seed for lfsr mode (non-zero, 16-bit).
    context_bits : int
        Context history length for context mode (default 8 = last 8 spikes).
    base_mode : str
        'lossless' or 'lossy' for the underlying ISI codec.
    timing_precision : int
        For lossy mode: quantize timing resolution.
    """

    HEADER_MAGIC = b"PSCX"  # Predictive Spike Codec XOR (EMA)
    HEADER_MAGIC_LFSR = b"PSCL"  # Predictive Spike Codec LFSR
    HEADER_MAGIC_CTX = b"PSCC"  # Predictive Spike Codec Context
    HEADER_MAGIC_WM = b"PSCW"  # Predictive Spike Codec World Model

    def __init__(
        self,
        alpha: float = 0.005,
        threshold: float = 0.5,
        predictor: str = "ema",
        alpha_q8: int = 1,
        seed: int = 0xACE1,
        context_bits: int = 8,
        base_mode: str = "lossless",
        timing_precision: int = 1,
    ):
        self.alpha = alpha
        self.threshold = threshold
        self.predictor = predictor
        self.alpha_q8 = alpha_q8
        self.seed = seed
        self.context_bits = context_bits
        # Context predictor benefits from Huffman backend (sparse scattered errors)
        entropy = "huffman" if predictor in ("context", "world_model") else "auto"
        self.base_codec = SpikeCodec(
            mode=base_mode,
            timing_precision=timing_precision,
            entropy=entropy,
        )

    def compress(self, spikes: np.ndarray) -> tuple[bytes, PredictiveCompressionResult]:
        """Compress spike raster using predictive error coding.

        Parameters
        ----------
        spikes : ndarray of shape (T, N), binary (int8 or bool)

        Returns
        -------
        (compressed_bytes, PredictiveCompressionResult)
        """
        import struct

        spikes = np.asarray(spikes, dtype=np.int8)
        T, N = spikes.shape
        original_bits = T * N

        if self.predictor == "world_model":
            errors, correct_predictions = predict_and_xor_world_model(
                spikes,
                N,
                history_len=self.context_bits,
                lr=self.alpha,
                threshold=self.threshold,
                seed=self.seed,
            )
            error_data, _ = self.base_codec.compress(errors)
            header = self.HEADER_MAGIC_WM + struct.pack(
                "!BdH",
                self.context_bits,
                self.alpha,
                self.seed,
            )
        elif self.predictor == "context":
            errors, correct_predictions = _predict_and_xor_context(
                spikes,
                N,
                self.context_bits,
            )
            error_data, _ = self.base_codec.compress(errors)
            header = self.HEADER_MAGIC_CTX + struct.pack("!B", self.context_bits)
        elif self.predictor == "lfsr":
            if _HAS_RUST:  # pragma: no cover
                flat = np.ascontiguousarray(spikes).ravel()
                err_flat, correct_predictions = _rust_predict_lfsr(
                    flat,
                    N,
                    self.alpha_q8,
                    self.seed,
                )
                errors = np.asarray(err_flat).reshape(T, N)
            else:
                errors, correct_predictions = _predict_and_xor_lfsr(
                    spikes,
                    N,
                    self.alpha_q8,
                    self.seed,
                )
            error_data, _ = self.base_codec.compress(errors)
            header = self.HEADER_MAGIC_LFSR + struct.pack("!HH", self.alpha_q8, self.seed)
        else:
            if _HAS_RUST:  # pragma: no cover
                flat = np.ascontiguousarray(spikes).ravel()
                err_flat, correct_predictions = _rust_predict_ema(
                    flat,
                    N,
                    self.alpha,
                    self.threshold,
                )
                errors = np.asarray(err_flat).reshape(T, N)
            else:
                errors, correct_predictions = _predict_and_xor(
                    spikes,
                    N,
                    self.alpha,
                    self.threshold,
                )
            error_data, _ = self.base_codec.compress(errors)
            header = self.HEADER_MAGIC + struct.pack("!dd", self.alpha, self.threshold)

        encoded = header + error_data
        compressed_bits = len(encoded) * 8
        ratio = original_bits / max(compressed_bits, 1)

        return encoded, PredictiveCompressionResult(
            original_bits=original_bits,
            compressed_bits=compressed_bits,
            compression_ratio=ratio,
            n_spikes=int(np.sum(spikes)),
            n_neurons=N,
            n_timesteps=T,
            lossless=self.base_codec.mode == "lossless",
            prediction_accuracy=correct_predictions / max(T * N, 1),
            error_sparsity=1.0 - (int(np.sum(errors)) / max(T * N, 1)),
            predictor_type=self.predictor,
        )

    def decompress(self, data: bytes, T: int, N: int) -> np.ndarray:
        """Decompress to spike raster.

        Runs identical predictor on decoder side. XOR(error, predicted)
        recovers original spikes. Predictor type auto-detected from header.

        Parameters
        ----------
        data : bytes
            Compressed data from compress().
        T, N : int
            Original dimensions.

        Returns
        -------
        ndarray of shape (T, N), int8
        """
        import struct

        magic = data[:4]

        if magic == self.HEADER_MAGIC_WM:
            history_len = data[4]
            alpha, seed = struct.unpack("!dH", data[5:15])
            error_data = data[15:]
            errors = self.base_codec.decompress(error_data, T, N)
            return xor_and_recover_world_model(
                errors,
                N,
                history_len=history_len,
                lr=alpha,
                seed=seed,
            )

        if magic == self.HEADER_MAGIC_CTX:
            context_bits = data[4]
            error_data = data[5:]
            errors = self.base_codec.decompress(error_data, T, N)
            return _xor_and_recover_context(errors, N, context_bits)

        if magic == self.HEADER_MAGIC_LFSR:
            alpha_q8, seed = struct.unpack("!HH", data[4:8])
            error_data = data[8:]
            errors = self.base_codec.decompress(error_data, T, N)
            if _HAS_RUST:  # pragma: no cover
                flat = np.ascontiguousarray(errors).ravel()
                rec = np.asarray(_rust_recover_lfsr(flat, N, alpha_q8, seed))
                return rec.reshape(T, N)
            return _xor_and_recover_lfsr(errors, N, alpha_q8, seed)

        if magic == self.HEADER_MAGIC:
            alpha, threshold = struct.unpack("!dd", data[4:20])
            error_data = data[20:]
            errors = self.base_codec.decompress(error_data, T, N)
            if _HAS_RUST:  # pragma: no cover
                flat = np.ascontiguousarray(errors).ravel()
                rec = np.asarray(_rust_recover_ema(flat, N, alpha, threshold))
                return rec.reshape(T, N)
            return _xor_and_recover(errors, N, alpha, threshold)

        raise ValueError(
            f"Invalid header magic: {magic!r}. Expected one of "
            f"{self.HEADER_MAGIC!r}, {self.HEADER_MAGIC_LFSR!r}, "
            f"{self.HEADER_MAGIC_CTX!r}, or {self.HEADER_MAGIC_WM!r}"
        )
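The self-describing header that the magic dispatch relies on is plain `struct` packing. A sketch of the EMA variant's layout, matching the `PSCX` magic and `"!dd"` format string used above:

```python
import struct

MAGIC = b"PSCX"                 # EMA-predictor header magic
alpha, threshold = 0.005, 0.5

# Encoder side: 4-byte magic, then two big-endian float64s ('!dd').
header = MAGIC + struct.pack("!dd", alpha, threshold)
assert len(header) == 4 + 16    # magic + 2 x 8-byte doubles

# Decoder side: dispatch on the magic, then unpack the parameters
# so the decoder can reconstruct the identical predictor.
assert header[:4] == MAGIC
a, th = struct.unpack("!dd", header[4:20])
assert (a, th) == (0.005, 0.5)
```

The other variants follow the same pattern with different magics and format strings (`"!HH"` for LFSR, `"!B"` for context, `"!BdH"` for world_model), so header size varies per predictor but parsing stays trivial.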

compress(spikes)

Compress spike raster using predictive error coding.

Parameters

spikes : ndarray of shape (T, N), binary (int8 or bool)

Returns

(compressed_bytes, PredictiveCompressionResult)

Source code in src/sc_neurocore/spike_codec/predictive_codec.py
def compress(self, spikes: np.ndarray) -> tuple[bytes, PredictiveCompressionResult]:
    """Compress spike raster using predictive error coding.

    Parameters
    ----------
    spikes : ndarray of shape (T, N), binary (int8 or bool)

    Returns
    -------
    (compressed_bytes, PredictiveCompressionResult)
    """
    import struct

    spikes = np.asarray(spikes, dtype=np.int8)
    T, N = spikes.shape
    original_bits = T * N

    if self.predictor == "world_model":
        errors, correct_predictions = predict_and_xor_world_model(
            spikes,
            N,
            history_len=self.context_bits,
            lr=self.alpha,
            threshold=self.threshold,
            seed=self.seed,
        )
        error_data, _ = self.base_codec.compress(errors)
        header = self.HEADER_MAGIC_WM + struct.pack(
            "!BdH",
            self.context_bits,
            self.alpha,
            self.seed,
        )
    elif self.predictor == "context":
        errors, correct_predictions = _predict_and_xor_context(
            spikes,
            N,
            self.context_bits,
        )
        error_data, _ = self.base_codec.compress(errors)
        header = self.HEADER_MAGIC_CTX + struct.pack("!B", self.context_bits)
    elif self.predictor == "lfsr":
        if _HAS_RUST:  # pragma: no cover
            flat = np.ascontiguousarray(spikes).ravel()
            err_flat, correct_predictions = _rust_predict_lfsr(
                flat,
                N,
                self.alpha_q8,
                self.seed,
            )
            errors = np.asarray(err_flat).reshape(T, N)
        else:
            errors, correct_predictions = _predict_and_xor_lfsr(
                spikes,
                N,
                self.alpha_q8,
                self.seed,
            )
        error_data, _ = self.base_codec.compress(errors)
        header = self.HEADER_MAGIC_LFSR + struct.pack("!HH", self.alpha_q8, self.seed)
    else:
        if _HAS_RUST:  # pragma: no cover
            flat = np.ascontiguousarray(spikes).ravel()
            err_flat, correct_predictions = _rust_predict_ema(
                flat,
                N,
                self.alpha,
                self.threshold,
            )
            errors = np.asarray(err_flat).reshape(T, N)
        else:
            errors, correct_predictions = _predict_and_xor(
                spikes,
                N,
                self.alpha,
                self.threshold,
            )
        error_data, _ = self.base_codec.compress(errors)
        header = self.HEADER_MAGIC + struct.pack("!dd", self.alpha, self.threshold)

    encoded = header + error_data
    compressed_bits = len(encoded) * 8
    ratio = original_bits / max(compressed_bits, 1)

    return encoded, PredictiveCompressionResult(
        original_bits=original_bits,
        compressed_bits=compressed_bits,
        compression_ratio=ratio,
        n_spikes=int(np.sum(spikes)),
        n_neurons=N,
        n_timesteps=T,
        lossless=self.base_codec.mode == "lossless",
        prediction_accuracy=correct_predictions / max(T * N, 1),
        error_sparsity=1.0 - (int(np.sum(errors)) / max(T * N, 1)),
        predictor_type=self.predictor,
    )

decompress(data, T, N)

Decompress to spike raster.

Runs identical predictor on decoder side. XOR(error, predicted) recovers original spikes. Predictor type auto-detected from header.

Parameters

data : bytes
    Compressed data from compress().
T, N : int
    Original dimensions.

Returns

ndarray of shape (T, N), int8
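The encoder/decoder symmetry can be sketched with a toy EMA predictor. This is an illustration of the XOR error-coding idea only — the function names and parameters here are hypothetical, not the library's `_predict_and_xor` implementation:

```python
import numpy as np

def ema_predict_xor(spikes, alpha=0.5, threshold=0.5):
    """Encoder: predict each timestep from an EMA of past activity,
    emit XOR of prediction and actual spikes (the error raster)."""
    T, N = spikes.shape
    ema = np.zeros(N)
    errors = np.empty_like(spikes)
    for t in range(T):
        pred = (ema > threshold).astype(np.int8)
        errors[t] = spikes[t] ^ pred                  # sparse when predictions are good
        ema = alpha * spikes[t] + (1 - alpha) * ema   # update AFTER predicting
    return errors

def ema_recover(errors, alpha=0.5, threshold=0.5):
    """Decoder: run the identical predictor, XOR errors back to spikes."""
    T, N = errors.shape
    ema = np.zeros(N)
    spikes = np.empty_like(errors)
    for t in range(T):
        pred = (ema > threshold).astype(np.int8)
        spikes[t] = errors[t] ^ pred                  # XOR(error, predicted) = original
        ema = alpha * spikes[t] + (1 - alpha) * ema   # same state update as encoder
    return spikes

rng = np.random.default_rng(0)
x = (rng.random((100, 8)) < 0.2).astype(np.int8)
assert np.array_equal(ema_recover(ema_predict_xor(x)), x)  # lossless round-trip
```

Because the decoder recovers each timestep before updating its predictor, both sides see identical state, which is what makes the XOR coding lossless.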

Source code in src/sc_neurocore/spike_codec/predictive_codec.py
def decompress(self, data: bytes, T: int, N: int) -> np.ndarray:
    """Decompress to spike raster.

    Runs identical predictor on decoder side. XOR(error, predicted)
    recovers original spikes. Predictor type auto-detected from header.

    Parameters
    ----------
    data : bytes
        Compressed data from compress().
    T, N : int
        Original dimensions.

    Returns
    -------
    ndarray of shape (T, N), int8
    """
    import struct

    magic = data[:4]

    if magic == self.HEADER_MAGIC_WM:
        history_len = data[4]
        alpha, seed = struct.unpack("!dH", data[5:15])
        error_data = data[15:]
        errors = self.base_codec.decompress(error_data, T, N)
        return xor_and_recover_world_model(
            errors,
            N,
            history_len=history_len,
            lr=alpha,
            seed=seed,
        )

    if magic == self.HEADER_MAGIC_CTX:
        context_bits = data[4]
        error_data = data[5:]
        errors = self.base_codec.decompress(error_data, T, N)
        return _xor_and_recover_context(errors, N, context_bits)

    if magic == self.HEADER_MAGIC_LFSR:
        alpha_q8, seed = struct.unpack("!HH", data[4:8])
        error_data = data[8:]
        errors = self.base_codec.decompress(error_data, T, N)
        if _HAS_RUST:  # pragma: no cover
            flat = np.ascontiguousarray(errors).ravel()
            rec = np.asarray(_rust_recover_lfsr(flat, N, alpha_q8, seed))
            return rec.reshape(T, N)
        return _xor_and_recover_lfsr(errors, N, alpha_q8, seed)

    if magic == self.HEADER_MAGIC:
        alpha, threshold = struct.unpack("!dd", data[4:20])
        error_data = data[20:]
        errors = self.base_codec.decompress(error_data, T, N)
        if _HAS_RUST:  # pragma: no cover
            flat = np.ascontiguousarray(errors).ravel()
            rec = np.asarray(_rust_recover_ema(flat, N, alpha, threshold))
            return rec.reshape(T, N)
        return _xor_and_recover(errors, N, alpha, threshold)

    raise ValueError(
        f"Invalid header magic: {magic!r}; expected one of "
        f"{self.HEADER_MAGIC!r}, {self.HEADER_MAGIC_LFSR!r}, "
        f"{self.HEADER_MAGIC_CTX!r}, {self.HEADER_MAGIC_WM!r}"
    )

PredictiveCompressionResult dataclass

Bases: CompressionResult

Compression result with predictive coding metrics.

Source code in src/sc_neurocore/spike_codec/predictive_codec.py
@dataclass
class PredictiveCompressionResult(CompressionResult):
    """Compression result with predictive coding metrics."""

    prediction_accuracy: float = 0.0
    error_sparsity: float = 0.0
    predictor_type: str = "ema"

Delta Codec (Neural Probes)

Inter-channel XOR residuals. Groups channels spatially, picks reference per group, encodes others as delta. Best for correlated probe arrays.

sc_neurocore.spike_codec.delta_codec

Delta spike compression: exploit spatial correlation between channels.

Architecture
  1. Group channels by spatial proximity (configurable group_size)
  2. Within each group, pick reference channel (highest spike count)
  3. XOR all other channels against the reference
  4. ISI-compress: reference channels raw, delta channels as XOR residuals
  5. Header stores group layout for decoder

Target: neural probes (Neuropixels 384ch, Utah 96-128ch) where nearby electrodes record overlapping populations. Spatial correlation makes inter-channel XOR sparser than individual channels.

Also effective for any recording with population synchrony (bursts, oscillations, up/down states).
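A minimal numpy illustration of why inter-channel XOR helps (this shows the residual-sparsity idea only, not the codec's grouping or reference-selection logic):

```python
import numpy as np

rng = np.random.default_rng(0)
T = 1000
ref = (rng.random(T) < 0.1).astype(np.int8)        # reference channel, ~10% rate
# A correlated neighbour: mostly copies the reference, with occasional flips
flips = (rng.random(T) < 0.02).astype(np.int8)
neighbour = ref ^ flips

delta = neighbour ^ ref                             # XOR residual = just the flips
# delta has far fewer 1s than the raw neighbour channel, so ISI-coding it is cheaper
assert np.array_equal(delta ^ ref, neighbour)       # XOR against ref restores channel
```

When correlation is high the residual carries only the disagreements, which is exactly the regime the codec targets.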

DeltaSpikeCodec

Delta spike codec: compress inter-channel XOR residuals.

Channels are grouped spatially. Within each group, one reference channel is transmitted raw; others are XOR'd against the reference and ISI-compressed. When channels are correlated, the XOR residuals are much sparser than the raw data.

Parameters

group_size : int
    Channels per group. Larger groups = more sharing but weaker correlation with distant channels. 4-16 typical for probes.
mode : str
    'lossless' or 'lossy' for the underlying ISI codec.
timing_precision : int
    For lossy mode: quantize timing resolution.

Source code in src/sc_neurocore/spike_codec/delta_codec.py
class DeltaSpikeCodec:
    """Delta spike codec: compress inter-channel XOR residuals.

    Channels are grouped spatially. Within each group, one reference
    channel is transmitted raw; others are XOR'd against the reference
    and ISI-compressed. When channels are correlated, the XOR residuals
    are much sparser than the raw data.

    Parameters
    ----------
    group_size : int
        Channels per group. Larger groups = more sharing but weaker
        correlation with distant channels. 4-16 typical for probes.
    mode : str
        'lossless' or 'lossy' for the underlying ISI codec.
    timing_precision : int
        For lossy mode: quantize timing resolution.
    """

    HEADER_MAGIC = b"DSCX"  # Delta Spike Codec XOR

    def __init__(
        self,
        group_size: int = 8,
        mode: str = "lossless",
        timing_precision: int = 1,
    ):
        self.group_size = group_size
        self.base_codec = SpikeCodec(mode=mode, timing_precision=timing_precision)

    def compress(self, spikes: np.ndarray) -> tuple[bytes, DeltaCompressionResult]:
        """Compress spike raster using inter-channel delta coding.

        Parameters
        ----------
        spikes : ndarray of shape (T, N), binary (int8 or bool)

        Returns
        -------
        (compressed_bytes, DeltaCompressionResult)
        """
        spikes = np.asarray(spikes, dtype=np.int8)
        T, N = spikes.shape
        original_bits = T * N

        n_groups = (N + self.group_size - 1) // self.group_size

        # Build delta matrix: replace non-reference channels with XOR residuals
        delta_matrix = np.empty_like(spikes)
        ref_indices = np.empty(n_groups, dtype=np.int32)
        delta_spike_counts = []

        for g in range(n_groups):
            start = g * self.group_size
            end = min(start + self.group_size, N)
            group = spikes[:, start:end]

            # Reference = channel with most spikes (best predictor for group)
            spike_counts = group.sum(axis=0)
            ref_local = int(np.argmax(spike_counts))
            ref_indices[g] = ref_local

            ref_channel = group[:, ref_local]
            for c in range(end - start):
                if c == ref_local:
                    delta_matrix[:, start + c] = group[:, c]
                else:
                    delta = group[:, c] ^ ref_channel
                    delta_matrix[:, start + c] = delta
                    delta_spike_counts.append(int(delta.sum()))

        # ISI-compress the delta matrix
        delta_data, _ = self.base_codec.compress(delta_matrix)

        # Header: magic(4) + group_size(2) + n_groups(2) + ref_indices(n_groups bytes)
        header = self.HEADER_MAGIC
        header += struct.pack("!HH", self.group_size, n_groups)
        header += ref_indices.astype(np.uint8).tobytes()
        encoded = header + delta_data

        compressed_bits = len(encoded) * 8
        ratio = original_bits / max(compressed_bits, 1)
        n_spikes = int(np.sum(spikes))

        mean_delta_sparsity = 0.0
        if delta_spike_counts:
            mean_delta = float(np.mean(delta_spike_counts))
            mean_delta_sparsity = 1.0 - (mean_delta / max(T, 1))

        return encoded, DeltaCompressionResult(
            original_bits=original_bits,
            compressed_bits=compressed_bits,
            compression_ratio=ratio,
            n_spikes=n_spikes,
            n_neurons=N,
            n_timesteps=T,
            lossless=self.base_codec.mode == "lossless",
            n_groups=n_groups,
            group_size=self.group_size,
            mean_delta_sparsity=mean_delta_sparsity,
            codec_type="delta",
        )

    def decompress(self, data: bytes, T: int, N: int) -> np.ndarray:
        """Decompress delta-coded spike raster.

        Parameters
        ----------
        data : bytes
        T, N : int
            Original dimensions.

        Returns
        -------
        ndarray of shape (T, N), int8
        """
        magic = data[:4]
        if magic != self.HEADER_MAGIC:
            raise ValueError(f"Invalid header magic: {magic!r}, expected {self.HEADER_MAGIC!r}")

        group_size, n_groups = struct.unpack("!HH", data[4:8])
        ref_indices = np.frombuffer(data[8 : 8 + n_groups], dtype=np.uint8).astype(np.int32)
        delta_data = data[8 + n_groups :]

        delta_matrix = self.base_codec.decompress(delta_data, T, N)

        spikes = np.empty_like(delta_matrix)
        for g in range(n_groups):
            start = g * group_size
            end = min(start + group_size, N)
            ref_local = int(ref_indices[g])

            ref_channel = delta_matrix[:, start + ref_local]
            for c in range(end - start):
                if c == ref_local:
                    spikes[:, start + c] = delta_matrix[:, start + c]
                else:
                    spikes[:, start + c] = delta_matrix[:, start + c] ^ ref_channel

        return spikes

compress(spikes)

Compress spike raster using inter-channel delta coding.

Parameters

spikes : ndarray of shape (T, N), binary (int8 or bool)

Returns

(compressed_bytes, DeltaCompressionResult)

Source code in src/sc_neurocore/spike_codec/delta_codec.py
def compress(self, spikes: np.ndarray) -> tuple[bytes, DeltaCompressionResult]:
    """Compress spike raster using inter-channel delta coding.

    Parameters
    ----------
    spikes : ndarray of shape (T, N), binary (int8 or bool)

    Returns
    -------
    (compressed_bytes, DeltaCompressionResult)
    """
    spikes = np.asarray(spikes, dtype=np.int8)
    T, N = spikes.shape
    original_bits = T * N

    n_groups = (N + self.group_size - 1) // self.group_size

    # Build delta matrix: replace non-reference channels with XOR residuals
    delta_matrix = np.empty_like(spikes)
    ref_indices = np.empty(n_groups, dtype=np.int32)
    delta_spike_counts = []

    for g in range(n_groups):
        start = g * self.group_size
        end = min(start + self.group_size, N)
        group = spikes[:, start:end]

        # Reference = channel with most spikes (best predictor for group)
        spike_counts = group.sum(axis=0)
        ref_local = int(np.argmax(spike_counts))
        ref_indices[g] = ref_local

        ref_channel = group[:, ref_local]
        for c in range(end - start):
            if c == ref_local:
                delta_matrix[:, start + c] = group[:, c]
            else:
                delta = group[:, c] ^ ref_channel
                delta_matrix[:, start + c] = delta
                delta_spike_counts.append(int(delta.sum()))

    # ISI-compress the delta matrix
    delta_data, _ = self.base_codec.compress(delta_matrix)

    # Header: magic(4) + group_size(2) + n_groups(2) + ref_indices(n_groups bytes)
    header = self.HEADER_MAGIC
    header += struct.pack("!HH", self.group_size, n_groups)
    header += ref_indices.astype(np.uint8).tobytes()
    encoded = header + delta_data

    compressed_bits = len(encoded) * 8
    ratio = original_bits / max(compressed_bits, 1)
    n_spikes = int(np.sum(spikes))

    mean_delta_sparsity = 0.0
    if delta_spike_counts:
        mean_delta = float(np.mean(delta_spike_counts))
        mean_delta_sparsity = 1.0 - (mean_delta / max(T, 1))

    return encoded, DeltaCompressionResult(
        original_bits=original_bits,
        compressed_bits=compressed_bits,
        compression_ratio=ratio,
        n_spikes=n_spikes,
        n_neurons=N,
        n_timesteps=T,
        lossless=self.base_codec.mode == "lossless",
        n_groups=n_groups,
        group_size=self.group_size,
        mean_delta_sparsity=mean_delta_sparsity,
        codec_type="delta",
    )

decompress(data, T, N)

Decompress delta-coded spike raster.

Parameters

data : bytes
T, N : int
    Original dimensions.

Returns

ndarray of shape (T, N), int8

Source code in src/sc_neurocore/spike_codec/delta_codec.py
def decompress(self, data: bytes, T: int, N: int) -> np.ndarray:
    """Decompress delta-coded spike raster.

    Parameters
    ----------
    data : bytes
    T, N : int
        Original dimensions.

    Returns
    -------
    ndarray of shape (T, N), int8
    """
    magic = data[:4]
    if magic != self.HEADER_MAGIC:
        raise ValueError(f"Invalid header magic: {magic!r}, expected {self.HEADER_MAGIC!r}")

    group_size, n_groups = struct.unpack("!HH", data[4:8])
    ref_indices = np.frombuffer(data[8 : 8 + n_groups], dtype=np.uint8).astype(np.int32)
    delta_data = data[8 + n_groups :]

    delta_matrix = self.base_codec.decompress(delta_data, T, N)

    spikes = np.empty_like(delta_matrix)
    for g in range(n_groups):
        start = g * group_size
        end = min(start + group_size, N)
        ref_local = int(ref_indices[g])

        ref_channel = delta_matrix[:, start + ref_local]
        for c in range(end - start):
            if c == ref_local:
                spikes[:, start + c] = delta_matrix[:, start + c]
            else:
                spikes[:, start + c] = delta_matrix[:, start + c] ^ ref_channel

    return spikes

DeltaCompressionResult dataclass

Bases: CompressionResult

Compression result with delta coding metrics.

Source code in src/sc_neurocore/spike_codec/delta_codec.py
@dataclass
class DeltaCompressionResult(CompressionResult):
    """Compression result with delta coding metrics."""

    n_groups: int = 0
    group_size: int = 0
    mean_delta_sparsity: float = 0.0
    codec_type: str = "delta"

Streaming Codec (Real-Time)

Fixed-latency, independently decodable frames. Each time window is a self-contained frame with bounded worst-case latency.

sc_neurocore.spike_codec.streaming_codec

Streaming spike compression with bounded latency per window.

Architecture
  1. Divide spike raster into fixed-size time windows (e.g. 20 samples = 1ms at 20kHz)
  2. Each window compressed independently as a frame
  3. Frames are self-contained: no dependency on past frames
  4. Bounded worst-case latency: window_size / sample_rate
  5. Frame format: frame_header + per-channel spike bitmask

Within each window, channels are packed as bitmasks (one bit per timestep). For window_size=20, each channel needs 20 bits = 3 bytes. Silent channels are run-length encoded (skip count). Active channels store raw bitmask.

Target: real-time BCI decoding where latency matters more than compression ratio. Also suitable for online spike sorting and closed-loop experiments.
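The bitmask layout can be sketched with `np.packbits` (a simplification of the frame format — the real `_pack_window` additionally run-length encodes silent channels and adds a frame header):

```python
import numpy as np

window_size = 20                        # 1 ms at 20 kHz
rng = np.random.default_rng(0)
window = (rng.random((window_size, 64)) < 0.05).astype(np.uint8)  # shape (W, N)

active = np.flatnonzero(window.any(axis=0))         # silent channels are skipped
packed = {
    int(ch): np.packbits(window[:, ch]).tobytes()   # 20 bits -> 3 bytes per channel
    for ch in active
}
assert all(len(b) == (window_size + 7) // 8 == 3 for b in packed.values())

# Decoding a channel: unpack bits, then drop the padding beyond window_size
ch = int(active[0])
bits = np.unpackbits(np.frombuffer(packed[ch], dtype=np.uint8))[:window_size]
assert np.array_equal(bits, window[:, ch])
```

Each active channel costs a fixed 3 bytes per frame at `window_size=20`, which is what bounds the frame size and hence the worst-case latency.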

StreamingSpikeCodec

Streaming spike codec: fixed-latency, independently decodable frames.

Each time window is compressed as a self-contained frame. No inter-frame dependencies. Worst-case latency = window_size samples.

Parameters

window_size : int
    Samples per frame. 20 = 1ms at 20kHz (typical BCI). Smaller = lower latency but less compression.

Source code in src/sc_neurocore/spike_codec/streaming_codec.py
class StreamingSpikeCodec:
    """Streaming spike codec: fixed-latency, independently decodable frames.

    Each time window is compressed as a self-contained frame. No inter-frame
    dependencies. Worst-case latency = window_size samples.

    Parameters
    ----------
    window_size : int
        Samples per frame. 20 = 1ms at 20kHz (typical BCI).
        Smaller = lower latency but less compression.
    """

    HEADER_MAGIC = b"SSCF"  # Streaming Spike Codec Frames

    def __init__(self, window_size: int = 20):
        self.window_size = window_size

    def compress(self, spikes: np.ndarray) -> tuple[bytes, StreamingCompressionResult]:
        """Compress spike raster into independently decodable frames.

        Parameters
        ----------
        spikes : ndarray of shape (T, N), binary

        Returns
        -------
        (compressed_bytes, StreamingCompressionResult)
        """
        spikes = np.asarray(spikes, dtype=np.int8)
        T, N = spikes.shape
        original_bits = T * N

        n_frames = (T + self.window_size - 1) // self.window_size
        frames = []
        active_counts = []
        max_frame_size = 0

        for i in range(n_frames):
            start = i * self.window_size
            end = min(start + self.window_size, T)
            window = spikes[start:end]

            # Pad last window if needed
            if window.shape[0] < self.window_size:
                pad = np.zeros((self.window_size - window.shape[0], N), dtype=np.int8)
                window = np.vstack([window, pad])

            frame = _pack_window(window)
            frames.append(frame)

            active = int(np.any(window, axis=0).sum())
            active_counts.append(active)
            if len(frame) > max_frame_size:
                max_frame_size = len(frame)

        # Global header: magic(4) + window_size(2) + T(4) + N(2) + n_frames(4)
        header = self.HEADER_MAGIC + struct.pack("!HIHI", self.window_size, T, N, n_frames)
        encoded = header + b"".join(frames)

        compressed_bits = len(encoded) * 8
        ratio = original_bits / max(compressed_bits, 1)

        return encoded, StreamingCompressionResult(
            original_bits=original_bits,
            compressed_bits=compressed_bits,
            compression_ratio=ratio,
            n_spikes=int(np.sum(spikes)),
            n_neurons=N,
            n_timesteps=T,
            lossless=True,
            window_size=self.window_size,
            n_frames=n_frames,
            mean_active_channels=float(np.mean(active_counts)) if active_counts else 0.0,
            max_frame_bytes=max_frame_size,
            codec_type="streaming",
        )

    def decompress(self, data: bytes, T: int = 0, N: int = 0) -> np.ndarray:
        """Decompress streaming frames to spike raster.

        T and N are read from the header if not provided (or if 0).

        Parameters
        ----------
        data : bytes
        T, N : int (optional, read from header)

        Returns
        -------
        ndarray of shape (T, N), int8
        """
        magic = data[:4]
        if magic != self.HEADER_MAGIC:
            raise ValueError(f"Invalid header magic: {magic!r}, expected {self.HEADER_MAGIC!r}")

        window_size, T_stored, N_stored, n_frames = struct.unpack("!HIHI", data[4:16])
        if T == 0:
            T = T_stored
        if N == 0:
            N = N_stored

        offset = 16
        windows = []
        for _ in range(n_frames):
            window, offset = _unpack_window(data, offset)
            windows.append(window)

        if not windows:  # pragma: no cover — T=0 edge case
            return np.zeros((T, N), dtype=np.int8)

        full = np.vstack(windows)
        return full[:T]

    def compress_frame(self, window: np.ndarray) -> bytes:
        """Compress a single time window (for real-time streaming).

        Parameters
        ----------
        window : ndarray of shape (W, N), binary

        Returns
        -------
        bytes — single frame, independently decodable
        """
        return _pack_window(np.asarray(window, dtype=np.int8))

    def decompress_frame(self, frame: bytes) -> np.ndarray:
        """Decompress a single frame.

        Parameters
        ----------
        frame : bytes

        Returns
        -------
        ndarray of shape (W, N), int8
        """
        window, _ = _unpack_window(frame, 0)
        return window

compress(spikes)

Compress spike raster into independently decodable frames.

Parameters

spikes : ndarray of shape (T, N), binary

Returns

(compressed_bytes, StreamingCompressionResult)

Source code in src/sc_neurocore/spike_codec/streaming_codec.py
def compress(self, spikes: np.ndarray) -> tuple[bytes, StreamingCompressionResult]:
    """Compress spike raster into independently decodable frames.

    Parameters
    ----------
    spikes : ndarray of shape (T, N), binary

    Returns
    -------
    (compressed_bytes, StreamingCompressionResult)
    """
    spikes = np.asarray(spikes, dtype=np.int8)
    T, N = spikes.shape
    original_bits = T * N

    n_frames = (T + self.window_size - 1) // self.window_size
    frames = []
    active_counts = []
    max_frame_size = 0

    for i in range(n_frames):
        start = i * self.window_size
        end = min(start + self.window_size, T)
        window = spikes[start:end]

        # Pad last window if needed
        if window.shape[0] < self.window_size:
            pad = np.zeros((self.window_size - window.shape[0], N), dtype=np.int8)
            window = np.vstack([window, pad])

        frame = _pack_window(window)
        frames.append(frame)

        active = int(np.any(window, axis=0).sum())
        active_counts.append(active)
        if len(frame) > max_frame_size:
            max_frame_size = len(frame)

    # Global header: magic(4) + window_size(2) + T(4) + N(2) + n_frames(4)
    header = self.HEADER_MAGIC + struct.pack("!HIHI", self.window_size, T, N, n_frames)
    encoded = header + b"".join(frames)

    compressed_bits = len(encoded) * 8
    ratio = original_bits / max(compressed_bits, 1)

    return encoded, StreamingCompressionResult(
        original_bits=original_bits,
        compressed_bits=compressed_bits,
        compression_ratio=ratio,
        n_spikes=int(np.sum(spikes)),
        n_neurons=N,
        n_timesteps=T,
        lossless=True,
        window_size=self.window_size,
        n_frames=n_frames,
        mean_active_channels=float(np.mean(active_counts)) if active_counts else 0.0,
        max_frame_bytes=max_frame_size,
        codec_type="streaming",
    )

decompress(data, T=0, N=0)

Decompress streaming frames to spike raster.

T and N are read from the header if not provided (or if 0).

Parameters

data : bytes
T, N : int
    Optional; read from the header if 0.

Returns

ndarray of shape (T, N), int8

Source code in src/sc_neurocore/spike_codec/streaming_codec.py
def decompress(self, data: bytes, T: int = 0, N: int = 0) -> np.ndarray:
    """Decompress streaming frames to spike raster.

    T and N are read from the header if not provided (or if 0).

    Parameters
    ----------
    data : bytes
    T, N : int (optional, read from header)

    Returns
    -------
    ndarray of shape (T, N), int8
    """
    magic = data[:4]
    if magic != self.HEADER_MAGIC:
        raise ValueError(f"Invalid header magic: {magic!r}, expected {self.HEADER_MAGIC!r}")

    window_size, T_stored, N_stored, n_frames = struct.unpack("!HIHI", data[4:16])
    if T == 0:
        T = T_stored
    if N == 0:
        N = N_stored

    offset = 16
    windows = []
    for _ in range(n_frames):
        window, offset = _unpack_window(data, offset)
        windows.append(window)

    if not windows:  # pragma: no cover — T=0 edge case
        return np.zeros((T, N), dtype=np.int8)

    full = np.vstack(windows)
    return full[:T]

compress_frame(window)

Compress a single time window (for real-time streaming).

Parameters

window : ndarray of shape (W, N), binary

Returns

bytes — single frame, independently decodable

Source code in src/sc_neurocore/spike_codec/streaming_codec.py
def compress_frame(self, window: np.ndarray) -> bytes:
    """Compress a single time window (for real-time streaming).

    Parameters
    ----------
    window : ndarray of shape (W, N), binary

    Returns
    -------
    bytes — single frame, independently decodable
    """
    return _pack_window(np.asarray(window, dtype=np.int8))

decompress_frame(frame)

Decompress a single frame.

Parameters

frame : bytes

Returns

ndarray of shape (W, N), int8

Source code in src/sc_neurocore/spike_codec/streaming_codec.py
def decompress_frame(self, frame: bytes) -> np.ndarray:
    """Decompress a single frame.

    Parameters
    ----------
    frame : bytes

    Returns
    -------
    ndarray of shape (W, N), int8
    """
    window, _ = _unpack_window(frame, 0)
    return window

StreamingCompressionResult dataclass

Bases: CompressionResult

Compression result with streaming codec metrics.

Source code in src/sc_neurocore/spike_codec/streaming_codec.py
@dataclass
class StreamingCompressionResult(CompressionResult):
    """Compression result with streaming codec metrics."""

    window_size: int = 0
    n_frames: int = 0
    mean_active_channels: float = 0.0
    max_frame_bytes: int = 0
    codec_type: str = "streaming"

AER Codec (Neuromorphic)

Address-Event Representation: compact event stream with delta-coded timestamps. Compatible with comm.aer_udp protocol. O(n_spikes) bytes.

sc_neurocore.spike_codec.aer_codec

AER spike compression with adaptive density handling.

Architecture
  1. Measure spike density
  2. If density <= 50%: encode spike events (standard AER)
  3. If density > 50%: invert matrix, encode silence events (O(n_gaps) bytes when most channels are firing)
  4. Delta-code timestamps, variable-width neuron IDs

Compatible with the AER-over-UDP protocol in comm/aer_udp.py.

Target: neuromorphic chip-to-chip (Loihi, SpiNNaker, BrainScaleS), event cameras (DVS), and event-driven simulators.
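A minimal sketch of the event-list idea (density check, nonzero extraction, delta-coded timestamps). The function names here are illustrative; the actual wire format additionally uses escape codes and variable-width neuron IDs:

```python
import numpy as np

def to_aer_events(spikes):
    """Raster (T, N) -> (inverted, timestamp_deltas, neuron_ids)."""
    spikes = np.asarray(spikes, dtype=np.int8)
    inverted = spikes.mean() > 0.5                # encode silences when dense
    m = 1 - spikes if inverted else spikes
    times, neurons = np.nonzero(m)                # row-major: already time-sorted
    deltas = np.diff(times, prepend=0)            # delta-code timestamps
    return inverted, deltas, neurons

def from_aer_events(inverted, deltas, neurons, T, N):
    times = np.cumsum(deltas)                     # undo the delta coding
    m = np.zeros((T, N), dtype=np.int8)
    m[times, neurons] = 1
    return 1 - m if inverted else m

rng = np.random.default_rng(0)
x = (rng.random((50, 16)) < 0.05).astype(np.int8)
assert np.array_equal(from_aer_events(*to_aer_events(x), 50, 16), x)
```

Because event counts scale with the sparser of spikes and silences, the stream stays O(min(n_spikes, n_gaps)) events regardless of density.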

AERSpikeCodec

AER spike codec: event-list encoding for sparse spike data.

Converts spike raster (T, N) to a compact stream of (timestamp, neuron_id) events. Delta-encodes timestamps for further compression.

Parameters

timestamp_bits : int
    Bits for delta-coded timestamps. 16 = max gap of 65535 samples. Larger windows between spikes use escape codes.
neuron_bits : int
    Bits for neuron ID. Auto-sized from N if 0.
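The auto-sizing rule, including the escape-collision bump from the source below, works out as follows (standalone sketch; `neuron_id_bytes` is an illustrative name, not a library function):

```python
import numpy as np

def neuron_id_bytes(N, neuron_bits=0):
    """Bytes per neuron ID: auto-size from N, then grow if the all-ones
    escape marker would collide with the largest valid ID (N-1)."""
    bits = neuron_bits if neuron_bits > 0 else max(1, int(np.ceil(np.log2(max(N, 2)))))
    nbytes = (bits + 7) // 8
    while (1 << (nbytes * 8)) - 1 <= N - 1:       # all-ones value must stay free
        nbytes += 1
    return nbytes

print(neuron_id_bytes(384))   # Neuropixels: 9 bits -> 2 bytes
print(neuron_id_bytes(256))   # 8 bits -> 1 byte, but ID 255 collides -> 2 bytes
```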

Source code in src/sc_neurocore/spike_codec/aer_codec.py
class AERSpikeCodec:
    """AER spike codec: event-list encoding for sparse spike data.

    Converts spike raster (T, N) to a compact stream of (timestamp,
    neuron_id) events. Delta-encodes timestamps for further compression.

    Parameters
    ----------
    timestamp_bits : int
        Bits for delta-coded timestamps. 16 = max gap of 65535 samples.
        Larger windows between spikes use escape codes.
    neuron_bits : int
        Bits for neuron ID. Auto-sized from N if 0.
    """

    HEADER_MAGIC = b"AERX"
    HEADER_MAGIC_INV = b"AERI"  # Inverted: encoding silences, not spikes

    def __init__(self, timestamp_bits: int = 16, neuron_bits: int = 0):
        self.timestamp_bits = timestamp_bits
        self.neuron_bits = neuron_bits

    def compress(self, spikes: np.ndarray) -> tuple[bytes, AERCompressionResult]:
        """Compress spike raster to AER event stream.

        Parameters
        ----------
        spikes : ndarray of shape (T, N), binary

        Returns
        -------
        (compressed_bytes, AERCompressionResult)
        """
        spikes = np.asarray(spikes, dtype=np.int8)
        T, N = spikes.shape
        original_bits = T * N

        # Adaptive: if >50% density, invert (encode silences instead of spikes)
        n_ones = int(np.sum(spikes))
        density = n_ones / max(T * N, 1)
        inverted = density > 0.5
        encode_matrix = 1 - spikes if inverted else spikes

        # Extract events as (timestamp, neuron_id) sorted by time then neuron
        times, neurons = np.nonzero(encode_matrix)
        # Already sorted by time (row-major), then by neuron within same time
        n_events = len(times)

        neuron_bits = (
            self.neuron_bits if self.neuron_bits > 0 else max(1, int(np.ceil(np.log2(max(N, 2)))))
        )
        neuron_bytes = (neuron_bits + 7) // 8
        # Escape marker is all-1s bytes. If max valid ID (N-1) fills all
        # bits in neuron_bytes, bump size to avoid escape collision.
        while (1 << (neuron_bytes * 8)) - 1 <= (N - 1):
            neuron_bytes += 1

        # Header: magic(4) + T(4) + N(4) + n_events(4) + neuron_bytes(1) = 17 bytes
        magic = self.HEADER_MAGIC_INV if inverted else self.HEADER_MAGIC
        header = magic + struct.pack("!IIIB", T, N, n_events, neuron_bytes)

        if n_events == 0:
            encoded = header
        else:
            # Delta-encode timestamps
            parts = []
            prev_t = 0
            ts_max = (1 << self.timestamp_bits) - 1

            for i in range(n_events):
                t = int(times[i])
                nid = int(neurons[i])
                dt = t - prev_t

                # Emit escape codes for large gaps
                while dt > ts_max:
                    parts.append(struct.pack("!H", ts_max))
                    parts.append(b"\xff" * neuron_bytes)  # escape marker
                    dt -= ts_max

                parts.append(struct.pack("!H", dt))
                parts.append(nid.to_bytes(neuron_bytes, "big"))
                prev_t = t

            encoded = header + b"".join(parts)

        compressed_bits = len(encoded) * 8
        ratio = original_bits / max(compressed_bits, 1)
        bpe = len(encoded) / max(n_events, 1)

        return encoded, AERCompressionResult(
            original_bits=original_bits,
            compressed_bits=compressed_bits,
            compression_ratio=ratio,
            n_spikes=n_ones,
            n_neurons=N,
            n_timesteps=T,
            lossless=True,
            n_events=n_events,
            bytes_per_event=bpe,
            codec_type="aer",
        )

    def decompress(self, data: bytes, T: int = 0, N: int = 0) -> np.ndarray:
        """Decompress AER event stream to spike raster.

        Parameters
        ----------
        data : bytes
        T, N : int (optional, read from header if 0)

        Returns
        -------
        ndarray of shape (T, N), int8
        """
        magic = data[:4]
        if magic not in (self.HEADER_MAGIC, self.HEADER_MAGIC_INV):
            raise ValueError(
                f"Invalid header magic: {magic!r}, expected {self.HEADER_MAGIC!r} or {self.HEADER_MAGIC_INV!r}"
            )
        inverted = magic == self.HEADER_MAGIC_INV

        T_stored, N_stored, n_events, neuron_bytes = struct.unpack("!IIIB", data[4:17])
        if T == 0:
            T = T_stored
        if N == 0:
            N = N_stored
        escape_marker = b"\xff" * neuron_bytes

        decoded = np.zeros((T, N), dtype=np.int8)
        offset = 17
        current_t = 0
        events_read = 0

        while events_read < n_events and offset + 2 + neuron_bytes <= len(data):
            dt = struct.unpack("!H", data[offset : offset + 2])[0]
            nid_bytes = data[offset + 2 : offset + 2 + neuron_bytes]
            offset += 2 + neuron_bytes

            if nid_bytes == escape_marker:
                current_t += dt
                continue

            current_t += dt
            nid = int.from_bytes(nid_bytes, "big")

            if 0 <= current_t < T and 0 <= nid < N:
                decoded[current_t, nid] = 1
            events_read += 1

        if inverted:
            return 1 - decoded
        return decoded

compress(spikes)

Compress spike raster to AER event stream.

Parameters

spikes : ndarray of shape (T, N), binary

Returns

(compressed_bytes, AERCompressionResult)
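A back-of-envelope size estimate follows directly from the format: a 17-byte header plus (2 + neuron_bytes) bytes per event, ignoring escape records. The numbers below are illustrative, not a benchmark:

```python
# Estimated AER stream size for a sparse raster (escape records ignored).
T, N = 10_000, 64        # timesteps x channels
density = 0.01           # 1% of raster cells are spikes
neuron_bytes = 1         # 64 channel IDs fit in one byte

n_events = int(T * N * density)                    # expected event count
stream_bytes = 17 + n_events * (2 + neuron_bytes)  # header + fixed-size records
original_bits = T * N                              # 1 bit per raster cell
ratio = original_bits / (stream_bytes * 8)         # ~4x at this density
```

Because cost is linear in event count, the ratio improves as density drops; above ~50% density the codec inverts the matrix so the same bound applies to silences.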

Source code in src/sc_neurocore/spike_codec/aer_codec.py
def compress(self, spikes: np.ndarray) -> tuple[bytes, AERCompressionResult]:
    """Compress spike raster to AER event stream.

    Parameters
    ----------
    spikes : ndarray of shape (T, N), binary

    Returns
    -------
    (compressed_bytes, AERCompressionResult)
    """
    spikes = np.asarray(spikes, dtype=np.int8)
    T, N = spikes.shape
    original_bits = T * N

    # Adaptive: if >50% density, invert (encode silences instead of spikes)
    n_ones = int(np.sum(spikes))
    density = n_ones / max(T * N, 1)
    inverted = density > 0.5
    encode_matrix = 1 - spikes if inverted else spikes

    # Extract events as (timestamp, neuron_id) sorted by time then neuron
    times, neurons = np.nonzero(encode_matrix)
    # Already sorted by time (row-major), then by neuron within same time
    n_events = len(times)

    neuron_bits = (
        self.neuron_bits if self.neuron_bits > 0 else max(1, int(np.ceil(np.log2(max(N, 2)))))
    )
    neuron_bytes = (neuron_bits + 7) // 8
    # Escape marker is all-1s bytes. If max valid ID (N-1) fills all
    # bits in neuron_bytes, bump size to avoid escape collision.
    while (1 << (neuron_bytes * 8)) - 1 <= (N - 1):
        neuron_bytes += 1

    # Header: magic(4) + T(4) + N(4) + n_events(4) + neuron_bytes(1) = 17 bytes
    magic = self.HEADER_MAGIC_INV if inverted else self.HEADER_MAGIC
    header = magic + struct.pack("!IIIB", T, N, n_events, neuron_bytes)

    if n_events == 0:
        encoded = header
    else:
        # Delta-encode timestamps
        parts = []
        prev_t = 0
        ts_max = (1 << self.timestamp_bits) - 1

        for i in range(n_events):
            t = int(times[i])
            nid = int(neurons[i])
            dt = t - prev_t

            # Emit escape codes for large gaps
            while dt > ts_max:
                parts.append(struct.pack("!H", ts_max))
                parts.append(b"\xff" * neuron_bytes)  # escape marker
                dt -= ts_max

            parts.append(struct.pack("!H", dt))
            parts.append(nid.to_bytes(neuron_bytes, "big"))
            prev_t = t

        encoded = header + b"".join(parts)

    compressed_bits = len(encoded) * 8
    ratio = original_bits / max(compressed_bits, 1)
    bpe = len(encoded) / max(n_events, 1)

    return encoded, AERCompressionResult(
        original_bits=original_bits,
        compressed_bits=compressed_bits,
        compression_ratio=ratio,
        n_spikes=n_ones,
        n_neurons=N,
        n_timesteps=T,
        lossless=True,
        n_events=n_events,
        bytes_per_event=bpe,
        codec_type="aer",
    )

decompress(data, T=0, N=0)

Decompress AER event stream to spike raster.

Parameters

data : bytes
T, N : int
    Optional; read from header if 0.

Returns

ndarray of shape (T, N), int8

Source code in src/sc_neurocore/spike_codec/aer_codec.py
def decompress(self, data: bytes, T: int = 0, N: int = 0) -> np.ndarray:
    """Decompress AER event stream to spike raster.

    Parameters
    ----------
    data : bytes
    T, N : int (optional, read from header if 0)

    Returns
    -------
    ndarray of shape (T, N), int8
    """
    magic = data[:4]
    if magic not in (self.HEADER_MAGIC, self.HEADER_MAGIC_INV):
        raise ValueError(
            f"Invalid header magic: {magic!r}, expected {self.HEADER_MAGIC!r} or {self.HEADER_MAGIC_INV!r}"
        )
    inverted = magic == self.HEADER_MAGIC_INV

    T_stored, N_stored, n_events, neuron_bytes = struct.unpack("!IIIB", data[4:17])
    if T == 0:
        T = T_stored
    if N == 0:
        N = N_stored
    escape_marker = b"\xff" * neuron_bytes

    decoded = np.zeros((T, N), dtype=np.int8)
    offset = 17
    current_t = 0
    events_read = 0

    while events_read < n_events and offset + 2 + neuron_bytes <= len(data):
        dt = struct.unpack("!H", data[offset : offset + 2])[0]
        nid_bytes = data[offset + 2 : offset + 2 + neuron_bytes]
        offset += 2 + neuron_bytes

        if nid_bytes == escape_marker:
            current_t += dt
            continue

        current_t += dt
        nid = int.from_bytes(nid_bytes, "big")

        if 0 <= current_t < T and 0 <= nid < N:
            decoded[current_t, nid] = 1
        events_read += 1

    if inverted:
        return 1 - decoded
    return decoded

AERCompressionResult dataclass

Bases: CompressionResult

Compression result with AER codec metrics.

Source code in src/sc_neurocore/spike_codec/aer_codec.py
@dataclass
class AERCompressionResult(CompressionResult):
    """Compression result with AER codec metrics."""

    n_events: int = 0
    bytes_per_event: float = 0.0
    codec_type: str = "aer"

Waveform Codec (Raw Electrode)

End-to-end raw waveform compression: spike detection + template matching + background LFP compression. Achieves ~24x on 1024-channel Neuralink-scale data. Spike timing is lossless. Fits in a Bluetooth uplink budget.

sc_neurocore.spike_codec.waveform_codec

End-to-end neural waveform compression.

Full pipeline from raw 10-bit ADC samples to compressed bytes:
  1. Threshold-crossing spike detection
  2. Spike timing → binary raster → PredictiveSpikeCodec (existing)
  3. Spike waveform snippets → template library + residuals
  4. Background signal → delta encoding + quantization

This is what Neuralink-scale systems actually need: compressing raw electrode data, not pre-sorted binary rasters. The combined pipeline targets >50x on raw 10-bit waveforms while preserving both spike timing and waveform shape for downstream decoding.

Operates on (T, N) int16 arrays (10-bit ADC values in int16 container).
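The first two pipeline steps (noise estimation and threshold-crossing detection) can be sketched with plain NumPy. This is a vectorized approximation for illustration only; the library's detector additionally enforces a per-channel refractory period, which this sketch omits:

```python
import numpy as np

rng = np.random.default_rng(0)
T, N = 2000, 4
waveform = rng.normal(0.0, 10.0, size=(T, N)).astype(np.float32)
waveform[500, 1] -= 120.0  # inject one clear negative-going spike on channel 1

# Step 1: per-channel noise sigma via the MAD-style estimator
sigma = np.median(np.abs(waveform), axis=0) / 0.6745

# Step 2: negative threshold crossings that are also still descending
thresholds = -4.5 * sigma           # shape (N,), broadcasts over time
below = waveform[1:] < thresholds   # crossed the negative threshold
falling = waveform[1:] < waveform[:-1]
raster = np.zeros((T, N), dtype=np.int8)
raster[1:][below & falling] = 1     # binary raster, ready for the spike codec
```

At 4.5 sigma the expected false-positive rate on Gaussian noise is a few per million samples, so the raster stays sparse enough for the event-based spike codecs downstream.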

WaveformCodec

End-to-end neural waveform codec.

Pipeline: detect → separate → compress each component optimally.

Parameters

threshold_sigma : float
    Spike detection threshold in units of per-channel noise sigma. Typical: 4.0-5.0 (4 sigma catches ~99.99% of noise).
snippet_samples : int
    Waveform samples to extract around each spike (before + after peak).
max_templates : int
    Maximum number of spike waveform templates to maintain.
template_threshold : float
    Correlation threshold for template matching (0-1).
quantize_bits : int
    Background signal quantization (fewer bits = more compression).
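To see what `quantize_bits` controls, here is a minimal sketch of the background path: delta-encode along time, quantize deltas to 2^bits levels, then reconstruct by rescaling and cumulative summation. `quantize_background` and `dequantize_background` are hypothetical helper names, and the real codec additionally zlib-compresses the quantized bytes:

```python
import numpy as np

def quantize_background(background, quantize_bits=6):
    """Delta-encode along time, then quantize deltas to 2**quantize_bits levels."""
    delta = np.diff(background, axis=0, prepend=background[:1])
    dmax = max(np.abs(delta).max(), 1e-6)          # scale factor stored in header
    half = (1 << quantize_bits) // 2
    q = np.clip(np.round(delta / dmax * half), -half, half - 1).astype(np.int8)
    return q, dmax

def dequantize_background(q, dmax, quantize_bits=6):
    """Invert: rescale quantized deltas and cumulative-sum back to the signal."""
    half = (1 << quantize_bits) // 2
    delta = q.astype(np.float32) * (dmax / half)
    return np.cumsum(delta, axis=0)

t = np.linspace(0, 1, 500, dtype=np.float32)
lfp = 50.0 * np.sin(2 * np.pi * 8 * t)[:, None]    # one synthetic 8 Hz channel
q, dmax = quantize_background(lfp, quantize_bits=6)
recon = dequantize_background(q, dmax, quantize_bits=6)
```

Fewer bits shrink the quantized array's entropy (better zlib compression) at the cost of coarser deltas; because reconstruction cumulative-sums the deltas, quantization error can drift over long windows, which is acceptable for slow LFP but is why spike snippets go through the separate template path.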

Source code in src/sc_neurocore/spike_codec/waveform_codec.py
class WaveformCodec:
    """End-to-end neural waveform codec.

    Pipeline: detect → separate → compress each component optimally.

    Parameters
    ----------
    threshold_sigma : float
        Spike detection threshold in units of per-channel noise sigma.
        Typical: 4.0-5.0 (4 sigma catches ~99.99% of noise).
    snippet_samples : int
        Waveform samples to extract around each spike (before + after peak).
    max_templates : int
        Maximum number of spike waveform templates to maintain.
    template_threshold : float
        Correlation threshold for template matching (0-1).
    quantize_bits : int
        Background signal quantization (fewer bits = more compression).
    """

    HEADER_MAGIC = b"WFCX"

    def __init__(
        self,
        threshold_sigma: float = 4.5,
        snippet_samples: int = 48,
        max_templates: int = 16,
        template_threshold: float = 0.9,
        quantize_bits: int = 6,
    ):
        self.threshold_sigma = threshold_sigma
        self.snippet_samples = snippet_samples
        self.max_templates = max_templates
        self.template_threshold = template_threshold
        self.quantize_bits = quantize_bits
        self.spike_codec = SpikeCodec(entropy="auto")

    def compress(self, waveform: np.ndarray) -> tuple[bytes, WaveformCompressionResult]:
        """Compress raw electrode waveform.

        Parameters
        ----------
        waveform : ndarray of shape (T, N), int16 or float
            Raw ADC samples. T = timesteps, N = channels.

        Returns
        -------
        (compressed_bytes, WaveformCompressionResult)
        """
        waveform = np.asarray(waveform, dtype=np.float32)
        T, N = waveform.shape
        original_bytes = T * N * 2  # 16-bit raw

        # Step 1: Per-channel noise estimation (MAD estimator)
        noise_sigma = np.median(np.abs(waveform), axis=0) / 0.6745
        noise_sigma = np.maximum(noise_sigma, 1e-6)

        # Step 2: Threshold-crossing spike detection
        thresholds = -self.threshold_sigma * noise_sigma  # negative threshold
        spike_raster, spike_times_per_ch = self._detect_spikes(waveform, thresholds)

        # Step 3: Extract spike snippets
        snippets, snippet_indices = self._extract_snippets(waveform, spike_times_per_ch, N)

        # Step 4: Template matching on snippets
        templates, template_ids, residuals = self._template_match(snippets)

        # Step 5: Compress spike timing (binary raster → ISI)
        spike_data, _ = self.spike_codec.compress(spike_raster)

        # Step 6: Compress templates + template IDs + residuals
        snippet_data = self._compress_snippets(templates, template_ids, residuals)

        # Step 7: Compress background (waveform minus spikes)
        background = self._extract_background(waveform, spike_times_per_ch)
        bg_data = self._compress_background(background)

        # Pack everything
        header = self.HEADER_MAGIC + struct.pack(
            "!IIHHBBB",
            T,
            N,
            len(templates),
            len(snippet_indices),
            self.snippet_samples,
            self.quantize_bits,
            len(spike_data).bit_length(),
        )
        # Length-prefixed sections
        parts = [
            header,
            struct.pack("!I", len(spike_data)),
            spike_data,
            struct.pack("!I", len(snippet_data)),
            snippet_data,
            struct.pack("!I", len(bg_data)),
            bg_data,
        ]
        encoded = b"".join(parts)

        n_spikes = int(spike_raster.sum())

        return encoded, WaveformCompressionResult(
            original_bytes=original_bytes,
            compressed_bytes=len(encoded),
            compression_ratio=original_bytes / max(len(encoded), 1),
            n_channels=N,
            n_samples=T,
            n_spikes_detected=n_spikes,
            n_templates=len(templates),
            spike_bytes=len(spike_data),
            snippet_bytes=len(snippet_data),
            background_bytes=len(bg_data),
            lossless_spikes=True,
        )

    def _detect_spikes(self, waveform, thresholds):
        """Threshold-crossing spike detection with refractory period."""
        T, N = waveform.shape
        raster = np.zeros((T, N), dtype=np.int8)
        times_per_ch: list[list[int]] = [[] for _ in range(N)]
        refractory = self.snippet_samples // 2

        for ch in range(N):
            last_spike = -refractory - 1
            for t in range(1, T):
                if (
                    waveform[t, ch] < thresholds[ch]
                    and waveform[t, ch] < waveform[t - 1, ch]
                    and (t - last_spike) > refractory
                ):
                    raster[t, ch] = 1
                    times_per_ch[ch].append(t)
                    last_spike = t

        return raster, times_per_ch

    def _extract_snippets(self, waveform, times_per_ch, N):
        """Extract waveform clips around detected spikes."""
        T = waveform.shape[0]
        half = self.snippet_samples // 2
        snippets = []
        indices = []

        for ch in range(N):
            for t in times_per_ch[ch]:
                start = max(0, t - half)
                end = min(T, t + half)
                clip = waveform[start:end, ch]
                if len(clip) < self.snippet_samples:
                    clip = np.pad(clip, (0, self.snippet_samples - len(clip)))
                else:
                    clip = clip[: self.snippet_samples]
                snippets.append(clip.astype(np.float32))
                indices.append((ch, t))

        return snippets, indices

    def _template_match(self, snippets):
        """Build template library and match snippets."""
        if not snippets:
            return [], [], []

        templates = [snippets[0].copy()]
        template_ids = [0]
        residuals = [np.zeros_like(snippets[0])]

        for i in range(1, len(snippets)):
            s = snippets[i]
            best_corr = -1.0
            best_idx = -1

            for j, tmpl in enumerate(templates):
                norm_s = np.linalg.norm(s)
                norm_t = np.linalg.norm(tmpl)
                if norm_s > 1e-6 and norm_t > 1e-6:
                    corr = float(np.dot(s, tmpl) / (norm_s * norm_t))
                    if corr > best_corr:
                        best_corr = corr
                        best_idx = j

            if best_corr >= self.template_threshold:
                template_ids.append(best_idx)
                scale = np.dot(s, templates[best_idx]) / max(
                    np.dot(templates[best_idx], templates[best_idx]), 1e-10
                )
                residuals.append(s - scale * templates[best_idx])
            elif len(templates) < self.max_templates:
                templates.append(s.copy())
                template_ids.append(len(templates) - 1)
                residuals.append(np.zeros_like(s))
            else:
                template_ids.append(best_idx)
                residuals.append(s - templates[best_idx])

        return templates, template_ids, residuals

    def _compress_snippets(self, templates, template_ids, residuals):
        """Compress templates + IDs + quantized residuals."""
        parts = []

        # Templates: n_templates(2) + [float32 array] each
        parts.append(struct.pack("!H", len(templates)))
        for tmpl in templates:
            parts.append(tmpl.astype(np.float32).tobytes())

        # Template IDs: varint per spike
        parts.append(struct.pack("!I", len(template_ids)))
        for tid in template_ids:
            parts.append(SpikeCodec._encode_varint(tid))

        # Residuals: quantize to int8 then store
        if residuals:
            all_res = np.array(residuals)
            res_max = max(np.abs(all_res).max(), 1e-6)
            quantized = np.clip(all_res / res_max * 127, -127, 127).astype(np.int8)
            parts.append(struct.pack("!f", float(res_max)))
            parts.append(quantized.tobytes())
        else:
            parts.append(struct.pack("!f", 0.0))

        return b"".join(parts)

    def _extract_background(self, waveform, times_per_ch):
        """Extract low-frequency background (remove spikes)."""
        T, N = waveform.shape
        bg = waveform.copy()
        half = self.snippet_samples // 2
        for ch in range(N):
            for t in times_per_ch[ch]:
                start = max(0, t - half)
                end = min(T, t + half)
                bg[start:end, ch] = 0  # zero out spike regions

        # Downsample by 4x (LFP doesn't need 20kHz)
        ds = 4
        if ds <= T:
            bg_ds = bg[: T - T % ds].reshape(-1, ds, N).mean(axis=1)
        else:
            bg_ds = bg
        return bg_ds

    def _compress_background(self, background):
        """Delta-encode + quantize background signal."""
        if background.size == 0:
            return b""

        # Delta encoding (temporal differences)
        delta = np.diff(background, axis=0, prepend=background[:1])

        # Quantize to quantize_bits
        dmax = max(np.abs(delta).max(), 1e-6)
        levels = 1 << self.quantize_bits
        quantized = np.clip(
            np.round(delta / dmax * (levels // 2)), -(levels // 2), levels // 2 - 1
        ).astype(np.int8)

        import zlib

        raw_bytes = quantized.tobytes()
        compressed = zlib.compress(raw_bytes, 9)
        header = struct.pack("!IIf", background.shape[0], background.shape[1], float(dmax))
        return header + compressed

compress(waveform)

Compress raw electrode waveform.

Parameters

waveform : ndarray of shape (T, N), int16 or float
    Raw ADC samples. T = timesteps, N = channels.

Returns

(compressed_bytes, WaveformCompressionResult)

Source code in src/sc_neurocore/spike_codec/waveform_codec.py
def compress(self, waveform: np.ndarray) -> tuple[bytes, WaveformCompressionResult]:
    """Compress raw electrode waveform.

    Parameters
    ----------
    waveform : ndarray of shape (T, N), int16 or float
        Raw ADC samples. T = timesteps, N = channels.

    Returns
    -------
    (compressed_bytes, WaveformCompressionResult)
    """
    waveform = np.asarray(waveform, dtype=np.float32)
    T, N = waveform.shape
    original_bytes = T * N * 2  # 16-bit raw

    # Step 1: Per-channel noise estimation (MAD estimator)
    noise_sigma = np.median(np.abs(waveform), axis=0) / 0.6745
    noise_sigma = np.maximum(noise_sigma, 1e-6)

    # Step 2: Threshold-crossing spike detection
    thresholds = -self.threshold_sigma * noise_sigma  # negative threshold
    spike_raster, spike_times_per_ch = self._detect_spikes(waveform, thresholds)

    # Step 3: Extract spike snippets
    snippets, snippet_indices = self._extract_snippets(waveform, spike_times_per_ch, N)

    # Step 4: Template matching on snippets
    templates, template_ids, residuals = self._template_match(snippets)

    # Step 5: Compress spike timing (binary raster → ISI)
    spike_data, _ = self.spike_codec.compress(spike_raster)

    # Step 6: Compress templates + template IDs + residuals
    snippet_data = self._compress_snippets(templates, template_ids, residuals)

    # Step 7: Compress background (waveform minus spikes)
    background = self._extract_background(waveform, spike_times_per_ch)
    bg_data = self._compress_background(background)

    # Pack everything
    header = self.HEADER_MAGIC + struct.pack(
        "!IIHHBBB",
        T,
        N,
        len(templates),
        len(snippet_indices),
        self.snippet_samples,
        self.quantize_bits,
        len(spike_data).bit_length(),
    )
    # Length-prefixed sections
    parts = [
        header,
        struct.pack("!I", len(spike_data)),
        spike_data,
        struct.pack("!I", len(snippet_data)),
        snippet_data,
        struct.pack("!I", len(bg_data)),
        bg_data,
    ]
    encoded = b"".join(parts)

    n_spikes = int(spike_raster.sum())

    return encoded, WaveformCompressionResult(
        original_bytes=original_bytes,
        compressed_bytes=len(encoded),
        compression_ratio=original_bytes / max(len(encoded), 1),
        n_channels=N,
        n_samples=T,
        n_spikes_detected=n_spikes,
        n_templates=len(templates),
        spike_bytes=len(spike_data),
        snippet_bytes=len(snippet_data),
        background_bytes=len(bg_data),
        lossless_spikes=True,
    )

WaveformCompressionResult dataclass

Result of waveform compression.

Source code in src/sc_neurocore/spike_codec/waveform_codec.py
@dataclass
class WaveformCompressionResult:
    """Result of waveform compression."""

    original_bytes: int
    compressed_bytes: int
    compression_ratio: float
    n_channels: int
    n_samples: int
    n_spikes_detected: int
    n_templates: int
    spike_bytes: int
    snippet_bytes: int
    background_bytes: int
    lossless_spikes: bool