Learning
Training paradigms beyond single-node STDP: BPTT, truncated BPTT,
eligibility traces, reward-modulated learning, meta-learning,
homeostatic scaling, short-term plasticity, structural plasticity,
federated learning, lifelong/continual learning, neuroevolution,
learning-rate schedulers, and training callbacks.
BPTT Learner
sc_neurocore.learning.advanced.BPTTLearner
Backpropagation Through Time for spiking networks.
Uses fast-sigmoid surrogate gradient (Neftci et al. 2019) to handle
the spike non-differentiability.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 | class BPTTLearner:
"""Backpropagation Through Time for spiking networks.
Uses fast-sigmoid surrogate gradient (Neftci et al. 2019) to handle
the spike non-differentiability.
"""
def __init__(self, network: Any, loss_fn: Callable[..., float], lr: float = 1e-3) -> None:
self.network = network
self.loss_fn = loss_fn
self.lr = lr
def train_step(self, inputs: np.ndarray[Any, Any], targets: np.ndarray[Any, Any]) -> float:
"""One BPTT step: forward pass, loss, backward with surrogate gradients.
Parameters
----------
inputs : np.ndarray[Any, Any]
Shape (n_steps, n_input) input currents.
targets : np.ndarray[Any, Any]
Shape (n_steps, n_output) target spike trains.
Returns
-------
float
Scalar loss value.
"""
n_steps = inputs.shape[0]
for pop in self.network.populations:
pop.reset_all()
recorded_v = []
recorded_spikes = []
for t in range(n_steps):
currents = inputs[t]
pop = self.network.populations[0]
spikes = pop.step_all(currents[: pop.n])
recorded_v.append(pop.voltages.copy())
recorded_spikes.append(spikes.copy())
spike_arr = np.stack(recorded_spikes)
loss = float(self.loss_fn(spike_arr, targets))
output_error = spike_arr - targets
for proj in self.network.projections:
n_src = proj.source.n
grad_w = np.zeros_like(proj.data)
for t in range(n_steps):
surr = _fast_sigmoid_surrogate(recorded_v[t])
post_delta = output_error[t][: proj.target.n] * surr[: proj.target.n]
for i in range(n_src):
for k in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k]
grad_w[k] += recorded_spikes[t][i] * post_delta[j]
proj.data -= self.lr * grad_w / max(n_steps, 1)
return loss
|
train_step(inputs, targets)
One BPTT step: forward pass, loss, backward with surrogate gradients.
Parameters
inputs : np.ndarray[Any, Any]
Shape (n_steps, n_input) input currents.
targets : np.ndarray[Any, Any]
Shape (n_steps, n_output) target spike trains.
Returns
float
Scalar loss value.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 | def train_step(self, inputs: np.ndarray[Any, Any], targets: np.ndarray[Any, Any]) -> float:
"""One BPTT step: forward pass, loss, backward with surrogate gradients.
Parameters
----------
inputs : np.ndarray[Any, Any]
Shape (n_steps, n_input) input currents.
targets : np.ndarray[Any, Any]
Shape (n_steps, n_output) target spike trains.
Returns
-------
float
Scalar loss value.
"""
n_steps = inputs.shape[0]
for pop in self.network.populations:
pop.reset_all()
recorded_v = []
recorded_spikes = []
for t in range(n_steps):
currents = inputs[t]
pop = self.network.populations[0]
spikes = pop.step_all(currents[: pop.n])
recorded_v.append(pop.voltages.copy())
recorded_spikes.append(spikes.copy())
spike_arr = np.stack(recorded_spikes)
loss = float(self.loss_fn(spike_arr, targets))
output_error = spike_arr - targets
for proj in self.network.projections:
n_src = proj.source.n
grad_w = np.zeros_like(proj.data)
for t in range(n_steps):
surr = _fast_sigmoid_surrogate(recorded_v[t])
post_delta = output_error[t][: proj.target.n] * surr[: proj.target.n]
for i in range(n_src):
for k in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k]
grad_w[k] += recorded_spikes[t][i] * post_delta[j]
proj.data -= self.lr * grad_w / max(n_steps, 1)
return loss
|
Truncated BPTT (Williams & Peng 1990)
Chunks long sequences into windows of k timesteps, backpropagating
gradients within each chunk while carrying membrane state forward.
Memory O(k) instead of O(T).
sc_neurocore.learning.advanced.TBPTTLearner
Truncated Backpropagation Through Time for long sequences.
Splits input into chunks of k timesteps, backpropagating gradients
only within each chunk while carrying forward state (membrane voltage)
across boundaries. Reduces memory from O(T) to O(k).
Williams & Peng 1990.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 | class TBPTTLearner:
"""Truncated Backpropagation Through Time for long sequences.
Splits input into chunks of ``k`` timesteps, backpropagating gradients
only within each chunk while carrying forward state (membrane voltage)
across boundaries. Reduces memory from O(T) to O(k).
Williams & Peng 1990.
"""
def __init__(
self, network: Any, loss_fn: Callable[..., float], lr: float = 1e-3, k: int = 50
) -> None:
self.network = network
self.loss_fn = loss_fn
self.lr = lr
self.k = k
def train_step(self, inputs: np.ndarray[Any, Any], targets: np.ndarray[Any, Any]) -> float:
"""One TBPTT step over the full sequence, chunked into windows of k.
Parameters
----------
inputs : np.ndarray[Any, Any]
Shape (n_steps, n_input).
targets : np.ndarray[Any, Any]
Shape (n_steps, n_output).
Returns
-------
float
Total loss summed across chunks.
"""
n_steps = inputs.shape[0]
total_loss = 0.0
for pop in self.network.populations:
pop.reset_all()
for chunk_start in range(0, n_steps, self.k):
chunk_end = min(chunk_start + self.k, n_steps)
chunk_len = chunk_end - chunk_start
recorded_v = []
recorded_spikes = []
for t in range(chunk_start, chunk_end):
pop = self.network.populations[0]
spikes = pop.step_all(inputs[t][: pop.n])
recorded_v.append(pop.voltages.copy())
recorded_spikes.append(spikes.copy())
spike_arr = np.stack(recorded_spikes)
chunk_targets = targets[chunk_start:chunk_end]
chunk_loss = float(self.loss_fn(spike_arr, chunk_targets))
total_loss += chunk_loss
# Backward within this chunk only
output_error = spike_arr - chunk_targets
for proj in self.network.projections:
n_src = proj.source.n
grad_w = np.zeros_like(proj.data)
for t_local in range(chunk_len):
surr = _fast_sigmoid_surrogate(recorded_v[t_local])
post_delta = output_error[t_local][: proj.target.n] * surr[: proj.target.n]
for i in range(n_src):
for k_idx in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k_idx]
grad_w[k_idx] += recorded_spikes[t_local][i] * post_delta[j]
proj.data -= self.lr * grad_w / max(chunk_len, 1)
# State (voltages) carries forward — no reset between chunks
return total_loss
|
train_step(inputs, targets)
One TBPTT step over the full sequence, chunked into windows of k.
Parameters
inputs : np.ndarray[Any, Any]
Shape (n_steps, n_input).
targets : np.ndarray[Any, Any]
Shape (n_steps, n_output).
Returns
float
Total loss summed across chunks.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 | def train_step(self, inputs: np.ndarray[Any, Any], targets: np.ndarray[Any, Any]) -> float:
"""One TBPTT step over the full sequence, chunked into windows of k.
Parameters
----------
inputs : np.ndarray[Any, Any]
Shape (n_steps, n_input).
targets : np.ndarray[Any, Any]
Shape (n_steps, n_output).
Returns
-------
float
Total loss summed across chunks.
"""
n_steps = inputs.shape[0]
total_loss = 0.0
for pop in self.network.populations:
pop.reset_all()
for chunk_start in range(0, n_steps, self.k):
chunk_end = min(chunk_start + self.k, n_steps)
chunk_len = chunk_end - chunk_start
recorded_v = []
recorded_spikes = []
for t in range(chunk_start, chunk_end):
pop = self.network.populations[0]
spikes = pop.step_all(inputs[t][: pop.n])
recorded_v.append(pop.voltages.copy())
recorded_spikes.append(spikes.copy())
spike_arr = np.stack(recorded_spikes)
chunk_targets = targets[chunk_start:chunk_end]
chunk_loss = float(self.loss_fn(spike_arr, chunk_targets))
total_loss += chunk_loss
# Backward within this chunk only
output_error = spike_arr - chunk_targets
for proj in self.network.projections:
n_src = proj.source.n
grad_w = np.zeros_like(proj.data)
for t_local in range(chunk_len):
surr = _fast_sigmoid_surrogate(recorded_v[t_local])
post_delta = output_error[t_local][: proj.target.n] * surr[: proj.target.n]
for i in range(n_src):
for k_idx in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k_idx]
grad_w[k_idx] += recorded_spikes[t_local][i] * post_delta[j]
proj.data -= self.lr * grad_w / max(chunk_len, 1)
# State (voltages) carries forward — no reset between chunks
return total_loss
|
Eligibility Traces (e-prop, Bellec et al. 2020)
sc_neurocore.learning.advanced.EligibilityTrace
E-prop eligibility trace: three-factor learning (pre x post x error).
Bellec et al. 2020.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207 | class EligibilityTrace:
"""E-prop eligibility trace: three-factor learning (pre x post x error).
Bellec et al. 2020.
"""
def __init__(self, tau_e: float = 20.0, dt: float = 1.0) -> None:
self.decay = float(np.exp(-dt / tau_e))
self._trace: np.ndarray[Any, Any] | None = None
def update(
self,
pre_spike: np.ndarray[Any, Any],
post_spike: np.ndarray[Any, Any],
error_signal: np.ndarray[Any, Any],
) -> np.ndarray[Any, Any]:
"""Compute weight delta from three-factor rule.
Parameters
----------
pre_spike, post_spike : np.ndarray[Any, Any]
Binary (0/1) vectors of length n_pre, n_post.
error_signal : np.ndarray[Any, Any]
Error signal of length n_post.
Returns
-------
np.ndarray[Any, Any]
Weight delta matrix of shape (n_pre, n_post).
"""
outer = np.outer(pre_spike, post_spike)
if self._trace is None:
self._trace = np.zeros_like(outer)
self._trace = self.decay * self._trace + outer
delta: np.ndarray[Any, Any] = self._trace * error_signal[np.newaxis, :]
return delta
|
update(pre_spike, post_spike, error_signal)
Compute weight delta from three-factor rule.
Parameters
pre_spike, post_spike : np.ndarray[Any, Any]
Binary (0/1) vectors of length n_pre, n_post.
error_signal : np.ndarray[Any, Any]
Error signal of length n_post.
Returns
np.ndarray[Any, Any]
Weight delta matrix of shape (n_pre, n_post).
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207 | def update(
self,
pre_spike: np.ndarray[Any, Any],
post_spike: np.ndarray[Any, Any],
error_signal: np.ndarray[Any, Any],
) -> np.ndarray[Any, Any]:
"""Compute weight delta from three-factor rule.
Parameters
----------
pre_spike, post_spike : np.ndarray[Any, Any]
Binary (0/1) vectors of length n_pre, n_post.
error_signal : np.ndarray[Any, Any]
Error signal of length n_post.
Returns
-------
np.ndarray[Any, Any]
Weight delta matrix of shape (n_pre, n_post).
"""
outer = np.outer(pre_spike, post_spike)
if self._trace is None:
self._trace = np.zeros_like(outer)
self._trace = self.decay * self._trace + outer
delta: np.ndarray[Any, Any] = self._trace * error_signal[np.newaxis, :]
return delta
|
Reward-Modulated STDP
sc_neurocore.learning.advanced.RewardModulatedLearner
Reward-modulated STDP (R-STDP).
Maintains per-synapse eligibility traces and applies weight updates
scaled by a global reward signal.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257 | class RewardModulatedLearner:
"""Reward-modulated STDP (R-STDP).
Maintains per-synapse eligibility traces and applies weight updates
scaled by a global reward signal.
"""
def __init__(self, network: Any, tau_reward: float = 100.0) -> None:
self.network = network
self.reward_decay = np.exp(-1.0 / tau_reward)
self._elig: dict[int, np.ndarray[Any, Any]] = {}
self._pre_trace: dict[int, np.ndarray[Any, Any]] = {}
self._post_trace: dict[int, np.ndarray[Any, Any]] = {}
self._init_traces()
def _init_traces(self) -> None:
for proj in self.network.projections:
pid = id(proj)
self._elig[pid] = np.zeros_like(proj.data)
self._pre_trace[pid] = np.zeros(proj.source.n)
self._post_trace[pid] = np.zeros(proj.target.n)
def step(self, reward: float) -> None:
"""Apply reward-modulated weight update.
Parameters
----------
reward : float
Scalar reward signal.
"""
tau_trace = 20.0
trace_decay = np.exp(-1.0 / tau_trace)
for proj in self.network.projections:
pid = id(proj)
pre_sp = proj.source.voltages > 0.9
post_sp = proj.target.voltages > 0.9
self._pre_trace[pid] = trace_decay * self._pre_trace[pid] + pre_sp
self._post_trace[pid] = trace_decay * self._post_trace[pid] + post_sp
for i in range(proj.source.n):
for k in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k]
self._elig[pid][k] = (
self.reward_decay * self._elig[pid][k]
+ self._pre_trace[pid][i] * self._post_trace[pid][j]
)
proj.data += 0.01 * reward * self._elig[pid]
np.clip(proj.data, 0.0, None, out=proj.data)
|
step(reward)
Apply reward-modulated weight update.
Parameters
reward : float
Scalar reward signal.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257 | def step(self, reward: float) -> None:
"""Apply reward-modulated weight update.
Parameters
----------
reward : float
Scalar reward signal.
"""
tau_trace = 20.0
trace_decay = np.exp(-1.0 / tau_trace)
for proj in self.network.projections:
pid = id(proj)
pre_sp = proj.source.voltages > 0.9
post_sp = proj.target.voltages > 0.9
self._pre_trace[pid] = trace_decay * self._pre_trace[pid] + pre_sp
self._post_trace[pid] = trace_decay * self._post_trace[pid] + post_sp
for i in range(proj.source.n):
for k in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k]
self._elig[pid][k] = (
self.reward_decay * self._elig[pid][k]
+ self._pre_trace[pid][i] * self._post_trace[pid][j]
)
proj.data += 0.01 * reward * self._elig[pid]
np.clip(proj.data, 0.0, None, out=proj.data)
|
Learning-Rate Schedulers
Import these schedulers from either sc_neurocore.learning or
sc_neurocore.learning.schedulers. They keep only deterministic local state and
return the current learning rate from each step() call.
sc_neurocore.learning.schedulers.StepScheduler
Drop learning rate by gamma every step_size steps.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 | class StepScheduler:
"""Drop learning rate by *gamma* every *step_size* steps."""
def __init__(self, lr_init: float, step_size: int, gamma: float = 0.1):
self.lr = lr_init
self.step_size = step_size
self.gamma = gamma
self._count = 0
def step(self) -> float:
"""Advance one scheduler step and return the current learning rate."""
self._count += 1
if self._count % self.step_size == 0:
self.lr *= self.gamma
return self.lr
def reset(self) -> None:
"""Reset the internal step counter without changing the current rate."""
self._count = 0
|
step()
Advance one scheduler step and return the current learning rate.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
| def step(self) -> float:
"""Advance one scheduler step and return the current learning rate."""
self._count += 1
if self._count % self.step_size == 0:
self.lr *= self.gamma
return self.lr
|
reset()
Reset the internal step counter without changing the current rate.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
| def reset(self) -> None:
"""Reset the internal step counter without changing the current rate."""
self._count = 0
|
sc_neurocore.learning.schedulers.ExponentialScheduler
Multiply learning rate by gamma each step.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 | class ExponentialScheduler:
"""Multiply learning rate by *gamma* each step."""
def __init__(self, lr_init: float, gamma: float = 0.999):
self.lr = lr_init
self.gamma = gamma
def step(self) -> float:
"""Apply one exponential decay update and return the new rate."""
self.lr *= self.gamma
return self.lr
def reset(self) -> None:
"""Leave the stateless exponential schedule unchanged."""
pass
|
step()
Apply one exponential decay update and return the new rate.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
| def step(self) -> float:
"""Apply one exponential decay update and return the new rate."""
self.lr *= self.gamma
return self.lr
|
reset()
Leave the stateless exponential schedule unchanged.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
| def reset(self) -> None:
"""Leave the stateless exponential schedule unchanged."""
pass
|
sc_neurocore.learning.schedulers.CosineScheduler
Cosine annealing from lr_init to lr_min over total_steps.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 | class CosineScheduler:
"""Cosine annealing from *lr_init* to *lr_min* over *total_steps*."""
def __init__(self, lr_init: float, lr_min: float, total_steps: int):
self.lr_init = lr_init
self.lr_min = lr_min
self.total_steps = total_steps
self._count = 0
self.lr = lr_init
def step(self) -> float:
"""Advance one cosine-annealing step and return the new rate."""
self._count += 1
t = min(self._count / self.total_steps, 1.0)
self.lr = self.lr_min + 0.5 * (self.lr_init - self.lr_min) * (1 + math.cos(math.pi * t))
return self.lr
def reset(self) -> None:
"""Restore the initial learning rate and restart the cosine schedule."""
self._count = 0
self.lr = self.lr_init
|
step()
Advance one cosine-annealing step and return the new rate.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
| def step(self) -> float:
"""Advance one cosine-annealing step and return the new rate."""
self._count += 1
t = min(self._count / self.total_steps, 1.0)
self.lr = self.lr_min + 0.5 * (self.lr_init - self.lr_min) * (1 + math.cos(math.pi * t))
return self.lr
|
reset()
Restore the initial learning rate and restart the cosine schedule.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
| def reset(self) -> None:
"""Restore the initial learning rate and restart the cosine schedule."""
self._count = 0
self.lr = self.lr_init
|
sc_neurocore.learning.schedulers.WarmupCosineScheduler
Linear warmup followed by cosine decay.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 | class WarmupCosineScheduler:
"""Linear warmup followed by cosine decay."""
def __init__(
self,
lr_init: float,
lr_min: float,
warmup_steps: int,
total_steps: int,
):
self.lr_init = lr_init
self.lr_min = lr_min
self.warmup_steps = warmup_steps
self.total_steps = total_steps
self._count = 0
self.lr = 0.0
def step(self) -> float:
"""Advance through warmup or cosine decay and return the current rate."""
self._count += 1
if self._count <= self.warmup_steps:
self.lr = self.lr_init * (self._count / self.warmup_steps)
else:
decay_steps = self.total_steps - self.warmup_steps
t = min((self._count - self.warmup_steps) / decay_steps, 1.0)
self.lr = self.lr_min + 0.5 * (self.lr_init - self.lr_min) * (1 + math.cos(math.pi * t))
return self.lr
def reset(self) -> None:
"""Return to the pre-warmup state with zero current learning rate."""
self._count = 0
self.lr = 0.0
|
step()
Advance through warmup or cosine decay and return the current rate.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
104
105
106
107
108
109
110
111
112
113 | def step(self) -> float:
"""Advance through warmup or cosine decay and return the current rate."""
self._count += 1
if self._count <= self.warmup_steps:
self.lr = self.lr_init * (self._count / self.warmup_steps)
else:
decay_steps = self.total_steps - self.warmup_steps
t = min((self._count - self.warmup_steps) / decay_steps, 1.0)
self.lr = self.lr_min + 0.5 * (self.lr_init - self.lr_min) * (1 + math.cos(math.pi * t))
return self.lr
|
reset()
Return to the pre-warmup state with zero current learning rate.
Source code in src/sc_neurocore/learning/schedulers.py
| Python |
|---|
| def reset(self) -> None:
"""Return to the pre-warmup state with zero current learning rate."""
self._count = 0
self.lr = 0.0
|
Training Callbacks
Import callbacks from either sc_neurocore.learning or
sc_neurocore.learning.callbacks. CSVCallback has no optional dependencies;
TensorBoardCallback and WandBCallback fail closed with SCDependencyError
when their optional runtime packages are not installed.
sc_neurocore.learning.callbacks.TrainingCallback
Base class for training callbacks.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
| class TrainingCallback:
"""Base class for training callbacks."""
def log(self, metrics: dict[str, float], step: int) -> None:
"""Record a mapping of metric names to values at the given step."""
def close(self) -> None:
"""Flush and release any resources held by the callback."""
|
log(metrics, step)
Record a mapping of metric names to values at the given step.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
| def log(self, metrics: dict[str, float], step: int) -> None:
"""Record a mapping of metric names to values at the given step."""
|
close()
Flush and release any resources held by the callback.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
| def close(self) -> None:
"""Flush and release any resources held by the callback."""
|
sc_neurocore.learning.callbacks.CSVCallback
Bases: TrainingCallback
Log metrics to a CSV file (no dependencies).
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 | class CSVCallback(TrainingCallback):
"""Log metrics to a CSV file (no dependencies)."""
def __init__(self, path: str = "metrics.csv"):
self._path = path
self._rows: list[dict[str, float | int]] = []
def log(self, metrics: dict[str, float], step: int) -> None:
"""Buffer one row of metrics for the given step in memory."""
self._rows.append({"step": step, **metrics})
def close(self) -> None:
"""Write all buffered metric rows to the CSV file."""
if not self._rows:
return
keys = list(self._rows[0].keys())
with open(self._path, "w", newline="") as f:
f.write(",".join(keys) + "\n")
for row in self._rows:
f.write(",".join(str(row[k]) for k in keys) + "\n")
|
log(metrics, step)
Buffer one row of metrics for the given step in memory.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
| def log(self, metrics: dict[str, float], step: int) -> None:
"""Buffer one row of metrics for the given step in memory."""
self._rows.append({"step": step, **metrics})
|
close()
Write all buffered metric rows to the CSV file.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
96
97
98
99
100
101
102
103
104 | def close(self) -> None:
"""Write all buffered metric rows to the CSV file."""
if not self._rows:
return
keys = list(self._rows[0].keys())
with open(self._path, "w", newline="") as f:
f.write(",".join(keys) + "\n")
for row in self._rows:
f.write(",".join(str(row[k]) for k in keys) + "\n")
|
sc_neurocore.learning.callbacks.TensorBoardCallback
Bases: TrainingCallback
Log scalars to TensorBoard via torch.utils.tensorboard.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 | class TensorBoardCallback(TrainingCallback):
"""Log scalars to TensorBoard via ``torch.utils.tensorboard``."""
def __init__(self, log_dir: str = "runs"):
try:
from torch.utils.tensorboard import SummaryWriter
except ImportError:
from sc_neurocore.exceptions import SCDependencyError
raise SCDependencyError("TensorBoard requires torch: pip install sc-neurocore[gpu]")
# ``SummaryWriter`` ships without type stubs; binding it through an ``Any``
# handle keeps construction and method calls strict-clean regardless of
# whether torch stubs are installed in the environment.
writer_factory: Any = SummaryWriter
self._writer: Any = writer_factory(log_dir=log_dir)
def log(self, metrics: dict[str, float], step: int) -> None:
"""Write each metric as a TensorBoard scalar at the given step."""
for key, value in metrics.items():
self._writer.add_scalar(key, value, step)
def close(self) -> None:
"""Close the underlying TensorBoard summary writer."""
self._writer.close()
|
log(metrics, step)
Write each metric as a TensorBoard scalar at the given step.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
| def log(self, metrics: dict[str, float], step: int) -> None:
"""Write each metric as a TensorBoard scalar at the given step."""
for key, value in metrics.items():
self._writer.add_scalar(key, value, step)
|
close()
Close the underlying TensorBoard summary writer.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
| def close(self) -> None:
"""Close the underlying TensorBoard summary writer."""
self._writer.close()
|
sc_neurocore.learning.callbacks.WandBCallback
Bases: TrainingCallback
Log metrics to Weights & Biases.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 | class WandBCallback(TrainingCallback):
"""Log metrics to Weights & Biases."""
def __init__(self, project: str = "sc-neurocore", **init_kwargs: Any):
try:
import wandb
self._wandb = wandb
except ImportError:
from sc_neurocore.exceptions import SCDependencyError
raise SCDependencyError("W&B requires wandb: pip install wandb")
self._wandb.init(project=project, **init_kwargs)
def log(self, metrics: dict[str, float], step: int) -> None:
"""Forward the metrics to the active Weights & Biases run."""
self._wandb.log(metrics, step=step)
def close(self) -> None:
"""Finish the active Weights & Biases run."""
self._wandb.finish()
|
log(metrics, step)
Forward the metrics to the active Weights & Biases run.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
| def log(self, metrics: dict[str, float], step: int) -> None:
"""Forward the metrics to the active Weights & Biases run."""
self._wandb.log(metrics, step=step)
|
close()
Finish the active Weights & Biases run.
Source code in src/sc_neurocore/learning/callbacks.py
| Python |
|---|
| def close(self) -> None:
"""Finish the active Weights & Biases run."""
self._wandb.finish()
|
Bounded O(1) Online Learning
sc_neurocore.learning.online_o1 defines the hardware-facing local-learning
contract for streamed online updates. One synapse stores only four bounded
state fields: current weight, pre trace, post trace, and eligibility trace. The
memory proof emitted by build_online_o1_memory_proof(...) is independent of
sequence length and reports the exact per-synapse bit count used by the HDL
emitter.
The first supported rule family is reward-modulated STDP with fixed-point
saturation. OnlineO1Config.to_scnir_annotation(...) emits deterministic
SC-NIR metadata for online-learning-capable synapses, and
sc_neurocore.hdl_gen.OnlineO1LearningEmitter emits a one-lane Verilog update
block with the same bounded state fields and saturation policy. This software
and RTL contract is local handoff evidence; it does not claim board synthesis
or physical FPGA learning evidence until those runs are attached separately.
When the optional native library is present, RustOnlineO1Synapse provides the
same bounded fixed-point step contract through the Rust C-FFI bridge.
OnlineO1LearningEmitter.estimate_resources(...) reports deterministic
pre-synthesis BRAM/LUT/DSP planning estimates for a chosen synapse count. The
reproducible adaptation benchmark command is:
BashPYTHONPATH=src python tools/online_o1_adaptation_benchmark.py \
--output benchmarks/results/online_o1_adaptation_benchmark.json
The default local report is deterministic simulation evidence: it measures
pre/post reward-pairing adaptation speed, records Python/Rust parity when the
native library is present, and marks hardware_measurement_claimed=false.
sc_neurocore.learning.online_o1.OnlineO1Config
dataclass
Hardware-bounded configuration for local reward-modulated STDP.
Source code in src/sc_neurocore/learning/online_o1.py
| Python |
|---|
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 | @dataclass(frozen=True, slots=True)
class OnlineO1Config:
"""Hardware-bounded configuration for local reward-modulated STDP."""
weight_bits: int = 16
trace_bits: int = 12
reward_bits: int = 8
learning_shift: int = 4
trace_decay_shift: int = 4
rule_family: Literal["reward_modulated_stdp"] = "reward_modulated_stdp"
def __post_init__(self) -> None:
if self.weight_bits < 1:
raise ValueError("weight_bits must be >= 1")
if self.trace_bits < 2:
raise ValueError("trace_bits must be >= 2")
if self.reward_bits < 1:
raise ValueError("reward_bits must be >= 1")
if self.learning_shift < 0:
raise ValueError("learning_shift must be >= 0")
if self.trace_decay_shift < 0:
raise ValueError("trace_decay_shift must be >= 0")
@property
def max_weight(self) -> int:
"""Maximum unsigned fixed-point weight."""
return (1 << self.weight_bits) - 1
@property
def max_trace(self) -> int:
"""Maximum unsigned trace value."""
return (1 << self.trace_bits) - 1
@property
def min_eligibility(self) -> int:
"""Minimum signed eligibility value."""
return -(1 << (self.trace_bits - 1))
@property
def max_eligibility(self) -> int:
"""Maximum signed eligibility value."""
return (1 << (self.trace_bits - 1)) - 1
@property
def min_reward(self) -> int:
"""Minimum signed reward input."""
return -(1 << (self.reward_bits - 1))
@property
def max_reward(self) -> int:
"""Maximum signed reward input."""
return (1 << (self.reward_bits - 1)) - 1
@property
def per_synapse_state_bits(self) -> int:
"""Stored bits per synapse: weight plus three bounded traces."""
return self.weight_bits + 3 * self.trace_bits
def to_scnir_annotation(self, *, rule_id: str) -> dict[str, Any]:
"""Return deterministic SC-NIR metadata for online-learning synapses."""
if not rule_id:
raise ValueError("rule_id must be non-empty")
return {
"schema_version": ONLINE_O1_ANNOTATION_SCHEMA_VERSION,
"rule_id": rule_id,
"rule_family": self.rule_family,
"state_fields": list(_STATE_FIELDS),
"per_synapse_state_bits": self.per_synapse_state_bits,
"weight_bits": self.weight_bits,
"trace_bits": self.trace_bits,
"reward_bits": self.reward_bits,
"learning_shift": self.learning_shift,
"trace_decay_shift": self.trace_decay_shift,
"saturation_policy": "signed_eligibility_unsigned_weight",
"hidden_history_fields": [],
"sequence_length_independent": True,
}
|
max_weight
property
Maximum unsigned fixed-point weight.
max_trace
property
Maximum unsigned trace value.
min_eligibility
property
Minimum signed eligibility value.
max_eligibility
property
Maximum signed eligibility value.
min_reward
property
Minimum signed reward input.
max_reward
property
Maximum signed reward input.
per_synapse_state_bits
property
Stored bits per synapse: weight plus three bounded traces.
to_scnir_annotation(*, rule_id)
Return deterministic SC-NIR metadata for online-learning synapses.
Source code in src/sc_neurocore/learning/online_o1.py
| Python |
|---|
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 | def to_scnir_annotation(self, *, rule_id: str) -> dict[str, Any]:
"""Return deterministic SC-NIR metadata for online-learning synapses."""
if not rule_id:
raise ValueError("rule_id must be non-empty")
return {
"schema_version": ONLINE_O1_ANNOTATION_SCHEMA_VERSION,
"rule_id": rule_id,
"rule_family": self.rule_family,
"state_fields": list(_STATE_FIELDS),
"per_synapse_state_bits": self.per_synapse_state_bits,
"weight_bits": self.weight_bits,
"trace_bits": self.trace_bits,
"reward_bits": self.reward_bits,
"learning_shift": self.learning_shift,
"trace_decay_shift": self.trace_decay_shift,
"saturation_policy": "signed_eligibility_unsigned_weight",
"hidden_history_fields": [],
"sequence_length_independent": True,
}
|
sc_neurocore.learning.online_o1.OnlineO1Synapse
dataclass
One fixed-point reward-modulated STDP synapse with O(1) state.
Source code in src/sc_neurocore/learning/online_o1.py
| Python |
|---|
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204 | @dataclass(slots=True)
class OnlineO1Synapse:
"""One fixed-point reward-modulated STDP synapse with O(1) state."""
config: OnlineO1Config
initial_weight: int = 0
weight: int = 0
pre_trace: int = 0
post_trace: int = 0
eligibility: int = 0
def __post_init__(self) -> None:
self.weight = _saturate(self.initial_weight, 0, self.config.max_weight)
self.pre_trace = 0
self.post_trace = 0
self.eligibility = 0
@property
def state_fields(self) -> tuple[str, ...]:
"""Names of state fields retained between timesteps."""
return _STATE_FIELDS
@property
def state_bit_count(self) -> int:
"""Stored state bits for one synapse."""
return self.config.per_synapse_state_bits
def snapshot(self) -> OnlineO1Snapshot:
"""Return the current bounded state."""
return OnlineO1Snapshot(
weight=self.weight,
pre_trace=self.pre_trace,
post_trace=self.post_trace,
eligibility=self.eligibility,
)
def step(self, *, pre_spike: bool, post_spike: bool, reward: int) -> OnlineO1Snapshot:
"""Advance one streamed timestep and return the bounded state.
The rule uses pre-before-post eligibility:
``eligibility += post_spike * pre_trace - pre_spike * post_trace``
The reward-gated weight update is an arithmetic right shift of the
product, then saturated into the unsigned weight range.
"""
reward = _saturate(reward, self.config.min_reward, self.config.max_reward)
previous_pre_trace = self.pre_trace
previous_post_trace = self.post_trace
self.pre_trace = _decay_unsigned(
self.pre_trace, self.config.trace_decay_shift, self.config.max_trace
)
self.post_trace = _decay_unsigned(
self.post_trace, self.config.trace_decay_shift, self.config.max_trace
)
if pre_spike:
self.pre_trace = self.config.max_trace
if post_spike:
self.post_trace = self.config.max_trace
decayed_eligibility = _decay_signed(self.eligibility, self.config.trace_decay_shift)
potentiation = 0
if post_spike:
potentiation = self.config.max_trace if pre_spike else previous_pre_trace
depression = previous_post_trace if pre_spike else 0
eligibility_delta = potentiation - depression
self.eligibility = _saturate(
decayed_eligibility + eligibility_delta,
self.config.min_eligibility,
self.config.max_eligibility,
)
weight_delta = (reward * self.eligibility) >> self.config.learning_shift
self.weight = _saturate(self.weight + weight_delta, 0, self.config.max_weight)
return self.snapshot()
|
state_fields
property
Names of state fields retained between timesteps.
state_bit_count
property
Stored state bits for one synapse.
snapshot()
Return the current bounded state.
Source code in src/sc_neurocore/learning/online_o1.py
| Python |
|---|
154
155
156
157
158
159
160
161
162 | def snapshot(self) -> OnlineO1Snapshot:
"""Return the current bounded state."""
return OnlineO1Snapshot(
weight=self.weight,
pre_trace=self.pre_trace,
post_trace=self.post_trace,
eligibility=self.eligibility,
)
|
step(*, pre_spike, post_spike, reward)
Advance one streamed timestep and return the bounded state.
The rule uses pre-before-post eligibility:
eligibility += post_spike * pre_trace - pre_spike * post_trace
The reward-gated weight update is an arithmetic right shift of the
product, then saturated into the unsigned weight range.
Source code in src/sc_neurocore/learning/online_o1.py
| Python |
|---|
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204 | def step(self, *, pre_spike: bool, post_spike: bool, reward: int) -> OnlineO1Snapshot:
"""Advance one streamed timestep and return the bounded state.
The rule uses pre-before-post eligibility:
``eligibility += post_spike * pre_trace - pre_spike * post_trace``
The reward-gated weight update is an arithmetic right shift of the
product, then saturated into the unsigned weight range.
"""
reward = _saturate(reward, self.config.min_reward, self.config.max_reward)
previous_pre_trace = self.pre_trace
previous_post_trace = self.post_trace
self.pre_trace = _decay_unsigned(
self.pre_trace, self.config.trace_decay_shift, self.config.max_trace
)
self.post_trace = _decay_unsigned(
self.post_trace, self.config.trace_decay_shift, self.config.max_trace
)
if pre_spike:
self.pre_trace = self.config.max_trace
if post_spike:
self.post_trace = self.config.max_trace
decayed_eligibility = _decay_signed(self.eligibility, self.config.trace_decay_shift)
potentiation = 0
if post_spike:
potentiation = self.config.max_trace if pre_spike else previous_pre_trace
depression = previous_post_trace if pre_spike else 0
eligibility_delta = potentiation - depression
self.eligibility = _saturate(
decayed_eligibility + eligibility_delta,
self.config.min_eligibility,
self.config.max_eligibility,
)
weight_delta = (reward * self.eligibility) >> self.config.learning_shift
self.weight = _saturate(self.weight + weight_delta, 0, self.config.max_weight)
return self.snapshot()
|
sc_neurocore.learning.online_o1.build_online_o1_memory_proof(*, n_synapses, config, sequence_length=None)
Return a sequence-length independent memory proof for the rule.
Source code in src/sc_neurocore/learning/online_o1.py
| Python |
|---|
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225 | def build_online_o1_memory_proof(
*, n_synapses: int, config: OnlineO1Config, sequence_length: int | None = None
) -> dict[str, Any]:
"""Return a sequence-length independent memory proof for the rule."""
if n_synapses < 0:
raise ValueError("n_synapses must be >= 0")
if sequence_length is not None and sequence_length < 0:
raise ValueError("sequence_length must be >= 0")
total_state_bits = n_synapses * config.per_synapse_state_bits
return {
"schema_version": ONLINE_O1_MEMORY_PROOF_SCHEMA_VERSION,
"n_synapses": n_synapses,
"state_fields": list(_STATE_FIELDS),
"per_synapse_state_bits": config.per_synapse_state_bits,
"total_state_bits": total_state_bits,
"sequence_length_independent": True,
"hidden_history_fields": [],
}
|
sc_neurocore._native.learning_bridge.RustOnlineO1Synapse
RAII wrapper for the Rust bounded fixed-point online O(1) learner.
Source code in src/sc_neurocore/_native/learning_bridge.py
| Python |
|---|
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317 | class RustOnlineO1Synapse:
"""RAII wrapper for the Rust bounded fixed-point online O(1) learner."""
__slots__ = ("_ptr",)
def __init__(
self,
*,
weight_bits: int = 16,
trace_bits: int = 12,
reward_bits: int = 8,
learning_shift: int = 4,
trace_decay_shift: int = 4,
initial_weight: int = 0,
) -> None:
if not _HAS_LEARNING:
raise RuntimeError("libautonomous_learning.so not available")
lib = _get_lib()
if not hasattr(lib, "create_online_o1_synapse"):
raise RuntimeError("libautonomous_learning.so lacks online O(1) symbols")
self._ptr = lib.create_online_o1_synapse(
_ct.c_uint8(weight_bits),
_ct.c_uint8(trace_bits),
_ct.c_uint8(reward_bits),
_ct.c_uint8(learning_shift),
_ct.c_uint8(trace_decay_shift),
_ct.c_uint32(initial_weight),
)
if not self._ptr:
raise ValueError("invalid online O(1) fixed-point configuration")
def step(self, *, pre_spike: bool, post_spike: bool, reward: int) -> OnlineO1SnapshotFFI:
"""Advance one timestep and return the bounded fixed-point state."""
snapshot: OnlineO1SnapshotFFI = _get_lib().step_online_o1_synapse(
self._ptr,
pre_spike,
post_spike,
_ct.c_int32(reward),
)
return snapshot
@property
def per_synapse_state_bits(self) -> int:
return int(_get_lib().online_o1_per_synapse_state_bits(self._ptr))
def __del__(self) -> None:
if hasattr(self, "_ptr") and self._ptr and _HAS_LEARNING:
lib = _get_lib()
if hasattr(lib, "destroy_online_o1_synapse"):
lib.destroy_online_o1_synapse(self._ptr)
self._ptr = None
|
step(*, pre_spike, post_spike, reward)
Advance one timestep and return the bounded fixed-point state.
Source code in src/sc_neurocore/_native/learning_bridge.py
| Python |
|---|
297
298
299
300
301
302
303
304
305
306 | def step(self, *, pre_spike: bool, post_spike: bool, reward: int) -> OnlineO1SnapshotFFI:
"""Advance one timestep and return the bounded fixed-point state."""
snapshot: OnlineO1SnapshotFFI = _get_lib().step_online_o1_synapse(
self._ptr,
pre_spike,
post_spike,
_ct.c_int32(reward),
)
return snapshot
|
MAML-style meta-learning for spiking networks.
Finn et al. 2017. Inner loop: fast adaptation on a task.
Outer loop: meta-gradient across tasks.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332 | class MetaLearner:
"""MAML-style meta-learning for spiking networks.
Finn et al. 2017. Inner loop: fast adaptation on a task.
Outer loop: meta-gradient across tasks.
"""
def __init__(self, network: Any, inner_lr: float = 0.01, outer_lr: float = 0.001) -> None:
self.network = network
self.inner_lr = inner_lr
self.outer_lr = outer_lr
def _snapshot_weights(self) -> list[np.ndarray[Any, Any]]:
return [proj.data.copy() for proj in self.network.projections]
def _restore_weights(self, snapshot: list[np.ndarray[Any, Any]]) -> None:
for proj, w in zip(self.network.projections, snapshot):
proj.data[:] = w
def inner_loop(
self, task_data: tuple[np.ndarray[Any, Any], np.ndarray[Any, Any]], n_steps: int = 5
) -> None:
"""Fast adaptation: n_steps of gradient descent on task_data.
Parameters
----------
task_data : tuple
(inputs, targets) arrays.
n_steps : int
Number of inner-loop updates.
"""
inputs, targets = task_data
for _ in range(n_steps):
for pop in self.network.populations:
pop.reset_all()
n_t = inputs.shape[0]
recorded_spikes = []
for t in range(n_t):
pop = self.network.populations[0]
spikes = pop.step_all(inputs[t][: pop.n])
recorded_spikes.append(spikes.copy())
spike_arr = np.stack(recorded_spikes)
error = spike_arr - targets
for proj in self.network.projections:
grad = np.zeros_like(proj.data)
for t in range(n_t):
for i in range(proj.source.n):
for k in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k]
grad[k] += recorded_spikes[t][i] * error[t][j]
proj.data -= self.inner_lr * grad / max(n_t, 1)
def outer_step(self, tasks: list[tuple[np.ndarray[Any, Any], np.ndarray[Any, Any]]]) -> None:
"""Meta-gradient update across multiple tasks.
Parameters
----------
tasks : list of tuple
Each element is (inputs, targets).
"""
meta_grad = [np.zeros_like(proj.data) for proj in self.network.projections]
base_weights = self._snapshot_weights()
for task in tasks:
self._restore_weights(base_weights)
pre_weights = self._snapshot_weights()
self.inner_loop(task)
for idx, proj in enumerate(self.network.projections):
meta_grad[idx] += proj.data - pre_weights[idx]
self._restore_weights(base_weights)
for idx, proj in enumerate(self.network.projections):
proj.data += self.outer_lr * meta_grad[idx] / max(len(tasks), 1)
|
Fast adaptation: n_steps of gradient descent on task_data.
task_data : tuple
(inputs, targets) arrays.
n_steps : int
Number of inner-loop updates.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310 | def inner_loop(
self, task_data: tuple[np.ndarray[Any, Any], np.ndarray[Any, Any]], n_steps: int = 5
) -> None:
"""Fast adaptation: n_steps of gradient descent on task_data.
Parameters
----------
task_data : tuple
(inputs, targets) arrays.
n_steps : int
Number of inner-loop updates.
"""
inputs, targets = task_data
for _ in range(n_steps):
for pop in self.network.populations:
pop.reset_all()
n_t = inputs.shape[0]
recorded_spikes = []
for t in range(n_t):
pop = self.network.populations[0]
spikes = pop.step_all(inputs[t][: pop.n])
recorded_spikes.append(spikes.copy())
spike_arr = np.stack(recorded_spikes)
error = spike_arr - targets
for proj in self.network.projections:
grad = np.zeros_like(proj.data)
for t in range(n_t):
for i in range(proj.source.n):
for k in range(proj.indptr[i], proj.indptr[i + 1]):
j = proj.indices[k]
grad[k] += recorded_spikes[t][i] * error[t][j]
proj.data -= self.inner_lr * grad / max(n_t, 1)
|
Meta-gradient update across multiple tasks.
tasks : list of tuple
Each element is (inputs, targets).
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332 | def outer_step(self, tasks: list[tuple[np.ndarray[Any, Any], np.ndarray[Any, Any]]]) -> None:
"""Meta-gradient update across multiple tasks.
Parameters
----------
tasks : list of tuple
Each element is (inputs, targets).
"""
meta_grad = [np.zeros_like(proj.data) for proj in self.network.projections]
base_weights = self._snapshot_weights()
for task in tasks:
self._restore_weights(base_weights)
pre_weights = self._snapshot_weights()
self.inner_loop(task)
for idx, proj in enumerate(self.network.projections):
meta_grad[idx] += proj.data - pre_weights[idx]
self._restore_weights(base_weights)
for idx, proj in enumerate(self.network.projections):
proj.data += self.outer_lr * meta_grad[idx] / max(len(tasks), 1)
|
Homeostatic Plasticity (Turrigiano 2008)
sc_neurocore.learning.advanced.HomeostaticPlasticity
Homeostatic synaptic scaling to maintain target firing rate.
Turrigiano 2008. Multiplicatively scales all incoming weights to keep
the population mean rate near target_rate.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367 | class HomeostaticPlasticity:
"""Homeostatic synaptic scaling to maintain target firing rate.
Turrigiano 2008. Multiplicatively scales all incoming weights to keep
the population mean rate near target_rate.
"""
def __init__(self, target_rate: float = 10.0, tau: float = 1000.0) -> None:
self.target_rate = target_rate
self.tau = tau
self._rate_estimate: float | None = None
def update(self, population: Any) -> None:
"""Scale weights of all incoming projections to *population*.
Parameters
----------
population : Population
Target population whose rate should be regulated.
"""
current_rate = np.mean(population.voltages > 0.9) * 1000.0
if self._rate_estimate is None:
self._rate_estimate = current_rate
alpha = 1.0 / self.tau
self._rate_estimate += alpha * (current_rate - self._rate_estimate)
if self._rate_estimate <= 0:
return
scale = self.target_rate / self._rate_estimate
scale = np.clip(scale, 0.9, 1.1)
for proj in getattr(population, "_projections", []):
if hasattr(proj, "data"):
proj.data *= scale
self._last_scale = float(scale)
|
update(population)
Scale weights of all incoming projections to population.
Parameters
population : Population
Target population whose rate should be regulated.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367 | def update(self, population: Any) -> None:
"""Scale weights of all incoming projections to *population*.
Parameters
----------
population : Population
Target population whose rate should be regulated.
"""
current_rate = np.mean(population.voltages > 0.9) * 1000.0
if self._rate_estimate is None:
self._rate_estimate = current_rate
alpha = 1.0 / self.tau
self._rate_estimate += alpha * (current_rate - self._rate_estimate)
if self._rate_estimate <= 0:
return
scale = self.target_rate / self._rate_estimate
scale = np.clip(scale, 0.9, 1.1)
for proj in getattr(population, "_projections", []):
if hasattr(proj, "data"):
proj.data *= scale
self._last_scale = float(scale)
|
Short-Term Plasticity (Tsodyks-Markram 1997)
sc_neurocore.learning.advanced.ShortTermPlasticity
Tsodyks-Markram short-term plasticity (STP).
Tsodyks & Markram 1997. Models depression (tau_d) and facilitation (tau_f)
with use parameter u_se.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412 | class ShortTermPlasticity:
"""Tsodyks-Markram short-term plasticity (STP).
Tsodyks & Markram 1997. Models depression (tau_d) and facilitation (tau_f)
with use parameter u_se.
"""
def __init__(self, tau_d: float = 200.0, tau_f: float = 600.0, u_se: float = 0.2) -> None:
self.tau_d = tau_d
self.tau_f = tau_f
self.u_se = u_se
self._x: np.ndarray[Any, Any] | None = None
self._u: np.ndarray[Any, Any] | None = None
def update(self, pre_spikes: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
"""Compute effective weight scaling given pre-synaptic spikes.
Parameters
----------
pre_spikes : np.ndarray[Any, Any]
Binary (0/1) vector of length n_pre.
Returns
-------
np.ndarray[Any, Any]
Effective weight multiplier per pre-synaptic neuron.
"""
n = pre_spikes.shape[0]
if self._x is None:
self._x = np.ones(n)
self._u = np.full(n, self.u_se)
assert self._x is not None and self._u is not None
dt = 1.0
self._x += dt / self.tau_d * (1.0 - self._x)
self._u += dt / self.tau_f * (self.u_se - self._u)
mask = pre_spikes.astype(bool)
self._u[mask] += self.u_se * (1.0 - self._u[mask])
release: np.ndarray[Any, Any] = self._u * self._x
self._x[mask] -= release[mask]
return release
|
update(pre_spikes)
Compute effective weight scaling given pre-synaptic spikes.
Parameters
pre_spikes : np.ndarray[Any, Any]
Binary (0/1) vector of length n_pre.
Returns
np.ndarray[Any, Any]
Effective weight multiplier per pre-synaptic neuron.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412 | def update(self, pre_spikes: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
"""Compute effective weight scaling given pre-synaptic spikes.
Parameters
----------
pre_spikes : np.ndarray[Any, Any]
Binary (0/1) vector of length n_pre.
Returns
-------
np.ndarray[Any, Any]
Effective weight multiplier per pre-synaptic neuron.
"""
n = pre_spikes.shape[0]
if self._x is None:
self._x = np.ones(n)
self._u = np.full(n, self.u_se)
assert self._x is not None and self._u is not None
dt = 1.0
self._x += dt / self.tau_d * (1.0 - self._x)
self._u += dt / self.tau_f * (self.u_se - self._u)
mask = pre_spikes.astype(bool)
self._u[mask] += self.u_se * (1.0 - self._u[mask])
release: np.ndarray[Any, Any] = self._u * self._x
self._x[mask] -= release[mask]
return release
|
Structural Plasticity
sc_neurocore.learning.advanced.StructuralPlasticity
Activity-dependent synapse creation and elimination.
Grows new synapses between correlated neurons and prunes weak ones.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445 | class StructuralPlasticity:
"""Activity-dependent synapse creation and elimination.
Grows new synapses between correlated neurons and prunes weak ones.
"""
def __init__(self, growth_rate: float = 0.001, prune_threshold: float = 0.01) -> None:
self.growth_rate = growth_rate
self.prune_threshold = prune_threshold
def update(self, projection: Any) -> None:
"""Grow or prune synapses in a Projection based on activity.
Parameters
----------
projection : Projection
Target projection to modify.
"""
prune_mask = np.abs(projection.data) < self.prune_threshold
projection.data[prune_mask] = 0.0
n_src = projection.source.n
n_pruned = int(prune_mask.sum())
n_grow = min(n_pruned, max(1, int(self.growth_rate * len(projection.data))))
if n_grow > 0:
zero_indices = np.where(projection.data == 0.0)[0]
if zero_indices.size > 0:
chosen = np.random.choice(
zero_indices, size=min(n_grow, zero_indices.size), replace=False
)
projection.data[chosen] = np.random.uniform(0.001, 0.05, size=chosen.size)
|
update(projection)
Grow or prune synapses in a Projection based on activity.
Parameters
projection : Projection
Target projection to modify.
Source code in src/sc_neurocore/learning/advanced.py
| Python |
|---|
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445 | def update(self, projection: Any) -> None:
"""Grow or prune synapses in a Projection based on activity.
Parameters
----------
projection : Projection
Target projection to modify.
"""
prune_mask = np.abs(projection.data) < self.prune_threshold
projection.data[prune_mask] = 0.0
n_src = projection.source.n
n_pruned = int(prune_mask.sum())
n_grow = min(n_pruned, max(1, int(self.growth_rate * len(projection.data))))
if n_grow > 0:
zero_indices = np.where(projection.data == 0.0)[0]
if zero_indices.size > 0:
chosen = np.random.choice(
zero_indices, size=min(n_grow, zero_indices.size), replace=False
)
projection.data[chosen] = np.random.uniform(0.001, 0.05, size=chosen.size)
|
Federated
sc_neurocore.learning.federated
Privacy-preserving federated aggregation over stochastic-computing bitstreams.
FederatedAggregator
Privacy-preserving federated learning using SC bitstreams.
Source code in src/sc_neurocore/learning/federated.py
| Python |
|---|
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 | class FederatedAggregator:
"""Privacy-preserving federated learning using SC bitstreams."""
@staticmethod
def aggregate_gradients(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
"""Aggregate gradient bitstreams from multiple clients by majority vote.
Parameters
----------
client_gradients : list of numpy.ndarray
Per-client bitstream arrays; all must share the same shape.
Returns
-------
numpy.ndarray
The majority-voted aggregated bitstream.
"""
if not client_gradients:
raise ValueError("No gradients to aggregate")
# Stack: (Num_Clients, Gradient_Size)
stack = np.stack(client_gradients, axis=0)
# Sum bits at each position across clients
# (Client1_bit_i + Client2_bit_i + ... )
sums = np.sum(stack, axis=0)
# Majority Vote
# If sum > num_clients / 2, output 1
threshold = len(client_gradients) / 2.0
aggregated: np.ndarray[Any, Any] = (sums > threshold).astype(np.uint8)
return aggregated
@staticmethod
def secure_sum_protocol(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
"""Sum client bitstreams as a secure-aggregation surrogate.
Models a secure aggregation where the server observes only the
element-wise sum, not individual client updates, analogous to secure
multi-party computation.
"""
# In SC, 'Summing' bitstreams usually produces an integer result (0..N).
# This is strictly not a bitstream anymore but a discretized value.
stack = np.stack(client_gradients, axis=0)
summed: np.ndarray[Any, Any] = np.sum(stack, axis=0)
return summed
|
aggregate_gradients(client_gradients)
staticmethod
Aggregate gradient bitstreams from multiple clients by majority vote.
Parameters
client_gradients : list of numpy.ndarray
Per-client bitstream arrays; all must share the same shape.
Returns
numpy.ndarray
The majority-voted aggregated bitstream.
Source code in src/sc_neurocore/learning/federated.py
| Python |
|---|
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 | @staticmethod
def aggregate_gradients(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
"""Aggregate gradient bitstreams from multiple clients by majority vote.
Parameters
----------
client_gradients : list of numpy.ndarray
Per-client bitstream arrays; all must share the same shape.
Returns
-------
numpy.ndarray
The majority-voted aggregated bitstream.
"""
if not client_gradients:
raise ValueError("No gradients to aggregate")
# Stack: (Num_Clients, Gradient_Size)
stack = np.stack(client_gradients, axis=0)
# Sum bits at each position across clients
# (Client1_bit_i + Client2_bit_i + ... )
sums = np.sum(stack, axis=0)
# Majority Vote
# If sum > num_clients / 2, output 1
threshold = len(client_gradients) / 2.0
aggregated: np.ndarray[Any, Any] = (sums > threshold).astype(np.uint8)
return aggregated
|
secure_sum_protocol(client_gradients)
staticmethod
Sum client bitstreams as a secure-aggregation surrogate.
Models a secure aggregation where the server observes only the
element-wise sum, not individual client updates, analogous to secure
multi-party computation.
Source code in src/sc_neurocore/learning/federated.py
| Python |
|---|
53
54
55
56
57
58
59
60
61
62
63
64
65 | @staticmethod
def secure_sum_protocol(client_gradients: list[np.ndarray[Any, Any]]) -> np.ndarray[Any, Any]:
"""Sum client bitstreams as a secure-aggregation surrogate.
Models a secure aggregation where the server observes only the
element-wise sum, not individual client updates, analogous to secure
multi-party computation.
"""
# In SC, 'Summing' bitstreams usually produces an integer result (0..N).
# This is strictly not a bitstream anymore but a discretized value.
stack = np.stack(client_gradients, axis=0)
summed: np.ndarray[Any, Any] = np.sum(stack, axis=0)
return summed
|
Lifelong (EWC)
Elastic Weight Consolidation with active penalty: pushes drifted weights
back toward consolidated values, weighted by Fisher information.
sc_neurocore.learning.lifelong
EWC_SCLayer
dataclass
Bases: SCLearningLayer
Lifelong Learning Layer using Elastic Weight Consolidation (Approx).
Source code in src/sc_neurocore/learning/lifelong.py
| Python |
|---|
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 | @dataclass
class EWC_SCLayer(SCLearningLayer):
"""
Lifelong Learning Layer using Elastic Weight Consolidation (Approx).
"""
ewc_lambda: float = 10.0 # Strength of constraint
def __post_init__(self) -> None:
super().__post_init__()
self.fisher_info = np.zeros((self.n_neurons, self.n_inputs))
self.star_weights = np.zeros((self.n_neurons, self.n_inputs))
def consolidate_task(self) -> None:
"""
Call after finishing a task.
Calculate Fisher Info (Importance) and freeze 'star' weights.
"""
# In SC, Fisher Info approx ~ Activity * Plasticity
# Weights that changed a lot or are high are often important.
# Simplified: Importance = Current Weight Magnitude (Hebbian)
current_w = self.get_weights()
self.star_weights = current_w.copy()
# Assume all non-zero weights are somewhat important
self.fisher_info = current_w.copy()
def apply_ewc_penalty(self, step_size: float = 0.01) -> float:
"""Push weights back toward consolidated values, weighted by Fisher info.
Kirkpatrick et al. 2017, adapted to SC/STDP setting.
Penalty gradient per synapse: F_i * (w_i - w_star_i).
Parameters
----------
step_size : float
Fraction of penalty gradient to apply per call.
Returns
-------
float
Total penalty magnitude (for logging).
"""
current_w = self.get_weights()
delta = current_w - self.star_weights
penalty_grad = self.fisher_info * delta
correction = self.ewc_lambda * step_size * penalty_grad
new_w = np.clip(current_w - correction, self.w_min, self.w_max)
for i in range(self.n_neurons):
for j in range(self.n_inputs):
self.synapses[i][j].w = float(new_w[i, j])
return float(np.sum(np.abs(penalty_grad)))
|
consolidate_task()
Call after finishing a task.
Calculate Fisher Info (Importance) and freeze 'star' weights.
Source code in src/sc_neurocore/learning/lifelong.py
| Python |
|---|
27
28
29
30
31
32
33
34
35
36
37
38
39 | def consolidate_task(self) -> None:
"""
Call after finishing a task.
Calculate Fisher Info (Importance) and freeze 'star' weights.
"""
# In SC, Fisher Info approx ~ Activity * Plasticity
# Weights that changed a lot or are high are often important.
# Simplified: Importance = Current Weight Magnitude (Hebbian)
current_w = self.get_weights()
self.star_weights = current_w.copy()
# Assume all non-zero weights are somewhat important
self.fisher_info = current_w.copy()
|
apply_ewc_penalty(step_size=0.01)
Push weights back toward consolidated values, weighted by Fisher info.
Kirkpatrick et al. 2017, adapted to SC/STDP setting.
Penalty gradient per synapse: F_i * (w_i - w_star_i).
Parameters
step_size : float
Fraction of penalty gradient to apply per call.
Returns
float
Total penalty magnitude (for logging).
Source code in src/sc_neurocore/learning/lifelong.py
| Python |
|---|
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 | def apply_ewc_penalty(self, step_size: float = 0.01) -> float:
"""Push weights back toward consolidated values, weighted by Fisher info.
Kirkpatrick et al. 2017, adapted to SC/STDP setting.
Penalty gradient per synapse: F_i * (w_i - w_star_i).
Parameters
----------
step_size : float
Fraction of penalty gradient to apply per call.
Returns
-------
float
Total penalty magnitude (for logging).
"""
current_w = self.get_weights()
delta = current_w - self.star_weights
penalty_grad = self.fisher_info * delta
correction = self.ewc_lambda * step_size * penalty_grad
new_w = np.clip(current_w - correction, self.w_min, self.w_max)
for i in range(self.n_neurons):
for j in range(self.n_inputs):
self.synapses[i][j].w = float(new_w[i, j])
return float(np.sum(np.abs(penalty_grad)))
|
Neuroevolution
sc_neurocore.learning.neuroevolution
Genetic algorithm for evolving spiking-neural-network weights and parameters.
SNNGeneticEvolver
dataclass
Genetic algorithm for evolving SNN weights and parameters.
Source code in src/sc_neurocore/learning/neuroevolution.py
| Python |
|---|
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 | @dataclass
class SNNGeneticEvolver:
"""Genetic algorithm for evolving SNN weights and parameters."""
population_size: int = 20
mutation_rate: float = 0.05
elite_fraction: float = 0.2
def __init__(self, layer_factory: Callable[[], Any], fitness_func: Callable[[Any], float]):
self.layer_factory = layer_factory
self.fitness_func = fitness_func
# Initialize population
self.population = [layer_factory() for _ in range(self.population_size)]
def evolve(self, generations: int) -> Any:
"""Run the GA for the given number of generations and return the best individual."""
for gen in range(generations):
# 1. Evaluate Fitness
scores = [self.fitness_func(ind) for ind in self.population]
# Sort by fitness (descending)
ranked_indices = np.argsort(scores)[::-1]
ranked_pop = [self.population[i] for i in ranked_indices]
logger.info("Gen %d: Best Fitness = %.4f", gen, scores[ranked_indices[0]])
# 2. Selection (Elitism)
n_elite = int(self.population_size * self.elite_fraction)
next_gen = ranked_pop[:n_elite]
# 3. Crossover & Mutation
while len(next_gen) < self.population_size:
# Simple random selection for parents
p1, p2 = np.random.choice(ranked_pop[: n_elite + 5], 2, replace=False)
child = self._crossover(p1, p2)
self._mutate(child)
next_gen.append(child)
self.population = next_gen
return self.population[0] # Return best
def _crossover(self, p1: Any, p2: Any) -> Any:
# Create new instance
child = self.layer_factory()
if not hasattr(p1, "weights"):
return child
# Uniform crossover
mask = np.random.rand(*p1.weights.shape) > 0.5
child.weights = np.where(mask, p1.weights, p2.weights)
return child
def _mutate(self, ind: Any) -> None:
if not hasattr(ind, "weights"):
return
# Gaussian mutation
mutation_mask = np.random.rand(*ind.weights.shape) < self.mutation_rate
noise = np.random.normal(0, 0.1, ind.weights.shape)
ind.weights[mutation_mask] += noise[mutation_mask]
ind.weights = np.clip(ind.weights, 0, 1)
|
evolve(generations)
Run the GA for the given number of generations and return the best individual.
Source code in src/sc_neurocore/learning/neuroevolution.py
| Python |
|---|
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 | def evolve(self, generations: int) -> Any:
"""Run the GA for the given number of generations and return the best individual."""
for gen in range(generations):
# 1. Evaluate Fitness
scores = [self.fitness_func(ind) for ind in self.population]
# Sort by fitness (descending)
ranked_indices = np.argsort(scores)[::-1]
ranked_pop = [self.population[i] for i in ranked_indices]
logger.info("Gen %d: Best Fitness = %.4f", gen, scores[ranked_indices[0]])
# 2. Selection (Elitism)
n_elite = int(self.population_size * self.elite_fraction)
next_gen = ranked_pop[:n_elite]
# 3. Crossover & Mutation
while len(next_gen) < self.population_size:
# Simple random selection for parents
p1, p2 = np.random.choice(ranked_pop[: n_elite + 5], 2, replace=False)
child = self._crossover(p1, p2)
self._mutate(child)
next_gen.append(child)
self.population = next_gen
return self.population[0] # Return best
|