Skip to content

Learning

Training paradigms beyond single-node STDP: BPTT, truncated BPTT, eligibility traces, reward-modulated learning, meta-learning, homeostatic scaling, short-term plasticity, structural plasticity, federated learning, lifelong/continual learning, neuroevolution, learning-rate schedulers, and training callbacks.

BPTT Learner

sc_neurocore.learning.advanced.BPTTLearner

Backpropagation Through Time for spiking networks.

Uses fast-sigmoid surrogate gradient (Neftci et al. 2019) to handle the spike non-differentiability.

Source code in src/sc_neurocore/learning/advanced.py
Python
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class BPTTLearner:
    """Backpropagation Through Time for spiking networks.

    Uses fast-sigmoid surrogate gradient (Neftci et al. 2019) to handle
    the spike non-differentiability.
    """

    def __init__(self, network: Any, loss_fn: Callable[..., float], lr: float = 1e-3) -> None:
        self.network = network
        self.loss_fn = loss_fn
        self.lr = lr

    def train_step(self, inputs: np.ndarray[Any, Any], targets: np.ndarray[Any, Any]) -> float:
        """One BPTT step: forward pass, loss, backward with surrogate gradients.

        Parameters
        ----------
        inputs : np.ndarray[Any, Any]
            Shape (n_steps, n_input) input currents.
        targets : np.ndarray[Any, Any]
            Shape (n_steps, n_output) target spike trains.

        Returns
        -------
        float
            Scalar loss value.
        """
        n_steps = inputs.shape[0]
        for pop in self.network.populations:
            pop.reset_all()

        recorded_v = []
        recorded_spikes = []
        for t in range(n_steps):
            currents = inputs[t]
            pop = self.network.populations[0]
            spikes = pop.step_all(currents[: pop.n])
            recorded_v.append(pop.voltages.copy())
            recorded_spikes.append(spikes.copy())

        spike_arr = np.stack(recorded_spikes)
        loss = float(self.loss_fn(spike_arr, targets))

        output_error = spike_arr - targets
        for proj in self.network.projections:
            n_src = proj.source.n
            grad_w = np.zeros_like(proj.data)
            for t in range(n_steps):
                surr = _fast_sigmoid_surrogate(recorded_v[t])
                post_delta = output_error[t][: proj.target.n] * surr[: proj.target.n]
                for i in range(n_src):
                    for k in range(proj.indptr[i], proj.indptr[i + 1]):
                        j = proj.indices[k]
                        grad_w[k] += recorded_spikes[t][i] * post_delta[j]
            proj.data -= self.lr * grad_w / max(n_steps, 1)

        return loss

train_step(inputs, targets)

One BPTT step: forward pass, loss, backward with surrogate gradients.

Parameters

inputs : np.ndarray[Any, Any] Shape (n_steps, n_input) input currents. targets : np.ndarray[Any, Any] Shape (n_steps, n_output) target spike trains.

Returns

float Scalar loss value.

Source code in src/sc_neurocore/learning/advanced.py
Python
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def train_step(self, inputs: np.ndarray[Any, Any], targets: np.ndarray[Any, Any]) -> float:
    """One BPTT step: forward pass, loss, backward with surrogate gradients.

    Parameters
    ----------
    inputs : np.ndarray[Any, Any]
        Shape (n_steps, n_input) input currents.
    targets : np.ndarray[Any, Any]
        Shape (n_steps, n_output) target spike trains.

    Returns
    -------
    float
        Scalar loss value.
    """
    n_steps = inputs.shape[0]
    for pop in self.network.populations:
        pop.reset_all()

    recorded_v = []
    recorded_spikes = []
    for t in range(n_steps):
        currents = inputs[t]
        pop = self.network.populations[0]
        spikes = pop.step_all(currents[: pop.n])
        recorded_v.append(pop.voltages.copy())
        recorded_spikes.append(spikes.copy())

    spike_arr = np.stack(recorded_spikes)
    loss = float(self.loss_fn(spike_arr, targets))

    output_error = spike_arr - targets
    for proj in self.network.projections:
        n_src = proj.source.n
        grad_w = np.zeros_like(proj.data)
        for t in range(n_steps):
            surr = _fast_sigmoid_surrogate(recorded_v[t])
            post_delta = output_error[t][: proj.target.n] * surr[: proj.target.n]
            for i in range(n_src):
                for k in range(proj.indptr[i], proj.indptr[i + 1]):
                    j = proj.indices[k]
                    grad_w[k] += recorded_spikes[t][i] * post_delta[j]
        proj.data -= self.lr * grad_w / max(n_steps, 1)

    return loss

Truncated BPTT (Williams & Peng 1990)

Chunks long sequences into windows of k timesteps, backpropagating gradients within each chunk while carrying membrane state forward. Memory O(k) instead of O(T).

sc_neurocore.learning.advanced.TBPTTLearner

Truncated Backpropagation Through Time for long sequences.

Splits input into chunks of k timesteps, backpropagating gradients only within each chunk while carrying forward state (membrane voltage) across boundaries. Reduces memory from O(T) to O(k).

Williams & Peng 1990.

Source code in src/sc_neurocore/learning/advanced.py
Python
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class TBPTTLearner:
    """Truncated Backpropagation Through Time for long sequences.

    Splits input into chunks of ``k`` timesteps, backpropagating gradients
    only within each chunk while carrying forward state (membrane voltage)
    across boundaries. Reduces memory from O(T) to O(k).

    Williams & Peng 1990.
    """

    def __init__(
        self, network: Any, loss_fn: Callable[..., float], lr: float = 1e-3, k: int = 50
    ) -> None:
        self.network = network
        self.loss_fn = loss_fn
        self.lr = lr
        self.k = k

    def train_step(self, inputs: np.ndarray[Any, Any], targets: np.ndarray[Any, Any]) -> float:
        """One TBPTT step over the full sequence, chunked into windows of k.

        Parameters
        ----------
        inputs : np.ndarray[Any, Any]
            Shape (n_steps, n_input).
        targets : np.ndarray[Any, Any]
            Shape (n_steps, n_output).

        Returns
        -------
        float
            Total loss summed across chunks.
        """
        n_steps = inputs.shape[0]
        total_loss = 0.0

        for pop in self.network.populations:
            pop.reset_all()

        for chunk_start in range(0, n_steps, self.k):
            chunk_end = min(chunk_start + self.k, n_steps)
            chunk_len = chunk_end - chunk_start

            recorded_v = []
            recorded_spikes = []
            for t in range(chunk_start, chunk_end):
                pop = self.network.populations[0]
                spikes = pop.step_all(inputs[t][: pop.n])
                recorded_v.append(pop.voltages.copy())
                recorded_spikes.append(spikes.copy())

            spike_arr = np.stack(recorded_spikes)
            chunk_targets = targets[chunk_start:chunk_end]
            chunk_loss = float(self.loss_fn(spike_arr, chunk_targets))
            total_loss += chunk_loss

            # Backward within this chunk only
            output_error = spike_arr - chunk_targets
            for proj in self.network.projections:
                n_src = proj.source.n
                grad_w = np.zeros_like(proj.data)
                for t_local in range(chunk_len):
                    surr = _fast_sigmoid_surrogate(recorded_v[t_local])
                    post_delta = output_error[t_local][: proj.target.n] * surr[: proj.target.n]
                    for i in range(n_src):
                        for k_idx in range(proj.indptr[i], proj.indptr[i + 1]):
                            j = proj.indices[k_idx]
                            grad_w[k_idx] += recorded_spikes[t_local][i] * post_delta[j]
                proj.data -= self.lr * grad_w / max(chunk_len, 1)

            # State (voltages) carries forward — no reset between chunks

        return total_loss

train_step(inputs, targets)

One TBPTT step over the full sequence, chunked into windows of k.

Parameters

inputs : np.ndarray[Any, Any] Shape (n_steps, n_input). targets : np.ndarray[Any, Any] Shape (n_steps, n_output).

Returns

float Total loss summed across chunks.

Source code in src/sc_neurocore/learning/advanced.py
Python
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def train_step(self, inputs: np.ndarray[Any, Any], targets: np.ndarray[Any, Any]) -> float:
    """One TBPTT step over the full sequence, chunked into windows of k.

    Parameters
    ----------
    inputs : np.ndarray[Any, Any]
        Shape (n_steps, n_input).
    targets : np.ndarray[Any, Any]
        Shape (n_steps, n_output).

    Returns
    -------
    float
        Total loss summed across chunks.
    """
    n_steps = inputs.shape[0]
    total_loss = 0.0

    for pop in self.network.populations:
        pop.reset_all()

    for chunk_start in range(0, n_steps, self.k):
        chunk_end = min(chunk_start + self.k, n_steps)
        chunk_len = chunk_end - chunk_start

        recorded_v = []
        recorded_spikes = []
        for t in range(chunk_start, chunk_end):
            pop = self.network.populations[0]
            spikes = pop.step_all(inputs[t][: pop.n])
            recorded_v.append(pop.voltages.copy())
            recorded_spikes.append(spikes.copy())

        spike_arr = np.stack(recorded_spikes)
        chunk_targets = targets[chunk_start:chunk_end]
        chunk_loss = float(self.loss_fn(spike_arr, chunk_targets))
        total_loss += chunk_loss

        # Backward within this chunk only
        output_error = spike_arr - chunk_targets
        for proj in self.network.projections:
            n_src = proj.source.n
            grad_w = np.zeros_like(proj.data)
            for t_local in range(chunk_len):
                surr = _fast_sigmoid_surrogate(recorded_v[t_local])
                post_delta = output_error[t_local][: proj.target.n] * surr[: proj.target.n]
                for i in range(n_src):
                    for k_idx in range(proj.indptr[i], proj.indptr[i + 1]):
                        j = proj.indices[k_idx]
                        grad_w[k_idx] += recorded_spikes[t_local][i] * post_delta[j]
            proj.data -= self.lr * grad_w / max(chunk_len, 1)

        # State (voltages) carries forward — no reset between chunks

    return total_loss

Eligibility Traces (e-prop, Bellec et al. 2020)

sc_neurocore.learning.advanced.EligibilityTrace

E-prop eligibility trace: three-factor learning (pre x post x error).

Bellec et al. 2020.

Source code in src/sc_neurocore/learning/advanced.py
Python
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class EligibilityTrace:
    """E-prop eligibility trace: three-factor learning (pre x post x error).

    Bellec et al. 2020.
    """

    def __init__(self, tau_e: float = 20.0, dt: float = 1.0) -> None:
        self.decay = float(np.exp(-dt / tau_e))
        self._trace: np.ndarray[Any, Any] | None = None

    def update(
        self,
        pre_spike: np.ndarray[Any, Any],
        post_spike: np.ndarray[Any, Any],
        error_signal: np.ndarray[Any, Any],
    ) -> np.ndarray[Any, Any]:
        """Compute weight delta from three-factor rule.

        Parameters
        ----------
        pre_spike, post_spike : np.ndarray[Any, Any]
            Binary (0/1) vectors of length n_pre, n_post.
        error_signal : np.ndarray[Any, Any]
            Error signal of length n_post.

        Returns
        -------
        np.ndarray[Any, Any]
            Weight delta matrix of shape (n_pre, n_post).
        """
        outer = np.outer(pre_spike, post_spike)
        if self._trace is None:
            self._trace = np.zeros_like(outer)
        self._trace = self.decay * self._trace + outer
        delta: np.ndarray[Any, Any] = self._trace * error_signal[np.newaxis, :]
        return delta

update(pre_spike, post_spike, error_signal)

Compute weight delta from three-factor rule.

Parameters

pre_spike, post_spike : np.ndarray[Any, Any] Binary (0/1) vectors of length n_pre, n_post. error_signal : np.ndarray[Any, Any] Error signal of length n_post.

Returns

np.ndarray[Any, Any] Weight delta matrix of shape (n_pre, n_post).

Source code in src/sc_neurocore/learning/advanced.py
Python
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def update(
    self,
    pre_spike: np.ndarray[Any, Any],
    post_spike: np.ndarray[Any, Any],
    error_signal: np.ndarray[Any, Any],
) -> np.ndarray[Any, Any]:
    """Compute weight delta from three-factor rule.

    Parameters
    ----------
    pre_spike, post_spike : np.ndarray[Any, Any]
        Binary (0/1) vectors of length n_pre, n_post.
    error_signal : np.ndarray[Any, Any]
        Error signal of length n_post.

    Returns
    -------
    np.ndarray[Any, Any]
        Weight delta matrix of shape (n_pre, n_post).
    """
    outer = np.outer(pre_spike, post_spike)
    if self._trace is None:
        self._trace = np.zeros_like(outer)
    self._trace = self.decay * self._trace + outer
    delta: np.ndarray[Any, Any] = self._trace * error_signal[np.newaxis, :]
    return delta

Reward-Modulated STDP

sc_neurocore.learning.advanced.RewardModulatedLearner

Reward-modulated STDP (R-STDP).

Maintains per-synapse eligibility traces and applies weight updates scaled by a global reward signal.

Source code in src/sc_neurocore/learning/advanced.py
Python
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
class RewardModulatedLearner:
    """Reward-modulated STDP (R-STDP).

    Maintains per-synapse eligibility traces and applies weight updates
    scaled by a global reward signal.
    """

    def __init__(self, network: Any, tau_reward: float = 100.0) -> None:
        self.network = network
        self.reward_decay = np.exp(-1.0 / tau_reward)
        self._elig: dict[int, np.ndarray[Any, Any]] = {}
        self._pre_trace: dict[int, np.ndarray[Any, Any]] = {}
        self._post_trace: dict[int, np.ndarray[Any, Any]] = {}
        self._init_traces()

    def _init_traces(self) -> None:
        for proj in self.network.projections:
            pid = id(proj)
            self._elig[pid] = np.zeros_like(proj.data)
            self._pre_trace[pid] = np.zeros(proj.source.n)
            self._post_trace[pid] = np.zeros(proj.target.n)

    def step(self, reward: float) -> None:
        """Apply reward-modulated weight update.

        Parameters
        ----------
        reward : float
            Scalar reward signal.
        """
        tau_trace = 20.0
        trace_decay = np.exp(-1.0 / tau_trace)
        for proj in self.network.projections:
            pid = id(proj)
            pre_sp = proj.source.voltages > 0.9
            post_sp = proj.target.voltages > 0.9
            self._pre_trace[pid] = trace_decay * self._pre_trace[pid] + pre_sp
            self._post_trace[pid] = trace_decay * self._post_trace[pid] + post_sp

            for i in range(proj.source.n):
                for k in range(proj.indptr[i], proj.indptr[i + 1]):
                    j = proj.indices[k]
                    self._elig[pid][k] = (
                        self.reward_decay * self._elig[pid][k]
                        + self._pre_trace[pid][i] * self._post_trace[pid][j]
                    )
            proj.data += 0.01 * reward * self._elig[pid]
            np.clip(proj.data, 0.0, None, out=proj.data)

step(reward)

Apply reward-modulated weight update.

Parameters

reward : float Scalar reward signal.

Source code in src/sc_neurocore/learning/advanced.py
Python
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def step(self, reward: float) -> None:
    """Apply reward-modulated weight update.

    Parameters
    ----------
    reward : float
        Scalar reward signal.
    """
    tau_trace = 20.0
    trace_decay = np.exp(-1.0 / tau_trace)
    for proj in self.network.projections:
        pid = id(proj)
        pre_sp = proj.source.voltages > 0.9
        post_sp = proj.target.voltages > 0.9
        self._pre_trace[pid] = trace_decay * self._pre_trace[pid] + pre_sp
        self._post_trace[pid] = trace_decay * self._post_trace[pid] + post_sp

        for i in range(proj.source.n):
            for k in range(proj.indptr[i], proj.indptr[i + 1]):
                j = proj.indices[k]
                self._elig[pid][k] = (
                    self.reward_decay * self._elig[pid][k]
                    + self._pre_trace[pid][i] * self._post_trace[pid][j]
                )
        proj.data += 0.01 * reward * self._elig[pid]
        np.clip(proj.data, 0.0, None, out=proj.data)

Learning-Rate Schedulers

Import these schedulers from either sc_neurocore.learning or sc_neurocore.learning.schedulers. They keep only deterministic local state and return the current learning rate from each step() call.

sc_neurocore.learning.schedulers.StepScheduler

Drop learning rate by gamma every step_size steps.

Source code in src/sc_neurocore/learning/schedulers.py
Python
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class StepScheduler:
    """Drop learning rate by *gamma* every *step_size* steps."""

    def __init__(self, lr_init: float, step_size: int, gamma: float = 0.1):
        self.lr = lr_init
        self.step_size = step_size
        self.gamma = gamma
        self._count = 0

    def step(self) -> float:
        """Advance one scheduler step and return the current learning rate."""
        self._count += 1
        if self._count % self.step_size == 0:
            self.lr *= self.gamma
        return self.lr

    def reset(self) -> None:
        """Reset the internal step counter without changing the current rate."""
        self._count = 0

step()

Advance one scheduler step and return the current learning rate.

Source code in src/sc_neurocore/learning/schedulers.py
Python
35
36
37
38
39
40
def step(self) -> float:
    """Advance one scheduler step and return the current learning rate."""
    self._count += 1
    if self._count % self.step_size == 0:
        self.lr *= self.gamma
    return self.lr

reset()

Reset the internal step counter without changing the current rate.

Source code in src/sc_neurocore/learning/schedulers.py
Python
42
43
44
def reset(self) -> None:
    """Reset the internal step counter without changing the current rate."""
    self._count = 0

sc_neurocore.learning.schedulers.ExponentialScheduler

Multiply learning rate by gamma each step.

Source code in src/sc_neurocore/learning/schedulers.py
Python
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class ExponentialScheduler:
    """Multiply learning rate by *gamma* each step."""

    def __init__(self, lr_init: float, gamma: float = 0.999):
        self.lr = lr_init
        self.gamma = gamma

    def step(self) -> float:
        """Apply one exponential decay update and return the new rate."""
        self.lr *= self.gamma
        return self.lr

    def reset(self) -> None:
        """Leave the stateless exponential schedule unchanged."""
        pass

step()

Apply one exponential decay update and return the new rate.

Source code in src/sc_neurocore/learning/schedulers.py
Python
54
55
56
57
def step(self) -> float:
    """Apply one exponential decay update and return the new rate."""
    self.lr *= self.gamma
    return self.lr

reset()

Leave the stateless exponential schedule unchanged.

Source code in src/sc_neurocore/learning/schedulers.py
Python
59
60
61
def reset(self) -> None:
    """Leave the stateless exponential schedule unchanged."""
    pass

sc_neurocore.learning.schedulers.CosineScheduler

Cosine annealing from lr_init to lr_min over total_steps.

Source code in src/sc_neurocore/learning/schedulers.py
Python
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class CosineScheduler:
    """Cosine annealing from *lr_init* to *lr_min* over *total_steps*."""

    def __init__(self, lr_init: float, lr_min: float, total_steps: int):
        self.lr_init = lr_init
        self.lr_min = lr_min
        self.total_steps = total_steps
        self._count = 0
        self.lr = lr_init

    def step(self) -> float:
        """Advance one cosine-annealing step and return the new rate."""
        self._count += 1
        t = min(self._count / self.total_steps, 1.0)
        self.lr = self.lr_min + 0.5 * (self.lr_init - self.lr_min) * (1 + math.cos(math.pi * t))
        return self.lr

    def reset(self) -> None:
        """Restore the initial learning rate and restart the cosine schedule."""
        self._count = 0
        self.lr = self.lr_init

step()

Advance one cosine-annealing step and return the new rate.

Source code in src/sc_neurocore/learning/schedulers.py
Python
74
75
76
77
78
79
def step(self) -> float:
    """Advance one cosine-annealing step and return the new rate."""
    self._count += 1
    t = min(self._count / self.total_steps, 1.0)
    self.lr = self.lr_min + 0.5 * (self.lr_init - self.lr_min) * (1 + math.cos(math.pi * t))
    return self.lr

reset()

Restore the initial learning rate and restart the cosine schedule.

Source code in src/sc_neurocore/learning/schedulers.py
Python
81
82
83
84
def reset(self) -> None:
    """Restore the initial learning rate and restart the cosine schedule."""
    self._count = 0
    self.lr = self.lr_init

sc_neurocore.learning.schedulers.WarmupCosineScheduler

Linear warmup followed by cosine decay.

Source code in src/sc_neurocore/learning/schedulers.py
Python
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class WarmupCosineScheduler:
    """Linear warmup followed by cosine decay."""

    def __init__(
        self,
        lr_init: float,
        lr_min: float,
        warmup_steps: int,
        total_steps: int,
    ):
        self.lr_init = lr_init
        self.lr_min = lr_min
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self._count = 0
        self.lr = 0.0

    def step(self) -> float:
        """Advance through warmup or cosine decay and return the current rate."""
        self._count += 1
        if self._count <= self.warmup_steps:
            self.lr = self.lr_init * (self._count / self.warmup_steps)
        else:
            decay_steps = self.total_steps - self.warmup_steps
            t = min((self._count - self.warmup_steps) / decay_steps, 1.0)
            self.lr = self.lr_min + 0.5 * (self.lr_init - self.lr_min) * (1 + math.cos(math.pi * t))
        return self.lr

    def reset(self) -> None:
        """Return to the pre-warmup state with zero current learning rate."""
        self._count = 0
        self.lr = 0.0

step()

Advance through warmup or cosine decay and return the current rate.

Source code in src/sc_neurocore/learning/schedulers.py
Python
104
105
106
107
108
109
110
111
112
113
def step(self) -> float:
    """Advance through warmup or cosine decay and return the current rate."""
    self._count += 1
    if self._count <= self.warmup_steps:
        self.lr = self.lr_init * (self._count / self.warmup_steps)
    else:
        decay_steps = self.total_steps - self.warmup_steps
        t = min((self._count - self.warmup_steps) / decay_steps, 1.0)
        self.lr = self.lr_min + 0.5 * (self.lr_init - self.lr_min) * (1 + math.cos(math.pi * t))
    return self.lr

reset()

Return to the pre-warmup state with zero current learning rate.

Source code in src/sc_neurocore/learning/schedulers.py
Python
115
116
117
118
def reset(self) -> None:
    """Return to the pre-warmup state with zero current learning rate."""
    self._count = 0
    self.lr = 0.0

Training Callbacks

Import callbacks from either sc_neurocore.learning or sc_neurocore.learning.callbacks. CSVCallback has no optional dependencies; TensorBoardCallback and WandBCallback fail closed with SCDependencyError when their optional runtime packages are not installed.

sc_neurocore.learning.callbacks.TrainingCallback

Base class for training callbacks.

Source code in src/sc_neurocore/learning/callbacks.py
Python
26
27
28
29
30
31
32
33
class TrainingCallback:
    """Base class for training callbacks."""

    def log(self, metrics: dict[str, float], step: int) -> None:
        """Record a mapping of metric names to values at the given step."""

    def close(self) -> None:
        """Flush and release any resources held by the callback."""

log(metrics, step)

Record a mapping of metric names to values at the given step.

Source code in src/sc_neurocore/learning/callbacks.py
Python
29
30
def log(self, metrics: dict[str, float], step: int) -> None:
    """Record a mapping of metric names to values at the given step."""

close()

Flush and release any resources held by the callback.

Source code in src/sc_neurocore/learning/callbacks.py
Python
32
33
def close(self) -> None:
    """Flush and release any resources held by the callback."""

sc_neurocore.learning.callbacks.CSVCallback

Bases: TrainingCallback

Log metrics to a CSV file (no dependencies).

Source code in src/sc_neurocore/learning/callbacks.py
Python
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class CSVCallback(TrainingCallback):
    """Log metrics to a CSV file (no dependencies)."""

    def __init__(self, path: str = "metrics.csv"):
        self._path = path
        self._rows: list[dict[str, float | int]] = []

    def log(self, metrics: dict[str, float], step: int) -> None:
        """Buffer one row of metrics for the given step in memory."""
        self._rows.append({"step": step, **metrics})

    def close(self) -> None:
        """Write all buffered metric rows to the CSV file."""
        if not self._rows:
            return
        keys = list(self._rows[0].keys())
        with open(self._path, "w", newline="") as f:
            f.write(",".join(keys) + "\n")
            for row in self._rows:
                f.write(",".join(str(row[k]) for k in keys) + "\n")

log(metrics, step)

Buffer one row of metrics for the given step in memory.

Source code in src/sc_neurocore/learning/callbacks.py
Python
92
93
94
def log(self, metrics: dict[str, float], step: int) -> None:
    """Buffer one row of metrics for the given step in memory."""
    self._rows.append({"step": step, **metrics})

close()

Write all buffered metric rows to the CSV file.

Source code in src/sc_neurocore/learning/callbacks.py
Python
 96
 97
 98
 99
100
101
102
103
104
def close(self) -> None:
    """Write all buffered metric rows to the CSV file."""
    if not self._rows:
        return
    keys = list(self._rows[0].keys())
    with open(self._path, "w", newline="") as f:
        f.write(",".join(keys) + "\n")
        for row in self._rows:
            f.write(",".join(str(row[k]) for k in keys) + "\n")

sc_neurocore.learning.callbacks.TensorBoardCallback

Bases: TrainingCallback

Log scalars to TensorBoard via torch.utils.tensorboard.

Source code in src/sc_neurocore/learning/callbacks.py
Python
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class TensorBoardCallback(TrainingCallback):
    """Log scalars to TensorBoard via ``torch.utils.tensorboard``."""

    def __init__(self, log_dir: str = "runs"):
        try:
            from torch.utils.tensorboard import SummaryWriter
        except ImportError:
            from sc_neurocore.exceptions import SCDependencyError

            raise SCDependencyError("TensorBoard requires torch: pip install sc-neurocore[gpu]")
        # ``SummaryWriter`` ships without type stubs; binding it through an ``Any``
        # handle keeps construction and method calls strict-clean regardless of
        # whether torch stubs are installed in the environment.
        writer_factory: Any = SummaryWriter
        self._writer: Any = writer_factory(log_dir=log_dir)

    def log(self, metrics: dict[str, float], step: int) -> None:
        """Write each metric as a TensorBoard scalar at the given step."""
        for key, value in metrics.items():
            self._writer.add_scalar(key, value, step)

    def close(self) -> None:
        """Close the underlying TensorBoard summary writer."""
        self._writer.close()

log(metrics, step)

Write each metric as a TensorBoard scalar at the given step.

Source code in src/sc_neurocore/learning/callbacks.py
Python
52
53
54
55
def log(self, metrics: dict[str, float], step: int) -> None:
    """Write each metric as a TensorBoard scalar at the given step."""
    for key, value in metrics.items():
        self._writer.add_scalar(key, value, step)

close()

Close the underlying TensorBoard summary writer.

Source code in src/sc_neurocore/learning/callbacks.py
Python
57
58
59
def close(self) -> None:
    """Close the underlying TensorBoard summary writer."""
    self._writer.close()

sc_neurocore.learning.callbacks.WandBCallback

Bases: TrainingCallback

Log metrics to Weights & Biases.

Source code in src/sc_neurocore/learning/callbacks.py
Python
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class WandBCallback(TrainingCallback):
    """Log metrics to Weights & Biases."""

    def __init__(self, project: str = "sc-neurocore", **init_kwargs: Any):
        try:
            import wandb

            self._wandb = wandb
        except ImportError:
            from sc_neurocore.exceptions import SCDependencyError

            raise SCDependencyError("W&B requires wandb: pip install wandb")
        self._wandb.init(project=project, **init_kwargs)

    def log(self, metrics: dict[str, float], step: int) -> None:
        """Forward the metrics to the active Weights & Biases run."""
        self._wandb.log(metrics, step=step)

    def close(self) -> None:
        """Finish the active Weights & Biases run."""
        self._wandb.finish()

log(metrics, step)

Forward the metrics to the active Weights & Biases run.

Source code in src/sc_neurocore/learning/callbacks.py
Python
76
77
78
def log(self, metrics: dict[str, float], step: int) -> None:
    """Forward the metrics to the active Weights & Biases run."""
    self._wandb.log(metrics, step=step)

close()

Finish the active Weights & Biases run.

Source code in src/sc_neurocore/learning/callbacks.py
Python
80
81
82
def close(self) -> None:
    """Finish the active Weights & Biases run."""
    self._wandb.finish()

Bounded O(1) Online Learning

sc_neurocore.learning.online_o1 defines the hardware-facing local-learning contract for streamed online updates. One synapse stores only four bounded state fields: current weight, pre trace, post trace, and eligibility trace. The memory proof emitted by build_online_o1_memory_proof(...) is independent of sequence length and reports the exact per-synapse bit count used by the HDL emitter.

The first supported rule family is reward-modulated STDP with fixed-point saturation. OnlineO1Config.to_scnir_annotation(...) emits deterministic SC-NIR metadata for online-learning-capable synapses, and sc_neurocore.hdl_gen.OnlineO1LearningEmitter emits a one-lane Verilog update block with the same bounded state fields and saturation policy. This software and RTL contract is local handoff evidence; it does not claim board synthesis or physical FPGA learning evidence until those runs are attached separately. When the optional native library is present, RustOnlineO1Synapse provides the same bounded fixed-point step contract through the Rust C-FFI bridge.

OnlineO1LearningEmitter.estimate_resources(...) reports deterministic pre-synthesis BRAM/LUT/DSP planning estimates for a chosen synapse count. The reproducible adaptation benchmark command is:

Bash
PYTHONPATH=src python tools/online_o1_adaptation_benchmark.py \
  --output benchmarks/results/online_o1_adaptation_benchmark.json

The default local report is deterministic simulation evidence: it measures pre/post reward-pairing adaptation speed, records Python/Rust parity when the native library is present, and marks hardware_measurement_claimed=false.

sc_neurocore.learning.online_o1.OnlineO1Config dataclass

Hardware-bounded configuration for local reward-modulated STDP.

Source code in src/sc_neurocore/learning/online_o1.py
Python
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@dataclass(frozen=True, slots=True)
class OnlineO1Config:
    """Hardware-bounded configuration for local reward-modulated STDP."""

    weight_bits: int = 16
    trace_bits: int = 12
    reward_bits: int = 8
    learning_shift: int = 4
    trace_decay_shift: int = 4
    rule_family: Literal["reward_modulated_stdp"] = "reward_modulated_stdp"

    def __post_init__(self) -> None:
        if self.weight_bits < 1:
            raise ValueError("weight_bits must be >= 1")
        if self.trace_bits < 2:
            raise ValueError("trace_bits must be >= 2")
        if self.reward_bits < 1:
            raise ValueError("reward_bits must be >= 1")
        if self.learning_shift < 0:
            raise ValueError("learning_shift must be >= 0")
        if self.trace_decay_shift < 0:
            raise ValueError("trace_decay_shift must be >= 0")

    @property
    def max_weight(self) -> int:
        """Maximum unsigned fixed-point weight."""

        return (1 << self.weight_bits) - 1

    @property
    def max_trace(self) -> int:
        """Maximum unsigned trace value."""

        return (1 << self.trace_bits) - 1

    @property
    def min_eligibility(self) -> int:
        """Minimum signed eligibility value."""

        return -(1 << (self.trace_bits - 1))

    @property
    def max_eligibility(self) -> int:
        """Maximum signed eligibility value."""

        return (1 << (self.trace_bits - 1)) - 1

    @property
    def min_reward(self) -> int:
        """Minimum signed reward input."""

        return -(1 << (self.reward_bits - 1))

    @property
    def max_reward(self) -> int:
        """Maximum signed reward input."""

        return (1 << (self.reward_bits - 1)) - 1

    @property
    def per_synapse_state_bits(self) -> int:
        """Stored bits per synapse: weight plus three bounded traces."""

        return self.weight_bits + 3 * self.trace_bits

    def to_scnir_annotation(self, *, rule_id: str) -> dict[str, Any]:
        """Return deterministic SC-NIR metadata for online-learning synapses."""

        if not rule_id:
            raise ValueError("rule_id must be non-empty")
        return {
            "schema_version": ONLINE_O1_ANNOTATION_SCHEMA_VERSION,
            "rule_id": rule_id,
            "rule_family": self.rule_family,
            "state_fields": list(_STATE_FIELDS),
            "per_synapse_state_bits": self.per_synapse_state_bits,
            "weight_bits": self.weight_bits,
            "trace_bits": self.trace_bits,
            "reward_bits": self.reward_bits,
            "learning_shift": self.learning_shift,
            "trace_decay_shift": self.trace_decay_shift,
            "saturation_policy": "signed_eligibility_unsigned_weight",
            "hidden_history_fields": [],
            "sequence_length_independent": True,
        }

max_weight property

Maximum unsigned fixed-point weight.

max_trace property

Maximum unsigned trace value.

min_eligibility property

Minimum signed eligibility value.

max_eligibility property

Maximum signed eligibility value.

min_reward property

Minimum signed reward input.

max_reward property

Maximum signed reward input.

per_synapse_state_bits property

Stored bits per synapse: weight plus three bounded traces.

to_scnir_annotation(*, rule_id)

Return deterministic SC-NIR metadata for online-learning synapses.

Source code in src/sc_neurocore/learning/online_o1.py
Python
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def to_scnir_annotation(self, *, rule_id: str) -> dict[str, Any]:
    """Return deterministic SC-NIR metadata for online-learning synapses."""

    if not rule_id:
        raise ValueError("rule_id must be non-empty")
    return {
        "schema_version": ONLINE_O1_ANNOTATION_SCHEMA_VERSION,
        "rule_id": rule_id,
        "rule_family": self.rule_family,
        "state_fields": list(_STATE_FIELDS),
        "per_synapse_state_bits": self.per_synapse_state_bits,
        "weight_bits": self.weight_bits,
        "trace_bits": self.trace_bits,
        "reward_bits": self.reward_bits,
        "learning_shift": self.learning_shift,
        "trace_decay_shift": self.trace_decay_shift,
        "saturation_policy": "signed_eligibility_unsigned_weight",
        "hidden_history_fields": [],
        "sequence_length_independent": True,
    }

sc_neurocore.learning.online_o1.OnlineO1Synapse dataclass

One fixed-point reward-modulated STDP synapse with O(1) state.

Source code in src/sc_neurocore/learning/online_o1.py
Python
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
@dataclass(slots=True)
class OnlineO1Synapse:
    """One fixed-point reward-modulated STDP synapse with O(1) state."""

    config: OnlineO1Config
    initial_weight: int = 0
    weight: int = 0
    pre_trace: int = 0
    post_trace: int = 0
    eligibility: int = 0

    def __post_init__(self) -> None:
        self.weight = _saturate(self.initial_weight, 0, self.config.max_weight)
        self.pre_trace = 0
        self.post_trace = 0
        self.eligibility = 0

    @property
    def state_fields(self) -> tuple[str, ...]:
        """Names of state fields retained between timesteps."""

        return _STATE_FIELDS

    @property
    def state_bit_count(self) -> int:
        """Stored state bits for one synapse."""

        return self.config.per_synapse_state_bits

    def snapshot(self) -> OnlineO1Snapshot:
        """Return the current bounded state."""

        return OnlineO1Snapshot(
            weight=self.weight,
            pre_trace=self.pre_trace,
            post_trace=self.post_trace,
            eligibility=self.eligibility,
        )

    def step(self, *, pre_spike: bool, post_spike: bool, reward: int) -> OnlineO1Snapshot:
        """Advance one streamed timestep and return the bounded state.

        The rule uses pre-before-post eligibility:

        ``eligibility += post_spike * pre_trace - pre_spike * post_trace``

        The reward-gated weight update is an arithmetic right shift of the
        product, then saturated into the unsigned weight range.
        """

        reward = _saturate(reward, self.config.min_reward, self.config.max_reward)
        previous_pre_trace = self.pre_trace
        previous_post_trace = self.post_trace

        self.pre_trace = _decay_unsigned(
            self.pre_trace, self.config.trace_decay_shift, self.config.max_trace
        )
        self.post_trace = _decay_unsigned(
            self.post_trace, self.config.trace_decay_shift, self.config.max_trace
        )
        if pre_spike:
            self.pre_trace = self.config.max_trace
        if post_spike:
            self.post_trace = self.config.max_trace

        decayed_eligibility = _decay_signed(self.eligibility, self.config.trace_decay_shift)
        potentiation = 0
        if post_spike:
            potentiation = self.config.max_trace if pre_spike else previous_pre_trace
        depression = previous_post_trace if pre_spike else 0
        eligibility_delta = potentiation - depression
        self.eligibility = _saturate(
            decayed_eligibility + eligibility_delta,
            self.config.min_eligibility,
            self.config.max_eligibility,
        )

        weight_delta = (reward * self.eligibility) >> self.config.learning_shift
        self.weight = _saturate(self.weight + weight_delta, 0, self.config.max_weight)
        return self.snapshot()

state_fields property

Names of state fields retained between timesteps.

state_bit_count property

Stored state bits for one synapse.

snapshot()

Return the current bounded state.

Source code in src/sc_neurocore/learning/online_o1.py
Python
154
155
156
157
158
159
160
161
162
def snapshot(self) -> OnlineO1Snapshot:
    """Return the current bounded state."""

    return OnlineO1Snapshot(
        weight=self.weight,
        pre_trace=self.pre_trace,
        post_trace=self.post_trace,
        eligibility=self.eligibility,
    )

step(*, pre_spike, post_spike, reward)

Advance one streamed timestep and return the bounded state.

The rule uses pre-before-post eligibility:

eligibility += post_spike * pre_trace - pre_spike * post_trace

The reward-gated weight update is an arithmetic right shift of the product, then saturated into the unsigned weight range.

Source code in src/sc_neurocore/learning/online_o1.py
Python
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def step(self, *, pre_spike: bool, post_spike: bool, reward: int) -> OnlineO1Snapshot:
    """Advance one streamed timestep and return the bounded state.

    The rule uses pre-before-post eligibility:

    ``eligibility += post_spike * pre_trace - pre_spike * post_trace``

    The reward-gated weight update is an arithmetic right shift of the
    product, then saturated into the unsigned weight range.
    """

    reward = _saturate(reward, self.config.min_reward, self.config.max_reward)
    previous_pre_trace = self.pre_trace
    previous_post_trace = self.post_trace

    self.pre_trace = _decay_unsigned(
        self.pre_trace, self.config.trace_decay_shift, self.config.max_trace
    )
    self.post_trace = _decay_unsigned(
        self.post_trace, self.config.trace_decay_shift, self.config.max_trace
    )
    if pre_spike:
        self.pre_trace = self.config.max_trace
    if post_spike:
        self.post_trace = self.config.max_trace

    decayed_eligibility = _decay_signed(self.eligibility, self.config.trace_decay_shift)
    potentiation = 0
    if post_spike:
        potentiation = self.config.max_trace if pre_spike else previous_pre_trace
    depression = previous_post_trace if pre_spike else 0
    eligibility_delta = potentiation - depression
    self.eligibility = _saturate(
        decayed_eligibility + eligibility_delta,
        self.config.min_eligibility,
        self.config.max_eligibility,
    )

    weight_delta = (reward * self.eligibility) >> self.config.learning_shift
    self.weight = _saturate(self.weight + weight_delta, 0, self.config.max_weight)
    return self.snapshot()

sc_neurocore.learning.online_o1.build_online_o1_memory_proof(*, n_synapses, config, sequence_length=None)

Return a sequence-length independent memory proof for the rule.

Source code in src/sc_neurocore/learning/online_o1.py
Python
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def build_online_o1_memory_proof(
    *, n_synapses: int, config: OnlineO1Config, sequence_length: int | None = None
) -> dict[str, Any]:
    """Return a sequence-length independent memory proof for the rule."""

    if n_synapses < 0:
        raise ValueError("n_synapses must be >= 0")
    if sequence_length is not None and sequence_length < 0:
        raise ValueError("sequence_length must be >= 0")
    total_state_bits = n_synapses * config.per_synapse_state_bits
    return {
        "schema_version": ONLINE_O1_MEMORY_PROOF_SCHEMA_VERSION,
        "n_synapses": n_synapses,
        "state_fields": list(_STATE_FIELDS),
        "per_synapse_state_bits": config.per_synapse_state_bits,
        "total_state_bits": total_state_bits,
        "sequence_length_independent": True,
        "hidden_history_fields": [],
    }

sc_neurocore._native.learning_bridge.RustOnlineO1Synapse

RAII wrapper for the Rust bounded fixed-point online O(1) learner.

Source code in src/sc_neurocore/_native/learning_bridge.py
Python
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
class RustOnlineO1Synapse:
    """RAII wrapper for the Rust bounded fixed-point online O(1) learner."""

    __slots__ = ("_ptr",)

    def __init__(
        self,
        *,
        weight_bits: int = 16,
        trace_bits: int = 12,
        reward_bits: int = 8,
        learning_shift: int = 4,
        trace_decay_shift: int = 4,
        initial_weight: int = 0,
    ) -> None:
        if not _HAS_LEARNING:
            raise RuntimeError("libautonomous_learning.so not available")
        lib = _get_lib()
        if not hasattr(lib, "create_online_o1_synapse"):
            raise RuntimeError("libautonomous_learning.so lacks online O(1) symbols")
        self._ptr = lib.create_online_o1_synapse(
            _ct.c_uint8(weight_bits),
            _ct.c_uint8(trace_bits),
            _ct.c_uint8(reward_bits),
            _ct.c_uint8(learning_shift),
            _ct.c_uint8(trace_decay_shift),
            _ct.c_uint32(initial_weight),
        )
        if not self._ptr:
            raise ValueError("invalid online O(1) fixed-point configuration")

    def step(self, *, pre_spike: bool, post_spike: bool, reward: int) -> OnlineO1SnapshotFFI:
        """Advance one timestep and return the bounded fixed-point state."""

        snapshot: OnlineO1SnapshotFFI = _get_lib().step_online_o1_synapse(
            self._ptr,
            pre_spike,
            post_spike,
            _ct.c_int32(reward),
        )
        return snapshot

    @property
    def per_synapse_state_bits(self) -> int:
        return int(_get_lib().online_o1_per_synapse_state_bits(self._ptr))

    def __del__(self) -> None:
        if hasattr(self, "_ptr") and self._ptr and _HAS_LEARNING:
            lib = _get_lib()
            if hasattr(lib, "destroy_online_o1_synapse"):
                lib.destroy_online_o1_synapse(self._ptr)
            self._ptr = None

step(*, pre_spike, post_spike, reward)

Advance one timestep and return the bounded fixed-point state.

Source code in src/sc_neurocore/_native/learning_bridge.py
Python
297
298
299
300
301
302
303
304
305
306
def step(self, *, pre_spike: bool, post_spike: bool, reward: int) -> OnlineO1SnapshotFFI:
    """Advance one timestep and return the bounded fixed-point state."""

    snapshot: OnlineO1SnapshotFFI = _get_lib().step_online_o1_synapse(
        self._ptr,
        pre_spike,
        post_spike,
        _ct.c_int32(reward),
    )
    return snapshot

Meta-Learning (MAML, Finn et al. 2017)

sc_neurocore.learning.advanced.MetaLearner

MAML-style meta-learning for spiking networks.

Finn et al. 2017. Inner loop: fast adaptation on a task. Outer loop: meta-gradient across tasks.

Source code in src/sc_neurocore/learning/advanced.py
Python
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
class MetaLearner:
    """MAML-style meta-learning for spiking networks.

    Finn et al. 2017. Inner loop: fast adaptation on a task.
    Outer loop: meta-gradient across tasks.
    """

    def __init__(self, network: Any, inner_lr: float = 0.01, outer_lr: float = 0.001) -> None:
        self.network = network
        self.inner_lr = inner_lr
        self.outer_lr = outer_lr

    def _snapshot_weights(self) -> list[np.ndarray[Any, Any]]:
        return [proj.data.copy() for proj in self.network.projections]

    def _restore_weights(self, snapshot: list[np.ndarray[Any, Any]]) -> None:
        for proj, w in zip(self.network.projections, snapshot):
            proj.data[:] = w

    def inner_loop(
        self, task_data: tuple[np.ndarray[Any, Any], np.ndarray[Any, Any]], n_steps: int = 5
    ) -> None:
        """Fast adaptation: n_steps of gradient descent on task_data.

        Parameters
        ----------
        task_data : tuple
            (inputs, targets) arrays.
        n_steps : int
            Number of inner-loop updates.
        """
        inputs, targets = task_data
        for _ in range(n_steps):
            for pop in self.network.populations:
                pop.reset_all()
            n_t = inputs.shape[0]
            recorded_spikes = []
            for t in range(n_t):
                pop = self.network.populations[0]
                spikes = pop.step_all(inputs[t][: pop.n])
                recorded_spikes.append(spikes.copy())
            spike_arr = np.stack(recorded_spikes)
            error = spike_arr - targets
            for proj in self.network.projections:
                grad = np.zeros_like(proj.data)
                for t in range(n_t):
                    for i in range(proj.source.n):
                        for k in range(proj.indptr[i], proj.indptr[i + 1]):
                            j = proj.indices[k]
                            grad[k] += recorded_spikes[t][i] * error[t][j]
                proj.data -= self.inner_lr * grad / max(n_t, 1)

    def outer_step(self, tasks: list[tuple[np.ndarray[Any, Any], np.ndarray[Any, Any]]]) -> None:
        """Meta-gradient update across multiple tasks.

        Parameters
        ----------
        tasks : list of tuple
            Each element is (inputs, targets).
        """
        meta_grad = [np.zeros_like(proj.data) for proj in self.network.projections]
        base_weights = self._snapshot_weights()

        for task in tasks:
            self._restore_weights(base_weights)
            pre_weights = self._snapshot_weights()
            self.inner_loop(task)
            for idx, proj in enumerate(self.network.projections):
                meta_grad[idx] += proj.data - pre_weights[idx]

        self._restore_weights(base_weights)
        for idx, proj in enumerate(self.network.projections):
            proj.data += self.outer_lr * meta_grad[idx] / max(len(tasks), 1)

inner_loop(task_data, n_steps=5)

Fast adaptation: n_steps of gradient descent on task_data.

Parameters

task_data : tuple (inputs, targets) arrays. n_steps : int Number of inner-loop updates.

Source code in src/sc_neurocore/learning/advanced.py
Python
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def inner_loop(
    self, task_data: tuple[np.ndarray[Any, Any], np.ndarray[Any, Any]], n_steps: int = 5
) -> None:
    """Fast adaptation: n_steps of gradient descent on task_data.

    Parameters
    ----------
    task_data : tuple
        (inputs, targets) arrays.
    n_steps : int
        Number of inner-loop updates.
    """
    inputs, targets = task_data
    for _ in range(n_steps):
        for pop in self.network.populations:
            pop.reset_all()
        n_t = inputs.shape[0]
        recorded_spikes = []
        for t in range(n_t):
            pop = self.network.populations[0]
            spikes = pop.step_all(inputs[t][: pop.n])
            recorded_spikes.append(spikes.copy())
        spike_arr = np.stack(recorded_spikes)
        error = spike_arr - targets
        for proj in self.network.projections:
            grad = np.zeros_like(proj.data)
            for t in range(n_t):
                for i in range(proj.source.n):
                    for k in range(proj.indptr[i], proj.indptr[i + 1]):
                        j = proj.indices[k]
                        grad[k] += recorded_spikes[t][i] * error[t][j]
            proj.data -= self.inner_lr * grad / max(n_t, 1)

outer_step(tasks)

Meta-gradient update across multiple tasks.

Parameters

tasks : list of tuple Each element is (inputs, targets).

Source code in src/sc_neurocore/learning/advanced.py
Python
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def outer_step(self, tasks: list[tuple[np.ndarray[Any, Any], np.ndarray[Any, Any]]]) -> None:
    """Meta-gradient update across multiple tasks.

    Parameters
    ----------
    tasks : list of tuple
        Each element is (inputs, targets).
    """
    meta_grad = [np.zeros_like(proj.data) for proj in self.network.projections]
    base_weights = self._snapshot_weights()

    for task in tasks:
        self._restore_weights(base_weights)
        pre_weights = self._snapshot_weights()
        self.inner_loop(task)
        for idx, proj in enumerate(self.network.projections):
            meta_grad[idx] += proj.data - pre_weights[idx]

    self._restore_weights(base_weights)
    for idx, proj in enumerate(self.network.projections):
        proj.data += self.outer_lr * meta_grad[idx] / max(len(tasks), 1)

Homeostatic Plasticity (Turrigiano 2008)

sc_neurocore.learning.advanced.HomeostaticPlasticity

Homeostatic synaptic scaling to maintain target firing rate.

Turrigiano 2008. Multiplicatively scales all incoming weights to keep the population mean rate near target_rate.

Source code in src/sc_neurocore/learning/advanced.py
Python
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
class HomeostaticPlasticity:
    """Homeostatic synaptic scaling to maintain target firing rate.

    Turrigiano 2008. Multiplicatively scales all incoming weights to keep
    the population mean rate near target_rate.
    """

    def __init__(self, target_rate: float = 10.0, tau: float = 1000.0) -> None:
        self.target_rate = target_rate
        self.tau = tau
        self._rate_estimate: float | None = None

    def update(self, population: Any) -> None:
        """Scale weights of all incoming projections to *population*.

        Parameters
        ----------
        population : Population
            Target population whose rate should be regulated.
        """
        current_rate = np.mean(population.voltages > 0.9) * 1000.0
        if self._rate_estimate is None:
            self._rate_estimate = current_rate
        alpha = 1.0 / self.tau
        self._rate_estimate += alpha * (current_rate - self._rate_estimate)
        if self._rate_estimate <= 0:
            return
        scale = self.target_rate / self._rate_estimate
        scale = np.clip(scale, 0.9, 1.1)
        for proj in getattr(population, "_projections", []):
            if hasattr(proj, "data"):
                proj.data *= scale
        self._last_scale = float(scale)

update(population)

Scale weights of all incoming projections to population.

Parameters

population : Population Target population whose rate should be regulated.

Source code in src/sc_neurocore/learning/advanced.py
Python
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def update(self, population: Any) -> None:
    """Scale weights of all incoming projections to *population*.

    Parameters
    ----------
    population : Population
        Target population whose rate should be regulated.
    """
    current_rate = np.mean(population.voltages > 0.9) * 1000.0
    if self._rate_estimate is None:
        self._rate_estimate = current_rate
    alpha = 1.0 / self.tau
    self._rate_estimate += alpha * (current_rate - self._rate_estimate)
    if self._rate_estimate <= 0:
        return
    scale = self.target_rate / self._rate_estimate
    scale = np.clip(scale, 0.9, 1.1)
    for proj in getattr(population, "_projections", []):
        if hasattr(proj, "data"):
            proj.data *= scale
    self._last_scale = float(scale)

Short-Term Plasticity (Tsodyks-Markram 1997)

sc_neurocore.learning.advanced.ShortTermPlasticity

Tsodyks-Markram short-term plasticity (STP).

Tsodyks & Markram 1997. Models depression (tau_d) and facilitation (tau_f) with use parameter u_se.

Source code in src/sc_neurocore/learning/advanced.py
Python
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
class ShortTermPlasticity:
    """Tsodyks-Markram short-term plasticity (STP).

    Tsodyks & Markram 1997. Models depression (tau_d) and facilitation (tau_f)
    with use parameter u_se.
    """

    def __init__(self, tau_d: float = 200.0, tau_f: float = 600.0, u_se: float = 0.2) -> None:
        self.tau_d = tau_d
        self.tau_f = tau_f
        self.u_se = u_se
        self._x: np.ndarray[Any, Any] | None = None
        self._u: np.ndarray[Any, Any] | None = None

    def update(self, pre_spikes: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
        """Compute effective weight scaling given pre-synaptic spikes.

        Parameters
        ----------
        pre_spikes : np.ndarray[Any, Any]
            Binary (0/1) vector of length n_pre.

        Returns
        -------
        np.ndarray[Any, Any]
            Effective weight multiplier per pre-synaptic neuron.
        """
        n = pre_spikes.shape[0]
        if self._x is None:
            self._x = np.ones(n)
            self._u = np.full(n, self.u_se)
        assert self._x is not None and self._u is not None

        dt = 1.0
        self._x += dt / self.tau_d * (1.0 - self._x)
        self._u += dt / self.tau_f * (self.u_se - self._u)

        mask = pre_spikes.astype(bool)
        self._u[mask] += self.u_se * (1.0 - self._u[mask])
        release: np.ndarray[Any, Any] = self._u * self._x
        self._x[mask] -= release[mask]

        return release

update(pre_spikes)

Compute effective weight scaling given pre-synaptic spikes.

Parameters

pre_spikes : np.ndarray[Any, Any] Binary (0/1) vector of length n_pre.

Returns

np.ndarray[Any, Any] Effective weight multiplier per pre-synaptic neuron.

Source code in src/sc_neurocore/learning/advanced.py
Python
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
def update(self, pre_spikes: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
    """Compute effective weight scaling given pre-synaptic spikes.

    Parameters
    ----------
    pre_spikes : np.ndarray[Any, Any]
        Binary (0/1) vector of length n_pre.

    Returns
    -------
    np.ndarray[Any, Any]
        Effective weight multiplier per pre-synaptic neuron.
    """
    n = pre_spikes.shape[0]
    if self._x is None:
        self._x = np.ones(n)
        self._u = np.full(n, self.u_se)
    assert self._x is not None and self._u is not None

    dt = 1.0
    self._x += dt / self.tau_d * (1.0 - self._x)
    self._u += dt / self.tau_f * (self.u_se - self._u)

    mask = pre_spikes.astype(bool)
    self._u[mask] += self.u_se * (1.0 - self._u[mask])
    release: np.ndarray[Any, Any] = self._u * self._x
    self._x[mask] -= release[mask]

    return release

Structural Plasticity

sc_neurocore.learning.advanced.StructuralPlasticity

Activity-dependent synapse creation and elimination.

Grows new synapses between correlated neurons and prunes weak ones.

Source code in src/sc_neurocore/learning/advanced.py
Python
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
class StructuralPlasticity:
    """Activity-dependent synapse creation and elimination.

    Grows new synapses between correlated neurons and prunes weak ones.
    """

    def __init__(self, growth_rate: float = 0.001, prune_threshold: float = 0.01) -> None:
        self.growth_rate = growth_rate
        self.prune_threshold = prune_threshold

    def update(self, projection: Any) -> None:
        """Grow or prune synapses in a Projection based on activity.

        Parameters
        ----------
        projection : Projection
            Target projection to modify.
        """
        prune_mask = np.abs(projection.data) < self.prune_threshold
        projection.data[prune_mask] = 0.0

        n_src = projection.source.n
        n_pruned = int(prune_mask.sum())
        n_grow = min(n_pruned, max(1, int(self.growth_rate * len(projection.data))))
        if n_grow > 0:
            zero_indices = np.where(projection.data == 0.0)[0]
            if zero_indices.size > 0:
                chosen = np.random.choice(
                    zero_indices, size=min(n_grow, zero_indices.size), replace=False
                )
                projection.data[chosen] = np.random.uniform(0.001, 0.05, size=chosen.size)

update(projection)

Grow or prune synapses in a Projection based on activity.

Parameters

projection : Projection Target projection to modify.

Source code in src/sc_neurocore/learning/advanced.py
Python
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
def update(self, projection: Any) -> None:
    """Grow or prune synapses in a Projection based on activity.

    Parameters
    ----------
    projection : Projection
        Target projection to modify.
    """
    prune_mask = np.abs(projection.data) < self.prune_threshold
    projection.data[prune_mask] = 0.0

    n_src = projection.source.n
    n_pruned = int(prune_mask.sum())
    n_grow = min(n_pruned, max(1, int(self.growth_rate * len(projection.data))))
    if n_grow > 0:
        zero_indices = np.where(projection.data == 0.0)[0]
        if zero_indices.size > 0:
            chosen = np.random.choice(
                zero_indices, size=min(n_grow, zero_indices.size), replace=False
            )
            projection.data[chosen] = np.random.uniform(0.001, 0.05, size=chosen.size)

Federated

sc_neurocore.learning.federated

Privacy-preserving federated aggregation over stochastic-computing bitstreams.

FederatedAggregator

Privacy-preserving federated learning using SC bitstreams.

Source code in src/sc_neurocore/learning/federated.py
Python
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class FederatedAggregator:
    """Privacy-preserving federated learning using SC bitstreams."""

    @staticmethod
    def aggregate_gradients(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
        """Aggregate gradient bitstreams from multiple clients by majority vote.

        Parameters
        ----------
        client_gradients : list of numpy.ndarray
            Per-client bitstream arrays; all must share the same shape.

        Returns
        -------
        numpy.ndarray
            The majority-voted aggregated bitstream.
        """
        if not client_gradients:
            raise ValueError("No gradients to aggregate")

        # Stack: (Num_Clients, Gradient_Size)
        stack = np.stack(client_gradients, axis=0)

        # Sum bits at each position across clients
        # (Client1_bit_i + Client2_bit_i + ... )
        sums = np.sum(stack, axis=0)

        # Majority Vote
        # If sum > num_clients / 2, output 1
        threshold = len(client_gradients) / 2.0

        aggregated: np.ndarray[Any, Any] = (sums > threshold).astype(np.uint8)

        return aggregated

    @staticmethod
    def secure_sum_protocol(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
        """Sum client bitstreams as a secure-aggregation surrogate.

        Models a secure aggregation where the server observes only the
        element-wise sum, not individual client updates, analogous to secure
        multi-party computation.
        """
        # In SC, 'Summing' bitstreams usually produces an integer result (0..N).
        # This is strictly not a bitstream anymore but a discretized value.
        stack = np.stack(client_gradients, axis=0)
        summed: np.ndarray[Any, Any] = np.sum(stack, axis=0)
        return summed

aggregate_gradients(client_gradients) staticmethod

Aggregate gradient bitstreams from multiple clients by majority vote.

Parameters

client_gradients : list of numpy.ndarray Per-client bitstream arrays; all must share the same shape.

Returns

numpy.ndarray The majority-voted aggregated bitstream.

Source code in src/sc_neurocore/learning/federated.py
Python
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@staticmethod
def aggregate_gradients(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
    """Aggregate gradient bitstreams from multiple clients by majority vote.

    Parameters
    ----------
    client_gradients : list of numpy.ndarray
        Per-client bitstream arrays; all must share the same shape.

    Returns
    -------
    numpy.ndarray
        The majority-voted aggregated bitstream.
    """
    if not client_gradients:
        raise ValueError("No gradients to aggregate")

    # Stack: (Num_Clients, Gradient_Size)
    stack = np.stack(client_gradients, axis=0)

    # Sum bits at each position across clients
    # (Client1_bit_i + Client2_bit_i + ... )
    sums = np.sum(stack, axis=0)

    # Majority Vote
    # If sum > num_clients / 2, output 1
    threshold = len(client_gradients) / 2.0

    aggregated: np.ndarray[Any, Any] = (sums > threshold).astype(np.uint8)

    return aggregated

secure_sum_protocol(client_gradients) staticmethod

Sum client bitstreams as a secure-aggregation surrogate.

Models a secure aggregation where the server observes only the element-wise sum, not individual client updates, analogous to secure multi-party computation.

Source code in src/sc_neurocore/learning/federated.py
Python
53
54
55
56
57
58
59
60
61
62
63
64
65
@staticmethod
def secure_sum_protocol(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
    """Sum client bitstreams as a secure-aggregation surrogate.

    Models a secure aggregation where the server observes only the
    element-wise sum, not individual client updates, analogous to secure
    multi-party computation.
    """
    # In SC, 'Summing' bitstreams usually produces an integer result (0..N).
    # This is strictly not a bitstream anymore but a discretized value.
    stack = np.stack(client_gradients, axis=0)
    summed: np.ndarray[Any, Any] = np.sum(stack, axis=0)
    return summed

Lifelong (EWC)

Elastic Weight Consolidation with active penalty: pushes drifted weights back toward consolidated values, weighted by Fisher information.

sc_neurocore.learning.lifelong

EWC_SCLayer dataclass

Bases: SCLearningLayer

Lifelong Learning Layer using Elastic Weight Consolidation (Approx).

Source code in src/sc_neurocore/learning/lifelong.py
Python
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@dataclass
class EWC_SCLayer(SCLearningLayer):
    """
    Lifelong Learning Layer using Elastic Weight Consolidation (Approx).
    """

    ewc_lambda: float = 10.0  # Strength of constraint

    def __post_init__(self) -> None:
        super().__post_init__()
        self.fisher_info = np.zeros((self.n_neurons, self.n_inputs))
        self.star_weights = np.zeros((self.n_neurons, self.n_inputs))

    def consolidate_task(self) -> None:
        """
        Call after finishing a task.
        Calculate Fisher Info (Importance) and freeze 'star' weights.
        """
        # In SC, Fisher Info approx ~ Activity * Plasticity
        # Weights that changed a lot or are high are often important.
        # Simplified: Importance = Current Weight Magnitude (Hebbian)

        current_w = self.get_weights()
        self.star_weights = current_w.copy()
        # Assume all non-zero weights are somewhat important
        self.fisher_info = current_w.copy()

    def apply_ewc_penalty(self, step_size: float = 0.01) -> float:
        """Push weights back toward consolidated values, weighted by Fisher info.

        Kirkpatrick et al. 2017, adapted to SC/STDP setting.
        Penalty gradient per synapse: F_i * (w_i - w_star_i).

        Parameters
        ----------
        step_size : float
            Fraction of penalty gradient to apply per call.

        Returns
        -------
        float
            Total penalty magnitude (for logging).
        """
        current_w = self.get_weights()
        delta = current_w - self.star_weights
        penalty_grad = self.fisher_info * delta
        correction = self.ewc_lambda * step_size * penalty_grad
        new_w = np.clip(current_w - correction, self.w_min, self.w_max)

        for i in range(self.n_neurons):
            for j in range(self.n_inputs):
                self.synapses[i][j].w = float(new_w[i, j])

        return float(np.sum(np.abs(penalty_grad)))

consolidate_task()

Call after finishing a task. Calculate Fisher Info (Importance) and freeze 'star' weights.

Source code in src/sc_neurocore/learning/lifelong.py
Python
27
28
29
30
31
32
33
34
35
36
37
38
39
def consolidate_task(self) -> None:
    """
    Call after finishing a task.
    Calculate Fisher Info (Importance) and freeze 'star' weights.
    """
    # In SC, Fisher Info approx ~ Activity * Plasticity
    # Weights that changed a lot or are high are often important.
    # Simplified: Importance = Current Weight Magnitude (Hebbian)

    current_w = self.get_weights()
    self.star_weights = current_w.copy()
    # Assume all non-zero weights are somewhat important
    self.fisher_info = current_w.copy()

apply_ewc_penalty(step_size=0.01)

Push weights back toward consolidated values, weighted by Fisher info.

Kirkpatrick et al. 2017, adapted to SC/STDP setting. Penalty gradient per synapse: F_i * (w_i - w_star_i).

Parameters

step_size : float Fraction of penalty gradient to apply per call.

Returns

float Total penalty magnitude (for logging).

Source code in src/sc_neurocore/learning/lifelong.py
Python
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def apply_ewc_penalty(self, step_size: float = 0.01) -> float:
    """Push weights back toward consolidated values, weighted by Fisher info.

    Kirkpatrick et al. 2017, adapted to SC/STDP setting.
    Penalty gradient per synapse: F_i * (w_i - w_star_i).

    Parameters
    ----------
    step_size : float
        Fraction of penalty gradient to apply per call.

    Returns
    -------
    float
        Total penalty magnitude (for logging).
    """
    current_w = self.get_weights()
    delta = current_w - self.star_weights
    penalty_grad = self.fisher_info * delta
    correction = self.ewc_lambda * step_size * penalty_grad
    new_w = np.clip(current_w - correction, self.w_min, self.w_max)

    for i in range(self.n_neurons):
        for j in range(self.n_inputs):
            self.synapses[i][j].w = float(new_w[i, j])

    return float(np.sum(np.abs(penalty_grad)))

Neuroevolution

sc_neurocore.learning.neuroevolution

Genetic algorithm for evolving spiking-neural-network weights and parameters.

SNNGeneticEvolver dataclass

Genetic algorithm for evolving SNN weights and parameters.

Source code in src/sc_neurocore/learning/neuroevolution.py
Python
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@dataclass
class SNNGeneticEvolver:
    """Genetic algorithm for evolving SNN weights and parameters."""

    population_size: int = 20
    mutation_rate: float = 0.05
    elite_fraction: float = 0.2

    def __init__(self, layer_factory: Callable[[], Any], fitness_func: Callable[[Any], float]):
        self.layer_factory = layer_factory
        self.fitness_func = fitness_func
        # Initialize population
        self.population = [layer_factory() for _ in range(self.population_size)]

    def evolve(self, generations: int) -> Any:
        """Run the GA for the given number of generations and return the best individual."""
        for gen in range(generations):
            # 1. Evaluate Fitness
            scores = [self.fitness_func(ind) for ind in self.population]

            # Sort by fitness (descending)
            ranked_indices = np.argsort(scores)[::-1]
            ranked_pop = [self.population[i] for i in ranked_indices]

            logger.info("Gen %d: Best Fitness = %.4f", gen, scores[ranked_indices[0]])

            # 2. Selection (Elitism)
            n_elite = int(self.population_size * self.elite_fraction)
            next_gen = ranked_pop[:n_elite]

            # 3. Crossover & Mutation
            while len(next_gen) < self.population_size:
                # Simple random selection for parents
                p1, p2 = np.random.choice(ranked_pop[: n_elite + 5], 2, replace=False)
                child = self._crossover(p1, p2)
                self._mutate(child)
                next_gen.append(child)

            self.population = next_gen

        return self.population[0]  # Return best

    def _crossover(self, p1: Any, p2: Any) -> Any:
        # Create new instance
        child = self.layer_factory()
        if not hasattr(p1, "weights"):
            return child

        # Uniform crossover
        mask = np.random.rand(*p1.weights.shape) > 0.5
        child.weights = np.where(mask, p1.weights, p2.weights)
        return child

    def _mutate(self, ind: Any) -> None:
        if not hasattr(ind, "weights"):
            return

        # Gaussian mutation
        mutation_mask = np.random.rand(*ind.weights.shape) < self.mutation_rate
        noise = np.random.normal(0, 0.1, ind.weights.shape)
        ind.weights[mutation_mask] += noise[mutation_mask]
        ind.weights = np.clip(ind.weights, 0, 1)

evolve(generations)

Run the GA for the given number of generations and return the best individual.

Source code in src/sc_neurocore/learning/neuroevolution.py
Python
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def evolve(self, generations: int) -> Any:
    """Run the GA for the given number of generations and return the best individual."""
    for gen in range(generations):
        # 1. Evaluate Fitness
        scores = [self.fitness_func(ind) for ind in self.population]

        # Sort by fitness (descending)
        ranked_indices = np.argsort(scores)[::-1]
        ranked_pop = [self.population[i] for i in ranked_indices]

        logger.info("Gen %d: Best Fitness = %.4f", gen, scores[ranked_indices[0]])

        # 2. Selection (Elitism)
        n_elite = int(self.population_size * self.elite_fraction)
        next_gen = ranked_pop[:n_elite]

        # 3. Crossover & Mutation
        while len(next_gen) < self.population_size:
            # Simple random selection for parents
            p1, p2 = np.random.choice(ranked_pop[: n_elite + 5], 2, replace=False)
            child = self._crossover(p1, p2)
            self._mutate(child)
            next_gen.append(child)

        self.population = next_gen

    return self.population[0]  # Return best