Learning
Training paradigms beyond single-node STDP: BPTT, truncated BPTT,
eligibility traces, reward-modulated learning, meta-learning,
homeostatic scaling, short-term plasticity, structural plasticity,
federated learning, lifelong/continual learning, and neuroevolution.
BPTT Learner
sc_neurocore.learning.advanced.BPTTLearner
Backpropagation Through Time for spiking networks.
Uses fast-sigmoid surrogate gradient (Neftci et al. 2019) to handle
the spike non-differentiability.
Source code in src/sc_neurocore/learning/advanced.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class BPTTLearner:
    """Surrogate-gradient trainer that unrolls a spiking network through time.

    Spikes are not differentiable, so the backward pass substitutes the
    fast-sigmoid surrogate gradient (Neftci et al. 2019), evaluated on the
    membrane voltages recorded during the forward pass.
    """

    def __init__(self, network, loss_fn, lr=1e-3):
        self.network = network
        self.loss_fn = loss_fn
        self.lr = lr

    def train_step(self, inputs, targets):
        """Simulate the whole sequence, score it, and update every projection.

        Parameters
        ----------
        inputs : np.ndarray
            Shape (n_steps, n_input) input currents.
        targets : np.ndarray
            Shape (n_steps, n_output) target spike trains.

        Returns
        -------
        float
            Scalar loss value.
        """
        horizon = inputs.shape[0]
        for population in self.network.populations:
            population.reset_all()

        # Forward pass: only the first population is stepped; its voltages
        # and spikes are logged for the backward pass.
        voltage_log = []
        spike_log = []
        for step in range(horizon):
            drive = inputs[step]
            first = self.network.populations[0]
            emitted = first.step_all(drive[: first.n])
            voltage_log.append(first.voltages.copy())
            spike_log.append(emitted.copy())

        spike_arr = np.stack(spike_log)
        loss = float(self.loss_fn(spike_arr, targets))
        residual = spike_arr - targets

        # Backward pass: accumulate pre-spike x (error * surrogate) for every
        # stored synapse, walking the CSR layout (indptr / indices / data).
        for proj in self.network.projections:
            source_ids = range(proj.source.n)
            accum = np.zeros_like(proj.data)
            for step, (volts, pre) in enumerate(zip(voltage_log, spike_log)):
                surrogate = _fast_sigmoid_surrogate(volts)
                n_post = proj.target.n
                post_delta = residual[step][:n_post] * surrogate[:n_post]
                for i in source_ids:
                    for k in range(proj.indptr[i], proj.indptr[i + 1]):
                        accum[k] += pre[i] * post_delta[proj.indices[k]]
            # Average over the sequence length before the descent step.
            proj.data -= self.lr * accum / max(horizon, 1)
        return loss
|
train_step(inputs, targets)
One BPTT step: forward pass, loss, backward with surrogate gradients.
Parameters
inputs : np.ndarray
Shape (n_steps, n_input) input currents.
targets : np.ndarray
Shape (n_steps, n_output) target spike trains.
Returns
float
Scalar loss value.
Source code in src/sc_neurocore/learning/advanced.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 | def train_step(self, inputs, targets):
"""One BPTT step: forward pass, loss, backward with surrogate gradients.
Parameters
----------
inputs : np.ndarray
Shape (n_steps, n_input) input currents.
targets : np.ndarray
Shape (n_steps, n_output) target spike trains.
Returns
-------
float
Scalar loss value.
"""
n_steps = inputs.shape[0]
for pop in self.network.populations:
pop.reset_all()
recorded_v = []
recorded_spikes = []
for t in range(n_steps):
currents = inputs[t]
pop = self.network.populations[0]
spikes = pop.step_all(currents[: pop.n])
recorded_v.append(pop.voltages.copy())
recorded_spikes.append(spikes.copy())
spike_arr = np.stack(recorded_spikes)
loss = float(self.loss_fn(spike_arr, targets))
output_error = spike_arr - targets
for proj in self.network.projections:
n_src = proj.source.n
grad_w = np.zeros_like(proj.data)
for t in range(n_steps):
surr = _fast_sigmoid_surrogate(recorded_v[t])
post_delta = output_error[t][: proj.target.n] * surr[: proj.target.n]
for i in range(n_src):
for k in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k]
grad_w[k] += recorded_spikes[t][i] * post_delta[j]
proj.data -= self.lr * grad_w / max(n_steps, 1)
return loss
|
Truncated BPTT (Williams & Peng 1990)
Chunks long sequences into windows of k timesteps, backpropagating
gradients within each chunk while carrying membrane state forward.
Memory O(k) instead of O(T).
sc_neurocore.learning.advanced.TBPTTLearner
Truncated Backpropagation Through Time for long sequences.
Splits input into chunks of k timesteps, backpropagating gradients
only within each chunk while carrying forward state (membrane voltage)
across boundaries. Reduces memory from O(T) to O(k).
Williams & Peng 1990.
Source code in src/sc_neurocore/learning/advanced.py
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class TBPTTLearner:
    """Truncated Backpropagation Through Time for long sequences.

    Splits input into chunks of ``k`` timesteps, backpropagating gradients
    only within each chunk while carrying forward state (membrane voltage)
    across boundaries. Reduces memory from O(T) to O(k).

    Williams & Peng 1990.
    """

    def __init__(self, network, loss_fn, lr=1e-3, k: int = 50):
        """Store the network, loss and hyper-parameters.

        Raises
        ------
        ValueError
            If ``k`` is smaller than 1.
        """
        if k < 1:
            raise ValueError(f"k must be a positive chunk length, got {k}")
        self.network = network
        self.loss_fn = loss_fn
        self.lr = lr
        self.k = k

    def train_step(self, inputs, targets):
        """One TBPTT step over the full sequence, chunked into windows of k.

        Parameters
        ----------
        inputs : np.ndarray
            Shape (n_steps, n_input).
        targets : np.ndarray
            Shape (n_steps, n_output).

        Returns
        -------
        float
            Total loss summed across chunks.

        Raises
        ------
        ValueError
            If ``self.k`` has been set to a value smaller than 1.
        """
        # Re-checked here because ``k`` is a plain attribute and may have been
        # reassigned; a negative stride would silently skip all training.
        if self.k < 1:
            raise ValueError(f"k must be a positive chunk length, got {self.k}")

        n_steps = inputs.shape[0]
        total_loss = 0.0
        # Reset once per sequence; state is deliberately NOT reset per chunk.
        for pop in self.network.populations:
            pop.reset_all()

        for chunk_start in range(0, n_steps, self.k):
            chunk_end = min(chunk_start + self.k, n_steps)
            chunk_len = chunk_end - chunk_start

            # Forward pass over this window (only the first population is stepped).
            recorded_v = []
            recorded_spikes = []
            for t in range(chunk_start, chunk_end):
                pop = self.network.populations[0]
                spikes = pop.step_all(inputs[t][: pop.n])
                recorded_v.append(pop.voltages.copy())
                recorded_spikes.append(spikes.copy())

            spike_arr = np.stack(recorded_spikes)
            chunk_targets = targets[chunk_start:chunk_end]
            total_loss += float(self.loss_fn(spike_arr, chunk_targets))

            # Backward within this chunk only.
            output_error = spike_arr - chunk_targets
            for proj in self.network.projections:
                n_src = proj.source.n
                grad_w = np.zeros_like(proj.data)
                for t_local in range(chunk_len):
                    surr = _fast_sigmoid_surrogate(recorded_v[t_local])
                    post_delta = (
                        output_error[t_local][: proj.target.n] * surr[: proj.target.n]
                    )
                    # Walk the CSR layout: row i owns data[indptr[i]:indptr[i+1]].
                    for i in range(n_src):
                        for k_idx in range(proj.indptr[i], proj.indptr[i + 1]):
                            j = proj.indices[k_idx]
                            grad_w[k_idx] += recorded_spikes[t_local][i] * post_delta[j]
                proj.data -= self.lr * grad_w / max(chunk_len, 1)
            # State (voltages) carries forward — no reset between chunks.
        return total_loss
|
train_step(inputs, targets)
One TBPTT step over the full sequence, chunked into windows of k.
Parameters
inputs : np.ndarray
Shape (n_steps, n_input).
targets : np.ndarray
Shape (n_steps, n_output).
Returns
float
Total loss summed across chunks.
Source code in src/sc_neurocore/learning/advanced.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162 | def train_step(self, inputs, targets):
"""One TBPTT step over the full sequence, chunked into windows of k.
Parameters
----------
inputs : np.ndarray
Shape (n_steps, n_input).
targets : np.ndarray
Shape (n_steps, n_output).
Returns
-------
float
Total loss summed across chunks.
"""
n_steps = inputs.shape[0]
total_loss = 0.0
for pop in self.network.populations:
pop.reset_all()
for chunk_start in range(0, n_steps, self.k):
chunk_end = min(chunk_start + self.k, n_steps)
chunk_len = chunk_end - chunk_start
recorded_v = []
recorded_spikes = []
for t in range(chunk_start, chunk_end):
pop = self.network.populations[0]
spikes = pop.step_all(inputs[t][: pop.n])
recorded_v.append(pop.voltages.copy())
recorded_spikes.append(spikes.copy())
spike_arr = np.stack(recorded_spikes)
chunk_targets = targets[chunk_start:chunk_end]
chunk_loss = float(self.loss_fn(spike_arr, chunk_targets))
total_loss += chunk_loss
# Backward within this chunk only
output_error = spike_arr - chunk_targets
for proj in self.network.projections:
n_src = proj.source.n
grad_w = np.zeros_like(proj.data)
for t_local in range(chunk_len):
surr = _fast_sigmoid_surrogate(recorded_v[t_local])
post_delta = output_error[t_local][: proj.target.n] * surr[: proj.target.n]
for i in range(n_src):
for k_idx in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k_idx]
grad_w[k_idx] += recorded_spikes[t_local][i] * post_delta[j]
proj.data -= self.lr * grad_w / max(chunk_len, 1)
# State (voltages) carries forward — no reset between chunks
return total_loss
|
Eligibility Traces (e-prop, Bellec et al. 2020)
sc_neurocore.learning.advanced.EligibilityTrace
E-prop eligibility trace: three-factor learning (pre x post x error).
Bellec et al. 2020.
Source code in src/sc_neurocore/learning/advanced.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class EligibilityTrace:
    """E-prop eligibility trace: three-factor learning (pre x post x error).

    Bellec et al. 2020.
    """

    def __init__(self, tau_e=20.0, dt=1.0):
        # Per-call exponential decay factor of the trace.
        self.decay = np.exp(-dt / tau_e)
        # Allocated lazily on the first update, once the shapes are known.
        self._trace = None

    def update(self, pre_spike, post_spike, error_signal):
        """Compute weight delta from three-factor rule.

        The trace is decayed, incremented by the outer product of the pre-
        and post-synaptic spike vectors, and then scaled column-wise by the
        error signal.

        Parameters
        ----------
        pre_spike, post_spike : array_like
            Binary (0/1) vectors of length n_pre, n_post.
        error_signal : array_like
            Error signal of length n_post.

        Returns
        -------
        np.ndarray
            Weight delta matrix of shape (n_pre, n_post).
        """
        outer = np.outer(pre_spike, post_spike)
        if self._trace is None:
            self._trace = np.zeros_like(outer)
        self._trace = self.decay * self._trace + outer
        # asarray lets callers pass plain sequences, matching np.outer above.
        return self._trace * np.asarray(error_signal)[np.newaxis, :]
|
update(pre_spike, post_spike, error_signal)
Compute weight delta from three-factor rule.
Parameters
pre_spike, post_spike : np.ndarray
Binary (0/1) vectors of length n_pre, n_post.
error_signal : np.ndarray
Error signal of length n_post.
Returns
np.ndarray
Weight delta matrix of shape (n_pre, n_post).
Source code in src/sc_neurocore/learning/advanced.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194 | def update(self, pre_spike, post_spike, error_signal):
"""Compute weight delta from three-factor rule.
Parameters
----------
pre_spike, post_spike : np.ndarray
Binary (0/1) vectors of length n_pre, n_post.
error_signal : np.ndarray
Error signal of length n_post.
Returns
-------
np.ndarray
Weight delta matrix of shape (n_pre, n_post).
"""
outer = np.outer(pre_spike, post_spike)
if self._trace is None:
self._trace = np.zeros_like(outer)
self._trace = self.decay * self._trace + outer
return self._trace * error_signal[np.newaxis, :]
|
Reward-Modulated STDP
sc_neurocore.learning.advanced.RewardModulatedLearner
Reward-modulated STDP (R-STDP).
Maintains per-synapse eligibility traces and applies weight updates
scaled by a global reward signal.
Source code in src/sc_neurocore/learning/advanced.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
class RewardModulatedLearner:
    """Reward-modulated STDP (R-STDP).

    Maintains per-synapse eligibility traces and applies weight updates
    scaled by a global reward signal.

    Parameters
    ----------
    network
        Object exposing ``projections``; each projection is read through
        ``data``, ``indptr``, ``indices``, ``source`` and ``target``.
    tau_reward : float
        Time constant of the eligibility decay.
    tau_trace : float
        Time constant of the pre/post spike traces.
    lr : float
        Learning rate applied to ``reward * eligibility``.
    spike_threshold : float
        A neuron counts as having spiked when its voltage exceeds this value.
    """

    def __init__(self, network, tau_reward=100.0, tau_trace=20.0, lr=0.01,
                 spike_threshold=0.9):
        self.network = network
        self.reward_decay = np.exp(-1.0 / tau_reward)
        # Computed once here instead of on every call to step().
        self.trace_decay = np.exp(-1.0 / tau_trace)
        self.lr = lr
        self.spike_threshold = spike_threshold
        # All state is keyed by id(projection).
        self._elig: dict[int, np.ndarray] = {}
        self._pre_trace: dict[int, np.ndarray] = {}
        self._post_trace: dict[int, np.ndarray] = {}
        self._init_traces()

    def _alloc_traces(self, proj):
        """Allocate zeroed traces for one projection and return its key."""
        pid = id(proj)
        self._elig[pid] = np.zeros_like(proj.data)
        self._pre_trace[pid] = np.zeros(proj.source.n)
        self._post_trace[pid] = np.zeros(proj.target.n)
        return pid

    def _init_traces(self):
        """(Re)allocate zeroed traces for every projection in the network."""
        for proj in self.network.projections:
            self._alloc_traces(proj)

    def step(self, reward):
        """Apply reward-modulated weight update.

        Parameters
        ----------
        reward : float
            Scalar reward signal.
        """
        for proj in self.network.projections:
            pid = id(proj)
            # Projections added after construction get their traces lazily
            # instead of raising KeyError.
            if pid not in self._elig:
                self._alloc_traces(proj)

            pre_sp = proj.source.voltages > self.spike_threshold
            post_sp = proj.target.voltages > self.spike_threshold
            self._pre_trace[pid] = self.trace_decay * self._pre_trace[pid] + pre_sp
            self._post_trace[pid] = self.trace_decay * self._post_trace[pid] + post_sp

            elig = self._elig[pid]
            pre_trace = self._pre_trace[pid]
            post_trace = self._post_trace[pid]
            # Walk the CSR layout: row i owns data[indptr[i]:indptr[i+1]].
            for i in range(proj.source.n):
                for k in range(proj.indptr[i], proj.indptr[i + 1]):
                    j = proj.indices[k]
                    elig[k] = self.reward_decay * elig[k] + pre_trace[i] * post_trace[j]

            proj.data += self.lr * reward * elig
            # Weights are kept non-negative.
            np.clip(proj.data, 0.0, None, out=proj.data)
|
step(reward)
Apply reward-modulated weight update.
Parameters
reward : float
Scalar reward signal.
Source code in src/sc_neurocore/learning/advanced.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244 | def step(self, reward):
"""Apply reward-modulated weight update.
Parameters
----------
reward : float
Scalar reward signal.
"""
tau_trace = 20.0
trace_decay = np.exp(-1.0 / tau_trace)
for proj in self.network.projections:
pid = id(proj)
pre_sp = proj.source.voltages > 0.9
post_sp = proj.target.voltages > 0.9
self._pre_trace[pid] = trace_decay * self._pre_trace[pid] + pre_sp
self._post_trace[pid] = trace_decay * self._post_trace[pid] + post_sp
for i in range(proj.source.n):
for k in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k]
self._elig[pid][k] = (
self.reward_decay * self._elig[pid][k]
+ self._pre_trace[pid][i] * self._post_trace[pid][j]
)
proj.data += 0.01 * reward * self._elig[pid]
np.clip(proj.data, 0.0, None, out=proj.data)
|
Meta-Learning (MAML, Finn et al. 2017)
sc_neurocore.learning.advanced.MetaLearner
MAML-style meta-learning for spiking networks.
Finn et al. 2017. Inner loop: fast adaptation on a task.
Outer loop: meta-gradient across tasks.
Source code in src/sc_neurocore/learning/advanced.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class MetaLearner:
    """MAML-style meta-learning for spiking networks.

    Finn et al. 2017. Inner loop: fast adaptation on a task.
    Outer loop: meta-gradient across tasks.

    The outer update implemented here is first-order: it moves the base
    weights along the mean (adapted - base) displacement over the tasks.
    """

    def __init__(self, network, inner_lr=0.01, outer_lr=0.001):
        self.network = network
        self.inner_lr = inner_lr
        self.outer_lr = outer_lr

    def _snapshot_weights(self):
        """Return a copy of every projection's weight array."""
        return [proj.data.copy() for proj in self.network.projections]

    def _restore_weights(self, snapshot):
        """Write a snapshot back into the projections, in place."""
        for proj, w in zip(self.network.projections, snapshot):
            proj.data[:] = w

    def inner_loop(self, task_data, n_steps=5):
        """Fast adaptation: n_steps of gradient descent on task_data.

        Parameters
        ----------
        task_data : tuple
            (inputs, targets) arrays.
        n_steps : int
            Number of inner-loop updates.
        """
        inputs, targets = task_data
        n_t = inputs.shape[0]
        for _ in range(n_steps):
            for pop in self.network.populations:
                pop.reset_all()

            # Forward pass (only the first population is stepped).
            recorded_spikes = []
            for t in range(n_t):
                pop = self.network.populations[0]
                spikes = pop.step_all(inputs[t][: pop.n])
                recorded_spikes.append(spikes.copy())

            spike_arr = np.stack(recorded_spikes)
            error = spike_arr - targets

            # Delta-rule style gradient (no surrogate term) over the CSR layout.
            for proj in self.network.projections:
                grad = np.zeros_like(proj.data)
                for t in range(n_t):
                    for i in range(proj.source.n):
                        for k in range(proj.indptr[i], proj.indptr[i + 1]):
                            j = proj.indices[k]
                            grad[k] += recorded_spikes[t][i] * error[t][j]
                proj.data -= self.inner_lr * grad / max(n_t, 1)

    def outer_step(self, tasks):
        """Meta-gradient update across multiple tasks.

        Each task is adapted from the same base weights. If adaptation
        raises, the base weights are restored before the exception
        propagates, so the network is never left holding task-adapted weights.

        Parameters
        ----------
        tasks : list of tuple
            Each element is (inputs, targets).
        """
        base_weights = self._snapshot_weights()
        meta_grad = [np.zeros_like(w) for w in base_weights]
        try:
            for task in tasks:
                self._restore_weights(base_weights)
                self.inner_loop(task)
                for idx, proj in enumerate(self.network.projections):
                    meta_grad[idx] += proj.data - base_weights[idx]
        finally:
            self._restore_weights(base_weights)

        for idx, proj in enumerate(self.network.projections):
            proj.data += self.outer_lr * meta_grad[idx] / max(len(tasks), 1)
|
inner_loop(task_data, n_steps=5)
Fast adaptation: n_steps of gradient descent on task_data.
Parameters
task_data : tuple
(inputs, targets) arrays.
n_steps : int
Number of inner-loop updates.
Source code in src/sc_neurocore/learning/advanced.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295 | def inner_loop(self, task_data, n_steps=5):
"""Fast adaptation: n_steps of gradient descent on task_data.
Parameters
----------
task_data : tuple
(inputs, targets) arrays.
n_steps : int
Number of inner-loop updates.
"""
inputs, targets = task_data
for _ in range(n_steps):
for pop in self.network.populations:
pop.reset_all()
n_t = inputs.shape[0]
recorded_spikes = []
for t in range(n_t):
pop = self.network.populations[0]
spikes = pop.step_all(inputs[t][: pop.n])
recorded_spikes.append(spikes.copy())
spike_arr = np.stack(recorded_spikes)
error = spike_arr - targets
for proj in self.network.projections:
grad = np.zeros_like(proj.data)
for t in range(n_t):
for i in range(proj.source.n):
for k in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k]
grad[k] += recorded_spikes[t][i] * error[t][j]
proj.data -= self.inner_lr * grad / max(n_t, 1)
|
outer_step(tasks)
Meta-gradient update across multiple tasks.
Parameters
tasks : list of tuple
Each element is (inputs, targets).
Source code in src/sc_neurocore/learning/advanced.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317 | def outer_step(self, tasks):
"""Meta-gradient update across multiple tasks.
Parameters
----------
tasks : list of tuple
Each element is (inputs, targets).
"""
meta_grad = [np.zeros_like(proj.data) for proj in self.network.projections]
base_weights = self._snapshot_weights()
for task in tasks:
self._restore_weights(base_weights)
pre_weights = self._snapshot_weights()
self.inner_loop(task)
for idx, proj in enumerate(self.network.projections):
meta_grad[idx] += proj.data - pre_weights[idx]
self._restore_weights(base_weights)
for idx, proj in enumerate(self.network.projections):
proj.data += self.outer_lr * meta_grad[idx] / max(len(tasks), 1)
|
Homeostatic Plasticity (Turrigiano 2008)
sc_neurocore.learning.advanced.HomeostaticPlasticity
Homeostatic synaptic scaling to maintain target firing rate.
Turrigiano 2008. Multiplicatively scales all incoming weights to keep
the population mean rate near target_rate.
Source code in src/sc_neurocore/learning/advanced.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
class HomeostaticPlasticity:
    """Homeostatic synaptic scaling to maintain target firing rate.

    Turrigiano 2008. Multiplicatively scales all incoming weights to keep
    the population mean rate near target_rate.
    """

    def __init__(self, target_rate=10.0, tau=1000.0):
        self.target_rate = target_rate
        self.tau = tau
        # Running rate estimate; seeded from the first observation.
        self._rate_estimate = None
        # Most recently applied scale factor. Initialised so it can be read
        # before the first update and after an update that returns early.
        self._last_scale = 1.0

    def update(self, population):
        """Scale weights of all incoming projections to *population*.

        The projections are read from ``population._projections`` when that
        attribute exists; entries without a ``data`` attribute are skipped.
        Nothing is scaled while the rate estimate is not positive.

        Parameters
        ----------
        population : Population
            Target population whose rate should be regulated.
        """
        # Fraction of neurons above the 0.9 voltage threshold, times 1000
        # (presumably spikes per 1 ms step -> Hz; confirm against the simulator).
        current_rate = np.mean(population.voltages > 0.9) * 1000.0
        if self._rate_estimate is None:
            self._rate_estimate = current_rate

        # Exponential moving average with time constant tau.
        alpha = 1.0 / self.tau
        self._rate_estimate += alpha * (current_rate - self._rate_estimate)
        if self._rate_estimate <= 0:
            return

        # Limit each call to a +/-10 % change.
        scale = self.target_rate / self._rate_estimate
        scale = np.clip(scale, 0.9, 1.1)
        for proj in getattr(population, "_projections", []):
            if hasattr(proj, "data"):
                proj.data *= scale
        self._last_scale = float(scale)
|
update(population)
Scale weights of all incoming projections to population.
Parameters
population : Population
Target population whose rate should be regulated.
Source code in src/sc_neurocore/learning/advanced.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352 | def update(self, population):
"""Scale weights of all incoming projections to *population*.
Parameters
----------
population : Population
Target population whose rate should be regulated.
"""
current_rate = np.mean(population.voltages > 0.9) * 1000.0
if self._rate_estimate is None:
self._rate_estimate = current_rate
alpha = 1.0 / self.tau
self._rate_estimate += alpha * (current_rate - self._rate_estimate)
if self._rate_estimate <= 0:
return
scale = self.target_rate / self._rate_estimate
scale = np.clip(scale, 0.9, 1.1)
for proj in getattr(population, "_projections", []):
if hasattr(proj, "data"):
proj.data *= scale
self._last_scale = float(scale)
|
Short-Term Plasticity (Tsodyks-Markram 1997)
sc_neurocore.learning.advanced.ShortTermPlasticity
Tsodyks-Markram short-term plasticity (STP).
Tsodyks & Markram 1997. Models depression (tau_d) and facilitation (tau_f)
with use parameter u_se.
Source code in src/sc_neurocore/learning/advanced.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
class ShortTermPlasticity:
    """Tsodyks-Markram short-term plasticity (STP).

    Tsodyks & Markram 1997. Models depression (tau_d) and facilitation (tau_f)
    with use parameter u_se.

    Parameters
    ----------
    tau_d : float
        Recovery time constant of the available resources.
    tau_f : float
        Relaxation time constant of the utilisation variable.
    u_se : float
        Baseline utilisation and per-spike facilitation increment.
    dt : float
        Integration step used by each call to :meth:`update`.
    """

    def __init__(self, tau_d=200.0, tau_f=600.0, u_se=0.2, dt=1.0):
        self.tau_d = tau_d
        self.tau_f = tau_f
        self.u_se = u_se
        self.dt = dt
        self._x = None  # available resources
        self._u = None  # utilisation variable

    def update(self, pre_spikes):
        """Compute effective weight scaling given pre-synaptic spikes.

        Parameters
        ----------
        pre_spikes : array_like
            Binary (0/1) vector of length n_pre.

        Returns
        -------
        np.ndarray
            Effective weight multiplier per pre-synaptic neuron.
        """
        pre_spikes = np.asarray(pre_spikes)
        n = pre_spikes.shape[0]
        if self._x is None:
            # State is sized from the first call.
            self._x = np.ones(n)
            self._u = np.full(n, self.u_se)

        # Relax both variables toward their resting values (1 and u_se).
        self._x += self.dt / self.tau_d * (1.0 - self._x)
        self._u += self.dt / self.tau_f * (self.u_se - self._u)

        mask = pre_spikes.astype(bool)
        # Facilitation: spiking synapses increase their utilisation first ...
        self._u[mask] += self.u_se * (1.0 - self._u[mask])
        release = self._u * self._x
        # ... then depression: the released fraction is removed from resources.
        self._x[mask] -= release[mask]
        return release
|
update(pre_spikes)
Compute effective weight scaling given pre-synaptic spikes.
Parameters
pre_spikes : np.ndarray
Binary (0/1) vector of length n_pre.
Returns
np.ndarray
Effective weight multiplier per pre-synaptic neuron.
Source code in src/sc_neurocore/learning/advanced.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396 | def update(self, pre_spikes):
"""Compute effective weight scaling given pre-synaptic spikes.
Parameters
----------
pre_spikes : np.ndarray
Binary (0/1) vector of length n_pre.
Returns
-------
np.ndarray
Effective weight multiplier per pre-synaptic neuron.
"""
n = pre_spikes.shape[0]
if self._x is None:
self._x = np.ones(n)
self._u = np.full(n, self.u_se)
dt = 1.0
self._x += dt / self.tau_d * (1.0 - self._x)
self._u += dt / self.tau_f * (self.u_se - self._u)
mask = pre_spikes.astype(bool)
self._u[mask] += self.u_se * (1.0 - self._u[mask])
release = self._u * self._x
self._x[mask] -= release[mask]
return release
|
Structural Plasticity
sc_neurocore.learning.advanced.StructuralPlasticity
Activity-dependent synapse creation and elimination.
Grows new synapses between correlated neurons and prunes weak ones.
Source code in src/sc_neurocore/learning/advanced.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
class StructuralPlasticity:
    """Activity-dependent synapse creation and elimination.

    Weak synapses are pruned (set to zero) and a limited number of zeroed
    slots are re-initialised with small random weights. Regrowth sites are
    picked uniformly at random among the zero-weight slots.

    Parameters
    ----------
    growth_rate : float
        Fraction of stored synapses that may be regrown per update (at least
        one, and never more than the number found below the threshold).
    prune_threshold : float
        Synapses whose absolute weight is below this value are zeroed.
    rng : np.random.Generator, optional
        Random source for regrowth. Defaults to the global ``np.random`` state.
    """

    def __init__(self, growth_rate=0.001, prune_threshold=0.01, rng=None):
        self.growth_rate = growth_rate
        self.prune_threshold = prune_threshold
        self.rng = rng

    def update(self, projection):
        """Grow or prune synapses in a Projection based on activity.

        Only ``projection.data`` is read and modified (in place).

        Parameters
        ----------
        projection : Projection
            Target projection to modify.
        """
        rng = self.rng if self.rng is not None else np.random

        # Note: slots that are already zero also fall below the threshold and
        # are therefore counted in n_pruned.
        prune_mask = np.abs(projection.data) < self.prune_threshold
        projection.data[prune_mask] = 0.0
        n_pruned = int(prune_mask.sum())

        n_grow = min(n_pruned, max(1, int(self.growth_rate * len(projection.data))))
        if n_grow <= 0:
            return

        zero_indices = np.where(projection.data == 0.0)[0]
        if zero_indices.size == 0:
            return

        chosen = rng.choice(
            zero_indices, size=min(n_grow, zero_indices.size), replace=False
        )
        projection.data[chosen] = rng.uniform(0.001, 0.05, size=chosen.size)
|
update(projection)
Grow or prune synapses in a Projection based on activity.
Parameters
projection : Projection
Target projection to modify.
Source code in src/sc_neurocore/learning/advanced.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429 | def update(self, projection):
"""Grow or prune synapses in a Projection based on activity.
Parameters
----------
projection : Projection
Target projection to modify.
"""
prune_mask = np.abs(projection.data) < self.prune_threshold
projection.data[prune_mask] = 0.0
n_src = projection.source.n
n_pruned = int(prune_mask.sum())
n_grow = min(n_pruned, max(1, int(self.growth_rate * len(projection.data))))
if n_grow > 0:
zero_indices = np.where(projection.data == 0.0)[0]
if zero_indices.size > 0:
chosen = np.random.choice(
zero_indices, size=min(n_grow, zero_indices.size), replace=False
)
projection.data[chosen] = np.random.uniform(0.001, 0.05, size=chosen.size)
|
Federated
sc_neurocore.learning.federated
FederatedAggregator
Privacy-Preserving Federated Learning using SC Bitstreams.
Source code in src/sc_neurocore/learning/federated.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class FederatedAggregator:
    """
    Privacy-Preserving Federated Learning using SC Bitstreams.
    """

    @staticmethod
    def aggregate_gradients(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
        """
        Aggregates gradient bitstreams from multiple clients.

        Args:
            client_gradients: List of numpy arrays (bitstreams).
                              All must have same shape.

        Returns:
            Aggregated bitstream (Majority Vote) as uint8. A position is 1
            only when strictly more than half of the clients set it, so ties
            resolve to 0.

        Raises:
            ValueError: If ``client_gradients`` is empty.
        """
        if not client_gradients:
            raise ValueError("No gradients to aggregate")

        # Stack: (Num_Clients, Gradient_Size)
        stack = np.stack(client_gradients, axis=0)

        # Sum bits at each position across clients
        # (Client1_bit_i + Client2_bit_i + ... )
        sums = np.sum(stack, axis=0)

        # Majority Vote
        # If sum > num_clients / 2, output 1
        threshold = len(client_gradients) / 2.0
        return (sums > threshold).astype(np.uint8)

    @staticmethod
    def secure_sum_protocol(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
        """
        Simulates a secure aggregation where the server only sees the sum,
        not individual updates (like Secure Multi-Party Computation).

        Args:
            client_gradients: List of numpy arrays (bitstreams).
                              All must have same shape.

        Returns:
            Element-wise sum across clients (values in 0..N).

        Raises:
            ValueError: If ``client_gradients`` is empty.
        """
        # Same explicit check and message as aggregate_gradients.
        if not client_gradients:
            raise ValueError("No gradients to aggregate")

        # In SC, 'Summing' bitstreams usually produces an integer result (0..N).
        # This is strictly not a bitstream anymore but a discretized value.
        stack = np.stack(client_gradients, axis=0)
        return np.sum(stack, axis=0)
|
aggregate_gradients(client_gradients)
staticmethod
Aggregates gradient bitstreams from multiple clients.
Parameters:
| Name |
Type |
Description |
Default |
client_gradients
|
list[ndarray[Any, Any]]
|
List of numpy arrays (bitstreams).
All must have same shape.
|
required
|
Returns:
| Type |
Description |
ndarray[Any, Any]
|
Aggregated bitstream (Majority Vote).
|
Source code in src/sc_neurocore/learning/federated.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 | @staticmethod
def aggregate_gradients(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
"""
Aggregates gradient bitstreams from multiple clients.
Args:
client_gradients: List of numpy arrays (bitstreams).
All must have same shape.
Returns:
Aggregated bitstream (Majority Vote).
"""
if not client_gradients:
raise ValueError("No gradients to aggregate")
# Stack: (Num_Clients, Gradient_Size)
stack = np.stack(client_gradients, axis=0)
# Sum bits at each position across clients
# (Client1_bit_i + Client2_bit_i + ... )
sums = np.sum(stack, axis=0)
# Majority Vote
# If sum > num_clients / 2, output 1
threshold = len(client_gradients) / 2.0
aggregated = (sums > threshold).astype(np.uint8)
return aggregated
|
secure_sum_protocol(client_gradients)
staticmethod
Simulates a secure aggregation where the server only sees the sum,
not individual updates (like Secure Multi-Party Computation).
Source code in src/sc_neurocore/learning/federated.py
49
50
51
52
53
54
55
56
57
58 | @staticmethod
def secure_sum_protocol(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
"""
Simulates a secure aggregation where the server only sees the sum,
not individual updates (like Secure Multi-Party Computation).
"""
# In SC, 'Summing' bitstreams usually produces an integer result (0..N).
# This is strictly not a bitstream anymore but a discretized value.
stack = np.stack(client_gradients, axis=0)
return np.sum(stack, axis=0)
|
Lifelong (EWC)
Elastic Weight Consolidation with active penalty: pushes drifted weights
back toward consolidated values, weighted by Fisher information.
sc_neurocore.learning.lifelong
EWC_SCLayer
dataclass
Bases: SCLearningLayer
Lifelong Learning Layer using Elastic Weight Consolidation (Approx).
Source code in src/sc_neurocore/learning/lifelong.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@dataclass
class EWC_SCLayer(SCLearningLayer):
    """
    Lifelong Learning Layer using Elastic Weight Consolidation (Approx).

    Relies on ``n_neurons``, ``n_inputs``, ``w_min``, ``w_max``, ``synapses``
    and ``get_weights()`` from the base layer.
    """

    ewc_lambda: float = 10.0  # Strength of constraint

    def __post_init__(self) -> None:
        super().__post_init__()
        # Per-synapse importance and anchor weights; zero until a task is
        # consolidated, so the penalty is a no-op before that.
        self.fisher_info = np.zeros((self.n_neurons, self.n_inputs))
        self.star_weights = np.zeros((self.n_neurons, self.n_inputs))

    def consolidate_task(self) -> None:
        """
        Call after finishing a task.
        Calculate Fisher Info (Importance) and freeze 'star' weights.

        Each call replaces the previously stored importance and anchors.
        """
        # In SC, Fisher Info approx ~ Activity * Plasticity
        # Weights that changed a lot or are high are often important.
        # Simplified: Importance = Current Weight Magnitude (Hebbian)
        current_w = self.get_weights()
        self.star_weights = current_w.copy()

        # Magnitude, not the signed value: a negative importance would make
        # the penalty push a weight away from its anchor instead of toward it.
        self.fisher_info = np.abs(current_w)

    def apply_ewc_penalty(self, step_size: float = 0.01) -> float:
        """Push weights back toward consolidated values, weighted by Fisher info.

        Kirkpatrick et al. 2017, adapted to SC/STDP setting.
        Penalty gradient per synapse: F_i * (w_i - w_star_i).

        Parameters
        ----------
        step_size : float
            Fraction of penalty gradient to apply per call.

        Returns
        -------
        float
            Total penalty magnitude (for logging).
        """
        current_w = self.get_weights()
        delta = current_w - self.star_weights
        penalty_grad = self.fisher_info * delta
        correction = self.ewc_lambda * step_size * penalty_grad
        # Keep corrected weights inside the layer's allowed range.
        new_w = np.clip(current_w - correction, self.w_min, self.w_max)

        # Write the corrected values back into the per-synapse objects.
        for i in range(self.n_neurons):
            for j in range(self.n_inputs):
                self.synapses[i][j].w = float(new_w[i, j])
        return float(np.sum(np.abs(penalty_grad)))
|
consolidate_task()
Call after finishing a task.
Calculate Fisher Info (Importance) and freeze 'star' weights.
Source code in src/sc_neurocore/learning/lifelong.py
26
27
28
29
30
31
32
33
34
35
36
37
38 | def consolidate_task(self) -> None:
"""
Call after finishing a task.
Calculate Fisher Info (Importance) and freeze 'star' weights.
"""
# In SC, Fisher Info approx ~ Activity * Plasticity
# Weights that changed a lot or are high are often important.
# Simplified: Importance = Current Weight Magnitude (Hebbian)
current_w = self.get_weights()
self.star_weights = current_w.copy()
# Assume all non-zero weights are somewhat important
self.fisher_info = current_w.copy()
|
apply_ewc_penalty(step_size=0.01)
Push weights back toward consolidated values, weighted by Fisher info.
Kirkpatrick et al. 2017, adapted to SC/STDP setting.
Penalty gradient per synapse: F_i * (w_i - w_star_i).
Parameters
step_size : float
Fraction of penalty gradient to apply per call.
Returns
float
Total penalty magnitude (for logging).
Source code in src/sc_neurocore/learning/lifelong.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 | def apply_ewc_penalty(self, step_size: float = 0.01) -> float:
"""Push weights back toward consolidated values, weighted by Fisher info.
Kirkpatrick et al. 2017, adapted to SC/STDP setting.
Penalty gradient per synapse: F_i * (w_i - w_star_i).
Parameters
----------
step_size : float
Fraction of penalty gradient to apply per call.
Returns
-------
float
Total penalty magnitude (for logging).
"""
current_w = self.get_weights()
delta = current_w - self.star_weights
penalty_grad = self.fisher_info * delta
correction = self.ewc_lambda * step_size * penalty_grad
new_w = np.clip(current_w - correction, self.w_min, self.w_max)
for i in range(self.n_neurons):
for j in range(self.n_inputs):
self.synapses[i][j].w = float(new_w[i, j])
return float(np.sum(np.abs(penalty_grad)))
|
Neuroevolution
sc_neurocore.learning.neuroevolution
SNNGeneticEvolver
dataclass
Genetic Algorithm for evolving SNN weights/parameters.
Source code in src/sc_neurocore/learning/neuroevolution.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@dataclass
class SNNGeneticEvolver:
    """
    Genetic Algorithm for evolving SNN weights/parameters.

    Individuals are produced by ``layer_factory``. Crossover and mutation act
    on an individual's ``weights`` array when it has one; individuals without
    ``weights`` are passed through unchanged.
    """

    population_size: int = 20
    mutation_rate: float = 0.05
    elite_fraction: float = 0.2

    def __init__(self, layer_factory: Callable[[], Any], fitness_func: Callable[[Any], float]):
        self.layer_factory = layer_factory
        self.fitness_func = fitness_func
        # Initialize population
        self.population = [layer_factory() for _ in range(self.population_size)]

    def evolve(self, generations: int) -> Any:
        """Run the genetic algorithm and return the best individual.

        Args:
            generations: Number of generations to run.

        Returns:
            The highest-scoring individual of the last evaluated generation,
            or the first individual when ``generations`` is 0.
        """
        best = self.population[0]
        for gen in range(generations):
            # 1. Evaluate Fitness
            scores = [self.fitness_func(ind) for ind in self.population]

            # Sort by fitness (descending)
            ranked_indices = np.argsort(scores)[::-1]
            ranked_pop = [self.population[i] for i in ranked_indices]
            best = ranked_pop[0]
            logger.info("Gen %d: Best Fitness = %.4f", gen, scores[ranked_indices[0]])

            # 2. Selection (Elitism)
            n_elite = int(self.population_size * self.elite_fraction)
            next_gen = ranked_pop[:n_elite]

            # 3. Crossover & Mutation
            # Parents come from the elites plus the next five ranked individuals.
            pool = ranked_pop[: n_elite + 5]
            while len(next_gen) < self.population_size:
                # Draw indices rather than the objects themselves:
                # np.random.choice would coerce a list of array-like
                # individuals into a multi-dimensional array. A one-member
                # pool is allowed to mate with itself.
                i1, i2 = np.random.choice(len(pool), 2, replace=len(pool) < 2)
                child = self._crossover(pool[i1], pool[i2])
                self._mutate(child)
                next_gen.append(child)

            self.population = next_gen

        return best

    def _crossover(self, p1, p2) -> Any:
        """Return a new individual mixing the parents' weights element-wise."""
        # Create new instance
        child = self.layer_factory()
        if not hasattr(p1, "weights"):
            return child

        # Uniform crossover
        mask = np.random.rand(*p1.weights.shape) > 0.5
        child.weights = np.where(mask, p1.weights, p2.weights)
        return child

    def _mutate(self, ind) -> None:
        """Add Gaussian noise to a random subset of weights, then clip to [0, 1]."""
        if not hasattr(ind, "weights"):
            return

        # Gaussian mutation
        mutation_mask = np.random.rand(*ind.weights.shape) < self.mutation_rate
        noise = np.random.normal(0, 0.1, ind.weights.shape)
        ind.weights[mutation_mask] += noise[mutation_mask]
        ind.weights = np.clip(ind.weights, 0, 1)
|