Skip to content

Spike-Domain DSP

FIR/IIR filters, FFT, wavelet decomposition — all in spike domain.

Filters

sc_neurocore.spike_dsp.filters

Spike-domain filtering: process spike trains without converting to float.

FIR: weighted sum of delayed spike trains via coincidence counting. IIR: recursive filtering with leaky integrator (LIF-based).

All operations stay in spike/binary domain for hardware deployment.

SpikeFIR dataclass

Finite Impulse Response filter in spike domain.

Computes weighted sum of delayed input spike trains. Output at time t = sum(coefficients[k] * input[t-k]) >= threshold.

Parameters

coefficients : ndarray
    FIR tap weights.
threshold : float
    Output spike threshold (on weighted sum).

Source code in src/sc_neurocore/spike_dsp/filters.py
Python
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@dataclass
class SpikeFIR:
    """Finite Impulse Response filter in spike domain.

    Computes a weighted sum of delayed input spike trains and emits an
    output spike wherever that sum reaches the threshold:
    output[t] = sum(coefficients[k] * input[t-k]) >= threshold.

    Parameters
    ----------
    coefficients : ndarray
        FIR tap weights.
    threshold : float
        Output spike threshold (on weighted sum).
    """

    coefficients: np.ndarray
    threshold: float = 0.5

    def filter(self, spikes: np.ndarray) -> np.ndarray:
        """Apply FIR filter to spike train.

        Parameters
        ----------
        spikes : ndarray of shape (T,) or (T, N)
            Binary spike train(s).

        Returns
        -------
        ndarray of same shape, binary (int8). Timesteps before the first
        full tap overlap (t < K-1) are zero.
        """
        if spikes.ndim == 1:
            spikes = spikes[:, np.newaxis]
        T, N = spikes.shape
        K = len(self.coefficients)
        output = np.zeros_like(spikes, dtype=np.int8)

        # The earliest timestep with full tap coverage is t = K - 1
        # (input[t - k] is in-range for every k in 0..K-1).  The previous
        # version started at K and silently dropped that first valid sample.
        # max(..., 0) guards an empty coefficient array.
        for t in range(max(K - 1, 0), T):
            weighted = np.zeros(N, dtype=np.float64)
            for k, c in enumerate(self.coefficients):
                weighted += c * spikes[t - k].astype(np.float64)
            output[t] = (weighted >= self.threshold).astype(np.int8)

        return output if output.shape[1] > 1 else output[:, 0]

filter(spikes)

Apply FIR filter to spike train.

Parameters

spikes : ndarray of shape (T,) or (T, N)

Returns

ndarray of same shape, binary

Source code in src/sc_neurocore/spike_dsp/filters.py
Python
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def filter(self, spikes: np.ndarray) -> np.ndarray:
    """Apply FIR filter to spike train.

    Parameters
    ----------
    spikes : ndarray of shape (T,) or (T, N)
        Binary spike train(s).

    Returns
    -------
    ndarray of same shape, binary (int8). Timesteps before the first
    full tap overlap (t < K-1) are zero.
    """
    if spikes.ndim == 1:
        spikes = spikes[:, np.newaxis]
    T, N = spikes.shape
    K = len(self.coefficients)
    output = np.zeros_like(spikes, dtype=np.int8)

    # The earliest timestep with full tap coverage is t = K - 1
    # (input[t - k] is in-range for every k in 0..K-1).  The previous
    # version started at K and silently dropped that first valid sample.
    # max(..., 0) guards an empty coefficient array.
    for t in range(max(K - 1, 0), T):
        weighted = np.zeros(N, dtype=np.float64)
        for k, c in enumerate(self.coefficients):
            weighted += c * spikes[t - k].astype(np.float64)
        output[t] = (weighted >= self.threshold).astype(np.int8)

    return output if output.shape[1] > 1 else output[:, 0]

SpikeIIR dataclass

Infinite Impulse Response filter using leaky integration.

Membrane-like dynamics: state = decay * state + input_spike. Fires when state exceeds threshold. Natural IIR in spike domain.

Parameters

decay : float
    Decay factor per timestep (0-1).
threshold : float
    Firing threshold on the integrator state.
gain : float
    Input spike gain.

Source code in src/sc_neurocore/spike_dsp/filters.py
Python
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@dataclass
class SpikeIIR:
    """Infinite Impulse Response filter built on a leaky integrator.

    Each channel keeps a membrane-like state updated as
    state = decay * state + gain * input_spike; when the state reaches
    the threshold, an output spike is emitted and the state resets to 0.

    Parameters
    ----------
    decay : float
        Decay factor per timestep (0-1).
    threshold : float
        Firing threshold on the integrator state.
    gain : float
        Input spike gain.
    """

    decay: float = 0.9
    threshold: float = 1.0
    gain: float = 0.5

    def filter(self, spikes: np.ndarray) -> np.ndarray:
        """Apply IIR filter to spike train."""
        train = spikes if spikes.ndim > 1 else spikes[:, np.newaxis]
        n_steps, n_channels = train.shape
        membrane = np.zeros(n_channels, dtype=np.float64)
        out = np.zeros((n_steps, n_channels), dtype=np.int8)

        for step in range(n_steps):
            # Leaky integration of the incoming spikes.
            membrane = membrane * self.decay + train[step].astype(np.float64) * self.gain
            fired = membrane >= self.threshold
            out[step] = fired
            # Reset only the channels that fired.
            membrane = np.where(fired, 0.0, membrane)

        return out if out.shape[1] > 1 else out[:, 0]

filter(spikes)

Apply IIR filter to spike train.

Source code in src/sc_neurocore/spike_dsp/filters.py
Python
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def filter(self, spikes: np.ndarray) -> np.ndarray:
    """Run the leaky-integrate-and-fire (IIR) dynamics over a spike train."""
    train = spikes if spikes.ndim > 1 else spikes[:, np.newaxis]
    n_steps, n_channels = train.shape
    membrane = np.zeros(n_channels, dtype=np.float64)
    out = np.zeros((n_steps, n_channels), dtype=np.int8)

    for step in range(n_steps):
        # Leaky integration of the incoming spikes.
        membrane = membrane * self.decay + train[step].astype(np.float64) * self.gain
        fired = membrane >= self.threshold
        out[step] = fired
        # Reset only the channels that fired.
        membrane = np.where(fired, 0.0, membrane)

    return out if out.shape[1] > 1 else out[:, 0]

spike_convolve(spikes, kernel, threshold=0.5)

Convolve a spike train with a kernel in spike domain.

Parameters

spikes : ndarray of shape (T,)
kernel : ndarray of shape (K,)
threshold : float

Returns

ndarray of shape (T,), binary

Source code in src/sc_neurocore/spike_dsp/filters.py
Python
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def spike_convolve(spikes: np.ndarray, kernel: np.ndarray, threshold: float = 0.5) -> np.ndarray:
    """Convolve a spike train with a kernel, staying in the spike domain.

    Thin wrapper: delegates to a one-shot SpikeFIR whose taps are the
    kernel and whose output threshold is *threshold*.

    Parameters
    ----------
    spikes : ndarray of shape (T,)
    kernel : ndarray of shape (K,)
    threshold : float

    Returns
    -------
    ndarray of shape (T,), binary
    """
    return SpikeFIR(coefficients=kernel, threshold=threshold).filter(spikes)

Spectral

sc_neurocore.spike_dsp.spectral

Spike-domain FFT and power spectrum via time-coded spiking Fourier transform.

Converts spike train to instantaneous firing rate, computes FFT, returns frequency components as spike-rate amplitudes. The firing-rate conversion uses a sliding window (kernel-based) that stays in the integer domain.

spike_fft(spikes, dt=0.001, window_size=50)

Compute FFT of a spike train.

Parameters

spikes : ndarray of shape (T,) or (T, N)
    Binary spike train(s).
dt : float
    Timestep in seconds.
window_size : int
    Sliding window for instantaneous rate estimation.

Returns

(frequencies, magnitudes) tuple frequencies: ndarray of shape (F,) magnitudes: ndarray of shape (F,) or (F, N)

Source code in src/sc_neurocore/spike_dsp/spectral.py
Python
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def spike_fft(
    spikes: np.ndarray,
    dt: float = 0.001,
    window_size: int = 50,
) -> tuple[np.ndarray, np.ndarray]:
    """Compute FFT of a spike train.

    Parameters
    ----------
    spikes : ndarray of shape (T,) or (T, N)
        Binary spike train(s).
    dt : float
        Timestep in seconds.
    window_size : int
        Sliding window for instantaneous rate estimation.

    Returns
    -------
    (frequencies, magnitudes) tuple
        frequencies: ndarray of shape (F,)
        magnitudes: ndarray of shape (F,) or (F, N)
    """
    if spikes.ndim == 1:
        spikes = spikes[:, np.newaxis]
    T, N = spikes.shape

    # Instantaneous firing rate: causal sliding-window mean over
    # spikes[max(0, t - window_size + 1) : t + 1], scaled by 1/dt.
    # Computed in O(T) with a prefix sum instead of the original
    # O(T * window_size) per-timestep slice-and-mean loop; the
    # mean-then-divide order is kept so results match exactly for
    # binary inputs.
    csum = np.vstack([np.zeros((1, N)), np.cumsum(spikes.astype(np.float64), axis=0)])
    lengths = np.minimum(np.arange(1, T + 1), window_size)  # samples in each window
    starts = np.arange(T) - lengths + 1  # == max(0, t - window_size + 1)
    rates = (csum[np.arange(1, T + 1)] - csum[starts]) / lengths[:, np.newaxis] / dt

    # Real FFT over time; magnitudes are spike-rate amplitudes per frequency.
    fft_result = np.fft.rfft(rates, axis=0)
    magnitudes = np.abs(fft_result)
    frequencies = np.fft.rfftfreq(T, d=dt)

    if N == 1:
        magnitudes = magnitudes[:, 0]
    return frequencies, magnitudes

spike_power_spectrum(spikes, dt=0.001, window_size=50)

Compute power spectral density of a spike train.

Returns

(frequencies, psd) tuple

Source code in src/sc_neurocore/spike_dsp/spectral.py
Python
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def spike_power_spectrum(
    spikes: np.ndarray,
    dt: float = 0.001,
    window_size: int = 50,
) -> tuple[np.ndarray, np.ndarray]:
    """Compute the power spectral density of a spike train.

    The PSD is the squared magnitude of the spike-domain FFT.

    Returns
    -------
    (frequencies, psd) tuple
    """
    frequencies, magnitudes = spike_fft(spikes, dt, window_size)
    return frequencies, np.square(magnitudes)

Wavelets

sc_neurocore.spike_dsp.wavelets

Spike-domain wavelet decomposition using multi-scale spike filtering.

spike_wavelet_decompose(spikes, n_scales=4, base_window=4)

Decompose spike train into frequency bands via multi-scale filtering.

Uses cascaded moving-average filters at doubling window sizes: scale 0: window=base_window, scale 1: window=2*base_window, etc. Each scale captures a different frequency band.

Parameters

spikes : ndarray of shape (T,) or (T, N)
n_scales : int
    Number of wavelet scales.
base_window : int
    Window size for finest scale.

Returns

list of ndarray One array per scale, shape (T,) or (T, N). Binary spike representation of activity at each frequency band.

Source code in src/sc_neurocore/spike_dsp/wavelets.py
Python
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def spike_wavelet_decompose(
    spikes: np.ndarray,
    n_scales: int = 4,
    base_window: int = 4,
) -> list[np.ndarray]:
    """Decompose spike train into frequency bands via multi-scale filtering.

    Uses causal moving-average filters at doubling window sizes:
    scale 0: window=base_window, scale 1: window=2*base_window, etc.
    Scale 0 keeps the smoothed (lowpass) signal itself; each later scale
    is the absolute difference between adjacent smoothing levels (bandpass).

    Parameters
    ----------
    spikes : ndarray of shape (T,) or (T, N)
    n_scales : int
        Number of wavelet scales.
    base_window : int
        Window size for finest scale.

    Returns
    -------
    list of ndarray
        One array per scale, shape (T,) or (T, N). Binary spike representation
        of activity at each frequency band.
    """
    if spikes.ndim == 1:
        spikes = spikes[:, np.newaxis]
        squeeze = True
    else:
        squeeze = False

    T, N = spikes.shape

    # Prefix sum lets each causal sliding mean be computed in O(T) instead
    # of the original O(T * window) slice-and-mean loop.
    csum = np.vstack([np.zeros((1, N)), np.cumsum(spikes.astype(np.float64), axis=0)])
    idx = np.arange(T)

    def _sliding_mean(window: int) -> np.ndarray:
        # Mean over spikes[max(0, t - window + 1) : t + 1] for every t.
        lengths = np.minimum(idx + 1, window)
        starts = idx - lengths + 1
        return (csum[idx + 1] - csum[starts]) / lengths[:, np.newaxis]

    scales = []
    prev_smoothed = None
    for s in range(n_scales):
        smoothed = _sliding_mean(base_window * (2**s))

        # Scale 0 is the lowpass signal; later scales take the difference
        # between adjacent smoothing levels as a bandpass response.  Reusing
        # the previous iteration's result removes the original's redundant
        # full recomputation of the coarser average at every scale.
        band = smoothed if s == 0 else np.abs(prev_smoothed - smoothed)

        # Adaptive binary threshold; epsilon floor guards all-zero bands.
        threshold = max(band.mean() * 0.5, 1e-8)
        binary_band = (band > threshold).astype(np.int8)

        scales.append(binary_band[:, 0] if squeeze else binary_band)
        prev_smoothed = smoothed

    return scales