Skip to content

Spike-Domain DSP

FIR/IIR filters, FFT, wavelet decomposition — all in spike domain.

Filters

sc_neurocore.spike_dsp.filters

Spike-domain filtering: process spike trains without converting to float.

FIR: weighted sum of delayed spike trains via coincidence counting. IIR: recursive filtering with leaky integrator (LIF-based).

All operations stay in spike/binary domain for hardware deployment.

SpikeFIR dataclass

Finite Impulse Response filter in spike domain.

Computes weighted sum of delayed input spike trains. Output at time t = sum(coefficients[k] * input[t-k]) > threshold.

Parameters

coefficients : ndarray FIR tap weights. threshold : float Output spike threshold (on weighted sum).

Source code in src/sc_neurocore/spike_dsp/filters.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass
class SpikeFIR:
    """Finite Impulse Response filter in spike domain.

    Computes weighted sum of delayed input spike trains.
    Output at time t = sum(coefficients[k] * input[t-k]) > threshold.

    Parameters
    ----------
    coefficients : ndarray
        FIR tap weights.
    threshold : float
        Output spike threshold (on weighted sum).
    """

    coefficients: np.ndarray
    threshold: float = 0.5

    def filter(self, spikes: np.ndarray) -> np.ndarray:
        """Apply FIR filter to spike train.

        Parameters
        ----------
        spikes : ndarray of shape (T,) or (T, N)

        Returns
        -------
        ndarray of same shape, binary
        """
        if spikes.ndim == 1:
            spikes = spikes[:, np.newaxis]
        T, N = spikes.shape
        K = len(self.coefficients)
        output = np.zeros_like(spikes, dtype=np.int8)

        for t in range(K, T):
            weighted = np.zeros(N, dtype=np.float64)
            for k, c in enumerate(self.coefficients):
                weighted += c * spikes[t - k].astype(np.float64)
            output[t] = (weighted >= self.threshold).astype(np.int8)

        return output if output.shape[1] > 1 else output[:, 0]

filter(spikes)

Apply FIR filter to spike train.

Parameters

spikes : ndarray of shape (T,) or (T, N)

Returns

ndarray of same shape, binary

Source code in src/sc_neurocore/spike_dsp/filters.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def filter(self, spikes: np.ndarray) -> np.ndarray:
    """Apply FIR filter to spike train.

    Parameters
    ----------
    spikes : ndarray of shape (T,) or (T, N)

    Returns
    -------
    ndarray of same shape, binary
    """
    if spikes.ndim == 1:
        spikes = spikes[:, np.newaxis]
    T, N = spikes.shape
    K = len(self.coefficients)
    output = np.zeros_like(spikes, dtype=np.int8)

    for t in range(K, T):
        weighted = np.zeros(N, dtype=np.float64)
        for k, c in enumerate(self.coefficients):
            weighted += c * spikes[t - k].astype(np.float64)
        output[t] = (weighted >= self.threshold).astype(np.int8)

    return output if output.shape[1] > 1 else output[:, 0]

SpikeIIR dataclass

Infinite Impulse Response filter using leaky integration.

Membrane-like dynamics: state = decay * state + input_spike. Fires when state exceeds threshold. Natural IIR in spike domain.

Parameters

decay : float Decay factor per timestep (0-1). threshold : float gain : float Input spike gain.

Source code in src/sc_neurocore/spike_dsp/filters.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@dataclass
class SpikeIIR:
    """Infinite Impulse Response filter using leaky integration.

    Membrane-like dynamics: state = decay * state + input_spike.
    Fires when state exceeds threshold. Natural IIR in spike domain.

    Parameters
    ----------
    decay : float
        Decay factor per timestep (0-1).
    threshold : float
    gain : float
        Input spike gain.
    """

    decay: float = 0.9
    threshold: float = 1.0
    gain: float = 0.5

    def filter(self, spikes: np.ndarray) -> np.ndarray:
        """Apply IIR filter to spike train."""
        if spikes.ndim == 1:
            spikes = spikes[:, np.newaxis]
        T, N = spikes.shape
        state = np.zeros(N, dtype=np.float64)
        output = np.zeros_like(spikes, dtype=np.int8)

        for t in range(T):
            state = self.decay * state + self.gain * spikes[t].astype(np.float64)
            fire = state >= self.threshold
            output[t] = fire.astype(np.int8)
            state[fire] = 0.0

        return output if output.shape[1] > 1 else output[:, 0]

filter(spikes)

Apply IIR filter to spike train.

Source code in src/sc_neurocore/spike_dsp/filters.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def filter(self, spikes: np.ndarray) -> np.ndarray:
    """Apply IIR filter to spike train."""
    if spikes.ndim == 1:
        spikes = spikes[:, np.newaxis]
    T, N = spikes.shape
    state = np.zeros(N, dtype=np.float64)
    output = np.zeros_like(spikes, dtype=np.int8)

    for t in range(T):
        state = self.decay * state + self.gain * spikes[t].astype(np.float64)
        fire = state >= self.threshold
        output[t] = fire.astype(np.int8)
        state[fire] = 0.0

    return output if output.shape[1] > 1 else output[:, 0]

spike_convolve(spikes, kernel, threshold=0.5)

Convolve a spike train with a kernel in spike domain.

Parameters

spikes : ndarray of shape (T,) kernel : ndarray of shape (K,) threshold : float

Returns

ndarray of shape (T,), binary

Source code in src/sc_neurocore/spike_dsp/filters.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def spike_convolve(spikes: np.ndarray, kernel: np.ndarray, threshold: float = 0.5) -> np.ndarray:
    """Convolve a spike train with a kernel in spike domain.

    Parameters
    ----------
    spikes : ndarray of shape (T,)
    kernel : ndarray of shape (K,)
    threshold : float

    Returns
    -------
    ndarray of shape (T,), binary
    """
    fir = SpikeFIR(coefficients=kernel, threshold=threshold)
    return fir.filter(spikes)

Spectral

sc_neurocore.spike_dsp.spectral

Spike-domain FFT and power spectrum via time-coded spiking Fourier transform.

Converts spike train to instantaneous firing rate, computes FFT, returns frequency components as spike-rate amplitudes. The firing-rate conversion uses a sliding window (kernel-based) that stays in the integer domain.

spike_fft(spikes, dt=0.001, window_size=50)

Compute FFT of a spike train.

Parameters

spikes : ndarray of shape (T,) or (T, N) Binary spike train(s). dt : float Timestep in seconds. window_size : int Sliding window for instantaneous rate estimation.

Returns

(frequencies, magnitudes) tuple frequencies: ndarray of shape (F,) magnitudes: ndarray of shape (F,) or (F, N)

Source code in src/sc_neurocore/spike_dsp/spectral.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def spike_fft(
    spikes: np.ndarray,
    dt: float = 0.001,
    window_size: int = 50,
) -> tuple[np.ndarray, np.ndarray]:
    """Compute FFT of a spike train.

    Parameters
    ----------
    spikes : ndarray of shape (T,) or (T, N)
        Binary spike train(s).
    dt : float
        Timestep in seconds.
    window_size : int
        Sliding window for instantaneous rate estimation.

    Returns
    -------
    (frequencies, magnitudes) tuple
        frequencies: ndarray of shape (F,)
        magnitudes: ndarray of shape (F,) or (F, N)
    """
    if spikes.ndim == 1:
        spikes = spikes[:, np.newaxis]
    T, N = spikes.shape

    # Compute instantaneous firing rate via sliding window
    rates = np.zeros((T, N), dtype=np.float64)
    for t in range(T):
        start = max(0, t - window_size + 1)
        rates[t] = spikes[start : t + 1].mean(axis=0) / dt

    # FFT
    fft_result = np.fft.rfft(rates, axis=0)
    magnitudes = np.abs(fft_result)
    frequencies = np.fft.rfftfreq(T, d=dt)

    if N == 1:
        magnitudes = magnitudes[:, 0]
    return frequencies, magnitudes

spike_power_spectrum(spikes, dt=0.001, window_size=50)

Compute power spectral density of a spike train.

Returns

(frequencies, psd) tuple

Source code in src/sc_neurocore/spike_dsp/spectral.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def spike_power_spectrum(
    spikes: np.ndarray,
    dt: float = 0.001,
    window_size: int = 50,
) -> tuple[np.ndarray, np.ndarray]:
    """Compute power spectral density of a spike train.

    Returns
    -------
    (frequencies, psd) tuple
    """
    freqs, mags = spike_fft(spikes, dt, window_size)
    psd = mags**2
    return freqs, psd

Wavelets

sc_neurocore.spike_dsp.wavelets

Spike-domain wavelet decomposition using multi-scale spike filtering.

spike_wavelet_decompose(spikes, n_scales=4, base_window=4)

Decompose spike train into frequency bands via multi-scale filtering.

Uses cascaded moving-average filters at doubling window sizes: scale 0: window=base_window, scale 1: window=2*base_window, etc. Each scale captures a different frequency band.

Parameters

spikes : ndarray of shape (T,) or (T, N) n_scales : int Number of wavelet scales. base_window : int Window size for finest scale.

Returns

list of ndarray One array per scale, shape (T,) or (T, N). Binary spike representation of activity at each frequency band.

Source code in src/sc_neurocore/spike_dsp/wavelets.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def spike_wavelet_decompose(
    spikes: np.ndarray,
    n_scales: int = 4,
    base_window: int = 4,
) -> list[np.ndarray]:
    """Decompose spike train into frequency bands via multi-scale filtering.

    Uses cascaded moving-average filters at doubling window sizes:
    scale 0: window=base_window, scale 1: window=2*base_window, etc.
    Each scale captures a different frequency band.

    Parameters
    ----------
    spikes : ndarray of shape (T,) or (T, N)
    n_scales : int
        Number of wavelet scales.
    base_window : int
        Window size for finest scale.

    Returns
    -------
    list of ndarray
        One array per scale, shape (T,) or (T, N). Binary spike representation
        of activity at each frequency band.
    """
    if spikes.ndim == 1:
        spikes = spikes[:, np.newaxis]
        squeeze = True
    else:
        squeeze = False

    T, N = spikes.shape
    scales = []

    for s in range(n_scales):
        window = base_window * (2**s)
        # Moving average at this scale
        smoothed = np.zeros((T, N), dtype=np.float64)
        for t in range(T):
            start = max(0, t - window + 1)
            smoothed[t] = spikes[start : t + 1].mean(axis=0)

        # Difference between adjacent scales = bandpass
        if s == 0:
            band = smoothed
        else:
            prev_window = base_window * (2 ** (s - 1))
            prev_smoothed = np.zeros((T, N), dtype=np.float64)
            for t in range(T):
                start = max(0, t - prev_window + 1)
                prev_smoothed[t] = spikes[start : t + 1].mean(axis=0)
            band = np.abs(prev_smoothed - smoothed)

        # Threshold to binary
        threshold = max(band.mean() * 0.5, 1e-8)
        binary_band = (band > threshold).astype(np.int8)

        scales.append(binary_band[:, 0] if squeeze else binary_band)

    return scales