
Network Simulation Engine

Module: sc_neurocore.network (re-exported from sc_neurocore.network.__init__) Source: src/sc_neurocore/network/ — 12 files, 4065 LOC Status (v3.14.0): core orchestrator + Population/Projection/topology/monitors fully wired; Rust network dispatch and MPI per-rank Rust dispatch are implemented, with Python fallback when the engine wheel is absent; MPIRunner has 12 mocked-mpi4py tests; topology/projection compute paths are still pure-Python (no Rust path yet).

This page covers the simulation engine — the declarative Network container, populations of neurons, sparse Projection connectivity (with delays + STDP), connectivity generators, spike/state/rate monitors, stimulus sources, the multi-backend dispatcher (auto/python/rust/mpi), and the LIF-network Verilog exporter. Two specialised circuit models that ship in the same directory will be documented on their own pages once written (planned: api/cortical_column.md, api/gamma_oscillation.md); both are flagged in §14.1 below for fidelity violations.


1. Overview

The simulation engine is declarative: you instantiate populations and projections, hand them to a Network, and call .run(duration, dt). The network registers each object by type (Population, Projection, SpikeMonitor, StateMonitor, RateMonitor, TimedArray, PoissonInput, StepCurrent) and runs the appropriate per-step pipeline:

Text Only
each timestep t:
  for each population: zero its current accumulator
  apply stimuli → currents
  apply projections (CSR matvec + delay) → currents
  for each population: step neurons, record spikes
  update plasticity (STDP) on projections that have it
  optional: Fisher Information Metric self-observation feedback

Network.run selects one of three backends:

  • 'python' — the pure-Python loop above
  • 'rust' — delegates to sc_neurocore_engine.NetworkRunner (PyO3) when every population's model_name is in NetworkRunner.supported_models() and there are no stimuli and no plasticity
  • 'mpi' — partitions populations round-robin across MPI ranks and exchanges spikes via Allgatherv

'auto' (default) picks 'rust' when its preconditions hold, otherwise falls back to 'python'.
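The fallback rule can be sketched as a small helper (hypothetical names; the real checks live in Network.run and Network._can_use_rust, §3.3):

```python
# Sketch of the backend dispatch rule: 'rust' demands the engine,
# 'auto' falls back to Python silently when preconditions fail.
def pick_backend(requested: str, rust_ok: bool) -> str:
    if requested == "rust" and not rust_ok:
        raise RuntimeError("Rust engine not available")
    if requested == "auto":
        return "rust" if rust_ok else "python"
    return requested
```

Pass backend="python" explicitly when silent fallback would mask a misconfigured engine wheel.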


2. Public Surface

The module re-exports 18 public symbols from sc_neurocore.network.__init__:

Symbol Source file Role
Network network.py Top-level orchestrator + dispatcher
Population population.py Vectorised group of identical neurons
Projection projection.py CSR connectivity + delay + STDP
SpikeMonitor monitor.py Records (neuron_id, timestep) pairs
StateMonitor monitor.py Records state-variable traces
RateMonitor monitor.py Population firing rate per bin
TimedArray stimulus.py Time-varying scalar stimulus
PoissonInput stimulus.py Random Poisson spike trains
StepCurrent stimulus.py Rectangular step current
random_connectivity topology.py Erdős–Rényi
small_world topology.py Watts–Strogatz
scale_free topology.py Barabási–Albert
ring_topology topology.py Ring with k-NN
grid_topology topology.py 2-D Manhattan-radius lattice
all_to_all topology.py Dense connectivity
export_verilog export.py Multi-population LIF → Verilog
MPIRunner mpi_runner.py Distributed simulation runner
HAS_MPI mpi_runner.py bool — was mpi4py importable?

The submodule cortical_column exports CorticalColumn; the submodule gamma_oscillation exports PINGCircuit. Both will be documented on their own pages (see §14.1 for the current audit findings).


3. Network — orchestrator

3.1 Constructor

Python
Network(*objects, seed: int = 42, fim_lambda: float = 0.0)

Accepts any number of Population, Projection, monitor or stimulus objects positionally. Each is registered into the corresponding internal list by isinstance dispatch; unknown types raise TypeError.

fim_lambda enables Fisher Information Metric (FIM) self-observation feedback (_apply_fim, network.py:249): each timestep the per-source weight is pulled toward the population mean of its source spike vector by λ_fim · (activity_i − μ) / N. The mechanism is derived from scpn-quantum-control NB26–28 (FIM alone synchronises at K=0, λ ≥ 8, increases Φ by 73 %, and is topology-universal). Set to 0.0 (default) to disable.

3.2 run

Python
def run(
    duration: float,
    dt: float = 0.001,
    progress: bool = False,
    backend: str = "auto",
    spike_gating: bool = False,
) -> None
  • duration is in seconds; dt is the timestep in seconds. n_steps is int(round(duration/dt)).
  • backend ∈ {"auto", "python", "rust", "mpi"}. "rust" raises if the engine wheel is not importable; "auto" silently falls back to Python.
  • spike_gating (Python backend only) skips neurons with zero input current whose voltage is within 1 % of resting potential. Useful for sparse networks where most neurons are silent.

The Python loop (_run_python, network.py:165) is the reference implementation; the Rust loop (_run_rust, network.py:123) round-trips populations and projections through NetworkRunner.add_population / add_projection, runs n_steps, then decodes packed spike events (u64 = neuron_id<<32 | timestep) back into Python SpikeMonitor records.

3.3 Rust dispatch criteria (_can_use_rust)

Network._can_use_rust (line 80) returns True only when:

  1. len(self.stimuli) == 0 — no TimedArray/PoissonInput/StepCurrent in this network.
  2. The Rust engine import succeeded (_get_rust_engine() is not False).
  3. Every pop.model_name (or its *Neuron-stripped form) is in NetworkRunner.supported_models().
  4. No projection has a non-empty plasticity field.

When any of these fails and backend="auto", the network falls back to Python without warning. Pass backend="python" explicitly when you need deterministic dispatch.

3.4 Backend matrix

Feature Python Rust MPI
Heterogeneous neuron models ✅ any ⚠️ only supported_models() ✅ per-rank
Stimuli ✅ all ❌ disqualifies Rust ❌ raises NotImplementedError
STDP / plasticity ✅ ❌ disqualifies Rust ❌ raises NotImplementedError
Per-synapse delays ✅ ⚠️ uniform only via add_projection(..., max_delay) ✅ Python propagate path
Spike gating ✅ ❌ ignored ❌ raises NotImplementedError
FIM feedback (fim_lambda > 0) ✅ ❌ ignored ❌ raises NotImplementedError
Multi-rank ❌ ❌ ✅

4. Population — vectorised neurons

Python
Population(model: type | str, n: int,
           params: dict | None = None,
           label: str | None = None)

model may be a class or a string name resolved through sc_neurocore.neurons.models (lazy registry of 130 lazy-loaded classes — see api/neurons.md). The constructor instantiates n independent neuron objects with identical parameters and exposes:

  • population.neurons — list[NeuronProtocol] of length n
  • population.voltages — read-only np.ndarray view (kept in sync via _sync_voltages)
  • population.step_all(currents, spike_gating=False) — returns binary spike vector np.ndarray[int8] of length n
  • population.reset_all() — calls reset() or reset_state() on each neuron
  • population.get_states() — collects all per-neuron state variables into arrays (uses get_state(), __dataclass_fields__, or falls back to ["v"])
  • population.set_voltages(arr) — sync voltages from an external source (used by the Rust round-trip)

Populations are hashable by identity (id(pop)); the Network keeps pop_to_currents keyed by id(pop) so two Population objects with identical parameters are still independent.
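The identity-keying can be sketched in two lines (a stand-in Pop class; the real buffers live inside Network._run_python):

```python
# Two parameter-identical populations still get independent current
# accumulators because the dict is keyed by object identity, not value.
class Pop:
    def __init__(self, n: int):
        self.n = n

a, b = Pop(50), Pop(50)
currents = {id(p): [0.0] * p.n for p in (a, b)}
currents[id(a)][0] = 1.5  # only a's buffer changes
```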

4.1 Spike gating

When spike_gating=True, step_all skips neurons that have no input current and sit within 1 % of their resting potential. This makes per-step compute roughly proportional to the active neuron count, which matters for sparse networks where most neurons are silent for most of the simulation. The skipped neurons do not advance — when the population is queried later, their voltages are stale until they receive input again. Models that track sub-threshold leak via internal calls to step() lose that leak during skipped steps.


5. Projection — CSR connectivity + delays + STDP

Python
Projection(
    source: Population,
    target: Population,
    weight: float,
    probability: float = 1.0,
    delay: float | np.ndarray = 0.0,
    topology: str | tuple[np.ndarray, np.ndarray, np.ndarray] = "random",
    plasticity: str | None = None,
    seed: int = 42,
    weight_threshold: float = 0.0,
)

Stores connectivity in CSR (indptr, indices, data). Source spikes propagate via _csr_matvec (no delay) or one of two delayed variants:

  • delay = 0.0 — direct CSR matvec, no buffering
  • delay = scalar — uniform axonal delay; output goes through a circular buffer of shape (steps, target.n) and is read out one timestep behind
  • delay = ndarray of length n_synapses — per-synapse delay; source spike history is stored as a ring buffer of shape (max_delay+1, source.n) and each synapse reads from spike_history[(hist_idx − d_k) % (max_delay + 1)]

Per-synapse delays implement Hammouamri et al. 2023 (DCLS) and Masquelier's DelRec — learnable synaptic delays in spiking nets.
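The ring-buffer read described above can be sketched in plain Python (a stand-in class; the real buffer is a NumPy array inside Projection):

```python
# Per-synapse delay history: max_delay+1 rows, hist_idx points at the
# most recently written row; a synapse with delay d_k reads d_k rows back.
class SpikeHistory:
    def __init__(self, n_src: int, max_delay: int):
        self.rows = [[0] * n_src for _ in range(max_delay + 1)]
        self.max_delay = max_delay
        self.hist_idx = 0

    def push(self, spikes):
        self.hist_idx = (self.hist_idx + 1) % (self.max_delay + 1)
        self.rows[self.hist_idx] = list(spikes)

    def read(self, i: int, d_k: int) -> int:
        # spike of source neuron i as it was d_k timesteps ago
        return self.rows[(self.hist_idx - d_k) % (self.max_delay + 1)][i]

h = SpikeHistory(n_src=2, max_delay=3)
h.push([1, 0])   # t=0: neuron 0 spikes
h.push([0, 0])   # t=1
h.push([0, 1])   # t=2: neuron 1 spikes
```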

5.1 weight_threshold pruning

When weight_threshold > 0.0, the matvec skips synapses with |data[k]| ≤ weight_threshold. Useful after sparse pruning to avoid wasted multiplications. The branch is in the inner Python loop (projection.py:47) so the speed-up is real but linear in connection count.

5.2 STDP plasticity

plasticity="stdp" activates trace-based STDP in update_plasticity (projection.py:258):

  • Pre/post traces decay with tau (default 20 timesteps), incremented on spike
  • LTD on pre-spike: data[k] -= a_minus * post_trace[j]
  • LTP on post-spike: data[k] += a_plus * directional_bias * pre_trace[i]
  • Weights clipped at 0 (no sign change)

directional_bias scales a_plus per projection. The scpn-quantum-control NB19 measurement of autonomic → cortical asymmetry suggests 1.36 for bottom-up (sensory → higher-order) projections; default 1.0 keeps learning symmetric.

Self-projections (source is target) additionally trigger _enforce_symmetry after each STDP update — W_ij and W_ji are averaged. Required because gradient/STDP updates break W = W^T after ~30 steps (SPO Finding #7); asymmetric coupling degrades synchronisation by 12 % (quantum-control NB24).
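The trace updates above can be sketched for a single synapse i → j (the decay factor and parameter values are illustrative, not the module's defaults):

```python
# Trace-based STDP sketch: pre-before-post ordering should potentiate.
tau, a_plus, a_minus, bias = 20.0, 0.01, 0.012, 1.0
pre_trace, post_trace, w = 0.0, 0.0, 0.5
decay = 1.0 - 1.0 / tau  # first-order approximation of exp(-1/tau)

for pre_spike, post_spike in [(1, 0), (0, 1)]:  # pre at t, post at t+1
    pre_trace *= decay
    post_trace *= decay
    if pre_spike:                     # LTD: depress by the post trace
        w -= a_minus * post_trace
        pre_trace += 1.0
    if post_spike:                    # LTP: potentiate by the pre trace
        w += a_plus * bias * pre_trace
        post_trace += 1.0
    w = max(w, 0.0)                   # clip at zero, no sign change
```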


6. Topology generators

All generators in topology.py return a (indptr, indices, data) CSR tuple ready to feed into Projection(..., topology=tuple).

Generator Algorithm Cited basis Symmetric?
random_connectivity Erdős–Rényi classic no
small_world Watts–Strogatz Watts & Strogatz 1998 yes (added both directions)
scale_free Barabási–Albert preferential attachment Barabási & Albert 1999 yes
ring_topology Ring + k nearest neighbours both directions yes
grid_topology 2-D lattice within Manhattan radius no (loops add both directions only when within radius)
all_to_all Dense yes when n_src == n_tgt
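A minimal ring_topology-style generator shows the CSR contract (one plausible neighbour convention, chosen here because it reproduces the §6.1 synapse counts; the real generator may differ in detail):

```python
# Build (indptr, indices, data): each node links to its k nearest
# neighbours on each side, both directions, so 2k synapses per node.
def ring_csr(n: int, k: int, weight: float = 1.0):
    indptr, indices, data = [0], [], []
    for i in range(n):
        for off in range(1, k + 1):
            for j in ((i - off) % n, (i + off) % n):
                indices.append(j)
                data.append(weight)
        indptr.append(len(indices))
    return indptr, indices, data

indptr, indices, data = ring_csr(n=200, k=4)
```

The triple feeds straight into Projection(..., topology=(indptr, indices, data)).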

6.1 Performance (this workstation, 2026-04-17)

Measured directly by calling each generator and timing with time.perf_counter(). All inputs are deterministic seeds.

Generator n args wall (ms) synapses
random_connectivity 200 p=0.1 32.5 3 941
all_to_all 200 11.7 40 000
small_world 200 k=8, p=0.1 1.6 1 600
scale_free 200 m=4 10.0 1 568
ring_topology 200 k=4 0.5 1 600
grid_topology 196 (14²) r=1 <0.01 1 404
random_connectivity 1 000 p=0.1 66.3 99 869
all_to_all 1 000 472.2 1 000 000
small_world 1 000 k=8, p=0.1 26.0 8 000
scale_free 1 000 m=4 67.8 7 968
ring_topology 1 000 k=4 3.8 8 000
grid_topology 961 (31²) r=1 <0.01 7 320

small_world and scale_free use Python lists during construction; for n ≥ 10 000 they become noticeably slow and are candidates for the planned Rust path (task #13). grid_topology and ring_topology already vectorise adequately for typical sizes.


7. Stimulus sources

Class Returns from get_current Notes
TimedArray(values, dt) scalar float at min(t_step, len-1) clamps past the end
PoissonInput(n, rate_hz, weight, dt, seed) np.ndarray[n] (rng.random < rate_hz·dt)·weight
StepCurrent(onset, offset, amplitude) scalar float if onset ≤ t < offset else 0.0 rectangular

Each stimulus carries a target: Population | None slot that the network populates when adding the stimulus to a population (the convenience pattern is to set stim.target = pop after construction). When target is None, the network broadcasts to populations[0].

PoissonInput owns a np.random.default_rng(seed); runs are deterministic when seeds are pinned. TimedArray and StepCurrent are deterministic by construction.
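The Bernoulli draw and its determinism can be sketched with stdlib random standing in for the np.random.default_rng the real class owns:

```python
import random

# Each neuron spikes with probability rate_hz * dt per step; a pinned
# seed makes the whole train reproducible.
def poisson_current(n, rate_hz, weight, dt, seed):
    rng = random.Random(seed)
    p = rate_hz * dt
    return [weight if rng.random() < p else 0.0 for _ in range(n)]

a = poisson_current(100, 500.0, 2.0, 0.001, seed=11)
b = poisson_current(100, 500.0, 2.0, 0.001, seed=11)
```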


8. Monitors

8.1 SpikeMonitor

Records (neuron_id, timestep) pairs. Two ingestion paths:

  • record(spikes, t) — accepts the binary spike vector emitted by Population.step_all (Python backend)
  • record_event(neuron_id, t) — accepts a single decoded event (used by the Rust round-trip, which packs u64 = neuron_id<<32 | timestep for efficient transport)
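The u64 packing round-trips losslessly as long as both fields fit in 32 bits:

```python
# Pack/decode for the Rust spike-event transport format:
# neuron_id in the high 32 bits, timestep in the low 32.
def pack(neuron_id: int, timestep: int) -> int:
    return (neuron_id << 32) | (timestep & 0xFFFFFFFF)

def decode(packed: int) -> tuple[int, int]:
    return packed >> 32, packed & 0xFFFFFFFF
```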

Read-out helpers:

Property/method Returns Notes
spike_times np.ndarray[int64] every spike's timestep
spike_trains dict[int, np.ndarray] per-neuron timestep arrays
count int total spikes recorded
raster_data() (times, neuron_ids) tuple ready for raster plot
firing_rates(n_steps, dt) np.ndarray[n] mean Hz per neuron
isi(neuron) np.ndarray[int64] inter-spike intervals (timesteps)
cross_correlation(i, j, max_lag) (corr, lags) delegates to analysis.spike_stats.cross_correlation

8.2 StateMonitor

Captures population.get_states() snapshots at each call to snapshot(t). Configure with variables: list[str] (default ["v"]) and an optional record: list[int] to subset recorded neurons.

8.3 RateMonitor

Bins spike counts into fixed-duration windows (bin_ms), then converts to per-neuron mean rate (Hz). The bin-completion check uses steps_per_bin = max(1, int(bin_ms / 1000.0 / dt)), so a 10 ms bin at dt=1 ms flushes every 10 steps.
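The bin arithmetic is worth checking at the boundaries:

```python
# steps_per_bin from the RateMonitor flush check: bins smaller than
# one timestep clamp to a single step.
def steps_per_bin(bin_ms: float, dt: float) -> int:
    return max(1, int(bin_ms / 1000.0 / dt))
```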


9. Verilog export

export_verilog(network, output_dir, target="ice40") -> str emits a top-level sc_network_top SystemVerilog module that wires one sc_lif_array instance per population. Each population's parameters are read from the first neuron in pop.neurons (v_threshold, v_reset, tau) and converted to fixed-point by multiplying by 256 (Q8.8). The allowed neuron model whitelist is _LIF_MODELS (17 LIF variants); any non-LIF model raises SCHardwareError.
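The Q8.8 conversion is a single multiply (truncation behaviour assumed here; the cli.md §9.1 underflow falls out directly):

```python
# Q8.8 fixed point: 8 integer bits, 8 fractional bits, scale = 256.
def to_q88(x: float) -> int:
    return int(x * 256)

to_q88(1.0)    # v_threshold = 1.0 becomes 256
to_q88(0.001)  # dt = 1 ms truncates to 0 — the §9.1 underflow
```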

The exporter writes two files:

  • sc_network_top.v — module wrapper with one pop_<i> instance per population
  • params.vh — one `define POP_<i>_SIZE n macro per population

Limitations:

  • Only supports populations of LIF-family models
  • Does not emit projections or topology — only neuron arrays
  • Hard-codes Q8.8 scaling (see cli.md §9.1 for the related dt=0.001 underflow bug)
  • Not test-covered in tests/test_network_*.py; add coverage when the LIF network export is exercised end-to-end

For full equation → Verilog compilation including state-machine generation and FPGA project files, use sc-neurocore compile (see api/cli.md) or api/compiler.md.


10. MPIRunner (distributed)

mpi_runner.py provides round-robin partitioning of populations across MPI ranks with MPI_Allgatherv spike exchange every timestep:

Text Only
each rank r:
  local_pops = {i for i in range(n_pops) if i % size == r}
  for t in range(n_steps):
    propagate local + cross-rank projections into local_currents
    step local populations → local_spikes
    Allgatherv local_spikes → all_spikes
    if r == 0: feed monitors with all_spikes
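The partition step in the pseudo-code above is a one-liner:

```python
# Round-robin assignment of population indices to an MPI rank.
def local_pops(n_pops: int, rank: int, size: int) -> set[int]:
    return {i for i in range(n_pops) if i % size == rank}
```

Every rank computes the same partition deterministically, so no coordination message is needed.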

mpi_runner.py:79 (_identify_cross_rank_projections) walks every projection and tags it as local (source and target on the same rank) or cross-rank. The implementation works with both Python and Rust per-rank stepping, but currently dispatches via Population.step_all (Python loop) regardless of backend selection.

10.1 Status

  • 191 LOC, including a custom spike packing protocol (pop_index | n | spike_data packed as int8 blob) and Allgather + Allgatherv choreography.
  • tests/test_mpi_runner.py (177 lines, 12 tests) covers MPIRunner via mocked mpi4py — partition correctness, RuntimeError when mpi4py is absent, single-rank end-to-end equivalence with the Python backend, cross-rank vs local projection routing, spike-exchange round-trip. Real multi-rank semantics with mpirun -n 2 are NOT exercised; task #17 tracks adding a pytest-mpi-style real test.
  • Does not implement spike gating, FIM feedback, or per-rank Rust dispatch.

11. Performance — Python backend (this workstation)

Measured via:

Python
net = Network(pop, proj, mon, stim, seed=1)
net.run(duration=0.2, dt=0.001, backend="python")

with 200 timesteps, recurrent random connectivity at p=0.2, Poisson input (500 Hz, w=2.0, seed=11), LapicqueNeuron populations (default parameters: tau=20 ms, threshold=1.0). Hardware: Intel i5-11600K, 32 GB, Python 3.12.3.

Population n Synapses 200-step wall steps / s Recorded spikes
50 489 13.9 ms 14 428 185
200 7 911 56.6 ms 3 532 782
500 49 570 193.8 ms 1 032 2 385
1 000 200 283 854.8 ms 234 7 464

Scaling is roughly O(n_synapses) because the Python _csr_matvec inner loop dominates. step_all walks the population list one neuron at a time (line 78: neuron.step(float(currents[i]))) which adds ~5 µs of Python overhead per neuron per step. For dense recurrent networks at n ≥ 1000 the Python backend rapidly becomes the bottleneck — the Rust backend exists for exactly this case but requires the engine wheel installed (see §13).

11.1 Delay-mode cost (n=200, p=0.2, 200 steps)

Mode max_delay wall (ms)
none 0 114.0
uniform (5 steps) 5 113.1
per_synapse (rand 1–7) 7 637.0

Per-synapse delays are ~5.6× slower than no-delay or uniform-delay because _csr_delayed_matvec walks every synapse without the early-exit if x[i] == 0: continue of _csr_matvec. (_csr_delayed_matvec reads spike history at varying offsets, so it cannot skip rows up front.) Use uniform delay when the biological detail tolerates it.


12. Pipeline wiring

Surface How it's wired Verifier
from sc_neurocore.network import Network, Population, ... __init__.py re-exports 18 symbols tests/test_network_basic.py
Population(model="LapicqueNeuron", n=...) resolves through sc_neurocore.neurons.models._CLASS_TO_MODULE Population._resolve_model (population.py:19)
net.run(backend="rust") imports sc_neurocore_engine.NetworkRunner lazily _get_rust_engine (network.py:25)
net.run(backend="mpi") imports mpi4py.MPI lazily, raises if absent _require_mpi (mpi_runner.py:36)
Projection(plasticity="stdp") activates update_plasticity per timestep tested in test_network_basic.py
export_verilog(net, dir) calls _check_exportable first raises SCHardwareError for non-LIF models

Every public symbol terminates either in tested code or in an explicit runtime check. There are no orphan helpers.


13. Audit (7-point checklist)

# Dimension Status Detail
1 Pipeline wiring ✅ PASS All 18 public symbols wired; backend dispatcher complete
2 Multi-angle tests ⚠️ WARN Network tests cover the orchestrator, monitors, topology, cortical column, gamma circuit, and 12 mocked-mpi4py MPIRunner paths including per-rank Rust dispatch. Real multi-rank coverage is still missing (task #17). export.py is not directly covered.
3 Rust path ⚠️ WARN Network._run_rust and MPIRunner per-rank Rust dispatch exist and are tested logically; engine wheel not installed in this environment so empirical Rust numbers in §11 are not available. topology.py, _csr_matvec/_csr_delayed_matvec, update_plasticity are pure Python — task #13 tracks the Rustification.
4 Benchmarks ✅ PASS §6.1, §11, §11.1 measured this session. benchmarks/sc_network_benchmark.py exists (306 lines) but covers SC pipeline (encode/MAC/decode), not network orchestration — that gap is now filled by §11.
5 Performance docs ✅ PASS §11 + §6.1 + §11.1
6 Documentation page ✅ PASS This page
7 Rules followed ⚠️ WARN SPDX headers on every source file ✅. gamma_oscillation.py:66-67 has # type: ignore[arg-type] without rationale (mirrors cli.py:298). British English in docstrings is mixed (vectorized, synchronization appear); see §14.

Net: 3 WARN, 0 FAIL. Tracked follow-ups: tasks #10–#13 and #17.


14. Known issues & follow-ups

14.1 Two model fidelity violations (CRITICAL)

CorticalColumn and PINGCircuit ship in this directory but simplify their cited publications in ways that break the no-simplifications rule:

  • CorticalColumn cites Douglas & Martin 2004 + Potjans & Diesmann 2014; implements 5 of 8 populations (no L4i/L5i/L6i), 7 of 64 connections from the Binzegger matrix, no PSP kernel, no Poisson background input. Tracked: task #10.
  • PINGCircuit cites Whittington et al. 1995 + Börgers & Kopell 2003; implements a mean-field rate approximation (population firing rate × scalar weight) instead of the spiking conductance-based PING with α-function synapses. Tracked: task #11.

Both will be documented in detail on their own pages once written (api/cortical_column.md, api/gamma_oscillation.md); the audit findings above are the canonical source until then.

14.2 Rustification gap

Topology generators and projection matvec / STDP are pure-Python loops. For n ≥ 1000 dense or any per-synapse-delay setup, this is the dominant cost. The Rust engine has NetworkRunner for the network loop but no counterpart for the topology generators, and the add_projection call takes a Python-side CSR tuple. Task #13 tracks closing this gap.

14.3 MPIRunner real multi-rank coverage missing

tests/test_mpi_runner.py covers 12 paths via mocked mpi4py, including the NetworkRunner.step_population per-rank Rust dispatch contract. The custom spike-packing protocol and Allgatherv choreography are not exercised against real mpi4py + mpirun -n 2; a regression in real-MPI buffer ordering or datatype matching would not be caught. Task #17 tracks adding a pytest-mpi-style real test.

14.4 American spellings in source docstrings

Docstrings in network.py, population.py, projection.py use vectorized, synchronization, optimize etc. — should be British per SHARED_CONTEXT.md. Not blocking; future cleanup.

14.5 # type: ignore[arg-type] without rationale

gamma_oscillation.py:66-67 (dataclass field defaults). Mirror of cli.py:298. Should either type-correctly or annotate the reason.


15. Tests

Bash
PYTHONPATH=src python3 -m pytest \
    tests/test_network_basic.py \
    tests/test_network_coverage.py \
    tests/test_network_monitors_stimulus.py \
    tests/test_cortical_column.py \
    tests/test_cortical_column_dynamics.py \
    tests/test_gamma_oscillation.py \
    tests/test_topology.py \
    tests/test_topology_generators.py -q
# 87 passed in 2.26s  (verified 2026-04-17)

What the existing tests cover:

  • test_network_basic.py — Network construction, add(), run() with each backend dispatch path, all monitor types, stimuli, plasticity flag
  • test_network_coverage.py — edge cases on _can_use_rust, FIM feedback, spike gating, _apply_plasticity/_apply_fim correctness
  • test_network_monitors_stimulus.py — Spike/State/Rate monitor determinism, Poisson seed reproducibility, TimedArray clamping
  • test_topology.py, test_topology_generators.py — every generator's output shape, symmetry where claimed, deterministic seeding
  • test_cortical_column*.py — CorticalColumn dynamics smoke tests (does NOT verify Potjans/Binzegger fidelity — see task #10)
  • test_gamma_oscillation.py — PINGCircuit smoke tests (does NOT verify spectral peak in 30–80 Hz band — see task #11)

What the existing tests do not cover:

  • MPIRunner real multi-rank semantics — 12 mocked-mpi4py tests exist; real mpirun -n 2 coverage missing (task #17)
  • export_verilog — no direct test; covered transitively by FPGA flow smoke tests at most
  • Performance regressions — no pytest-benchmark cases for the network loop; §11 numbers are point measurements

16. References

Network simulation engineering:

  • Brette R., Rudolph M. et al. "Simulation of networks of spiking neurons: a review of tools and strategies." J Comp Neurosci 23:349-398 (2007).
  • Eppler J. M. et al. "PyNEST: A convenient interface to the NEST simulator." Front Neuroinform 2:12 (2008).

Connectivity models:

  • Watts D. J., Strogatz S. H. "Collective dynamics of small-world networks." Nature 393:440-442 (1998).
  • Barabási A.-L., Albert R. "Emergence of scaling in random networks." Science 286:509-512 (1999).
  • Erdős P., Rényi A. "On random graphs." Publicationes Mathematicae 6:290-297 (1959).

Synaptic delays / plasticity:

  • Hammouamri I. et al. "Learning delays in spiking neural networks using dilated convolutions with learnable spacings." NeurIPS (2023).
  • Bi G., Poo M. "Synaptic modifications in cultured hippocampal neurons: dependence on spike timing, synaptic strength, and postsynaptic cell type." J Neurosci 18:10464-10472 (1998).

MPI:

  • Message Passing Interface Forum. MPI: A Message-Passing Interface Standard, Version 4.0 (2021).



17. Auto-rendered API

sc_neurocore.network

Declarative network simulation engine for SC-NeuroCore.

HAS_MPI = True module-attribute

Network

Declarative network: collects objects, runs the simulation loop.

Source code in src/sc_neurocore/network/network.py
Python
class Network:
    """Declarative network: collects objects, runs the simulation loop."""

    def __init__(self, *objects: Any, seed: int = 42, fim_lambda: float = 0.0) -> None:
        self.populations: list[Population] = []
        self.projections: list[Projection] = []
        self.spike_monitors: list[SpikeMonitor] = []
        self.state_monitors: list[StateMonitor] = []
        self.rate_monitors: list[RateMonitor] = []
        self.stimuli: list[TimedArray | PoissonInput | StepCurrent] = []
        self.seed = seed
        self.fim_lambda = fim_lambda
        self._spike_gating = False
        for obj in objects:
            self.add(obj)

    def add(self, obj: Any) -> None:
        """Register a simulation object by type."""
        if isinstance(obj, Population):
            self.populations.append(obj)
        elif isinstance(obj, Projection):
            self.projections.append(obj)
        elif isinstance(obj, SpikeMonitor):
            self.spike_monitors.append(obj)
        elif isinstance(obj, StateMonitor):
            self.state_monitors.append(obj)
        elif isinstance(obj, RateMonitor):
            self.rate_monitors.append(obj)
        elif isinstance(obj, (TimedArray, PoissonInput, StepCurrent)):
            self.stimuli.append(obj)
        else:
            raise TypeError(f"Unknown object type: {type(obj).__name__}")

    def _can_use_rust(self) -> bool:
        if self.stimuli:
            return False
        if _get_rust_engine() is False:
            return False
        for pop in self.populations:
            if not _rust_supports_model(pop.model_name):
                return False
        return not any(p.plasticity for p in self.projections)

    def run(
        self,
        duration: float,
        dt: float = 0.001,
        progress: bool = False,
        backend: str = "auto",
        spike_gating: bool = False,
    ) -> None:
        """Run the simulation for *duration* seconds at timestep *dt*.

        *backend* selects execution: ``'auto'`` picks Rust when available
        and all models are supported, ``'rust'`` forces the Rust backend
        (raises if unavailable), ``'python'`` forces pure-Python,
        ``'mpi'`` runs MPI-distributed (requires mpi4py).

        *spike_gating*: skip neurons with zero input and near-rest voltage.
        Makes compute roughly proportional to active neuron count. Python
        backend only.
        """
        self._spike_gating = spike_gating
        if backend == "mpi":
            return self._run_mpi(duration, dt)
        if backend == "rust" or (backend == "auto" and self._can_use_rust()):
            return self._run_rust(duration, dt)
        return self._run_python(duration, dt, progress)

    def _run_mpi(self, duration: float, dt: float) -> None:
        # MPIRunner does not honour spike_gating or fim_lambda; refuse
        # rather than silently producing wrong results.
        if self._spike_gating:
            raise NotImplementedError(
                "spike_gating is not supported by the MPI backend; "
                "use backend='python' or rebuild without spike_gating"
            )
        if self.fim_lambda > 0:
            raise NotImplementedError(
                "fim_lambda > 0 (FIM feedback) is not supported by the MPI backend; "
                "use backend='python'"
            )
        if self.stimuli:
            raise NotImplementedError(
                "embedded stimuli are not supported by the MPI backend; "
                "use backend='python' or provide rank-local currents externally"
            )
        if self.state_monitors:
            raise NotImplementedError(
                "state monitors are not supported by the MPI backend; use backend='python'"
            )
        if any(proj.plasticity for proj in self.projections):
            raise NotImplementedError(
                "synaptic plasticity is not supported by the MPI backend; use backend='python'"
            )

        from .mpi_runner import MPIRunner

        n_steps = int(round(duration / dt))
        runner = MPIRunner(self)
        runner.run(n_steps, dt)

    def _run_rust(self, duration: float, dt: float) -> None:
        engine_cls = _get_rust_engine()
        if engine_cls is False:
            raise RuntimeError("Rust engine not available")

        runner = engine_cls()
        pop_indices = {}
        for pop in self.populations:
            idx = runner.add_population(pop.model_name, pop.n)
            pop_indices[id(pop)] = idx

        for proj in self.projections:
            src_idx = pop_indices[id(proj.source)]
            tgt_idx = pop_indices[id(proj.target)]
            runner.add_projection(
                src_idx,
                tgt_idx,
                proj.indptr.tolist(),
                proj.indices.tolist(),
                proj.data.tolist(),
                proj.max_delay,  # type: ignore[attr-defined]
            )

        n_steps = int(round(duration / dt))
        results = runner.run(n_steps)

        for i, pop in enumerate(self.populations):
            # Sync voltages back from Rust
            if "voltages" in results and i < len(results["voltages"]):
                rust_v = results["voltages"][i]
                if len(rust_v) == pop.n:
                    pop.set_voltages(rust_v)

            # Decode spike events (u64: neuron_id << 32 | timestep)
            spike_arr = results["spike_data"][i]
            for mon in self.spike_monitors:
                if mon.population is pop:
                    for packed in spike_arr:
                        nid = int(packed >> 32)
                        t = int(packed & 0xFFFFFFFF)
                        mon.record_event(nid, t)

    def _run_python(self, duration: float, dt: float, progress: bool = False) -> None:
        self._rng = np.random.default_rng(self.seed)
        n_steps = int(round(duration / dt))

        pop_to_currents = {id(p): np.zeros(p.n, dtype=np.float64) for p in self.populations}
        last_spikes = {id(p): np.zeros(p.n, dtype=np.int8) for p in self.populations}
        report_interval = max(1, n_steps // 10) if progress else 0

        for t in range(n_steps):
            if report_interval and t % report_interval == 0:
                pct = int(100 * t / n_steps)
                sys.stdout.write(f"\r[{pct:3d}%] step {t}/{n_steps}")
                sys.stdout.flush()

            for pid in pop_to_currents:
                pop_to_currents[pid][:] = 0.0

            self._apply_stimuli(pop_to_currents, t, dt)
            self._apply_projections(pop_to_currents, last_spikes)

            for pop in self.populations:
                pid = id(pop)
                spikes = pop.step_all(pop_to_currents[pid], spike_gating=self._spike_gating)
                last_spikes[pid] = spikes
                self._record(pop, spikes, t, dt)

            self._update_plasticity(last_spikes)
            if self.fim_lambda > 0:
                self._apply_fim(last_spikes)

        if report_interval:
            sys.stdout.write(f"\r[100%] step {n_steps}/{n_steps}\n")
            sys.stdout.flush()

    def _apply_stimuli(self, pop_to_currents: dict[int, np.ndarray], t: int, dt: float) -> None:
        """Inject stimulus currents into target populations."""
        for stim in self.stimuli:
            target = stim.target
            if target is None:
                if self.populations:
                    target = self.populations[0]
                else:
                    continue
            pid = id(target)
            if pid not in pop_to_currents:
                continue
            if isinstance(stim, PoissonInput):
                pop_to_currents[pid][: stim.n] += stim.get_current(t, dt=dt)
            elif isinstance(stim, TimedArray):
                pop_to_currents[pid] += stim.get_current(t)
            elif isinstance(stim, StepCurrent):
                pop_to_currents[pid] += stim.get_current(t, dt)

    def _apply_projections(
        self, pop_to_currents: dict[int, np.ndarray], last_spikes: dict[int, np.ndarray]
    ) -> None:
        """Propagate spikes through all projections."""
        for proj in self.projections:
            src_spikes = last_spikes.get(id(proj.source), np.zeros(proj.source.n, dtype=np.int8))
            current = proj.propagate(src_spikes)
            pid = id(proj.target)
            if pid in pop_to_currents:
                pop_to_currents[pid] += current

    def _record(self, pop: Population, spikes: np.ndarray, t: int, dt: float) -> None:
        """Feed spikes/states to all monitors attached to this population."""
        for sp_mon in self.spike_monitors:
            if sp_mon.population is pop:
                sp_mon.record(spikes, t)
        for st_mon in self.state_monitors:
            if st_mon.population is pop:
                st_mon.snapshot(t)
        for rt_mon in self.rate_monitors:
            if rt_mon.population is pop:
                rt_mon.record(spikes, t, dt)

    def _update_plasticity(self, last_spikes: dict[int, np.ndarray]) -> None:
        """Apply plasticity rules to projections that have them."""
        for proj in self.projections:
            if proj.plasticity:
                src_sp = last_spikes.get(id(proj.source), np.zeros(proj.source.n, dtype=np.int8))
                tgt_sp = last_spikes.get(id(proj.target), np.zeros(proj.target.n, dtype=np.int8))
                proj.update_plasticity(src_sp, tgt_sp)

    def _apply_fim(self, last_spikes: dict[int, np.ndarray]) -> None:
        """Fisher Information Metric self-observation feedback.

        Pulls each neuron's activity toward the population mean,
        creating a global synchronisation force analogous to the FIM
        gradient in the Kuramoto model.

        ΔW_ij -= λ_fim · (activity_i - μ) / N

        Derived from scpn-quantum-control NB26-28: FIM alone synchronises
        (K=0, λ≥8), increases Φ by 73%, and is topology-universal.
        """
        lam = self.fim_lambda
        for proj in self.projections:
            src_sp = last_spikes.get(id(proj.source), np.zeros(proj.source.n))
            n_src = proj.source.n
            mu = float(np.mean(src_sp))
            deviation = src_sp.astype(np.float64) - mu
            for i in range(n_src):
                if deviation[i] == 0:
                    continue
                correction = lam * deviation[i] / n_src
                for k in range(proj.indptr[i], proj.indptr[i + 1]):
                    proj.data[k] = max(0.0, proj.data[k] - correction)

    def to_torch(
        self,
        surrogate_fn: Any | None = None,
    ) -> Any:
        """Build an explicit differentiable bridge without altering NumPy/Rust execution.

        The returned module accepts an input current tensor of shape
        ``(T, batch, input_dim)`` and runs the graph with the same
        previous-spike projection semantics used by the NumPy backend.
        """
        if self.stimuli:
            raise NotImplementedError(
                "Network.to_torch() does not support embedded stimuli; pass input currents "
                "through the returned module instead"
            )
        from ._torch_bridge import NetworkTorchBridge

        if surrogate_fn is None:
            from sc_neurocore.training.surrogate import atan_surrogate_custom_op

            surrogate_fn = atan_surrogate_custom_op

        return NetworkTorchBridge(
            self.populations,
            self.projections,
            surrogate_fn=surrogate_fn,
        )
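The decode loop above unpacks one packed event at a time; the same u64 layout (`neuron_id << 32 | timestep`) can also be split vectorised when a monitor can ingest arrays. A standalone NumPy sketch (the function name is illustrative, not library API):

```python
import numpy as np

def decode_spike_events(packed: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Split packed u64 spike events into (neuron_ids, timesteps)."""
    packed = packed.astype(np.uint64)
    # High 32 bits: neuron id; low 32 bits: timestep.
    neuron_ids = (packed >> np.uint64(32)).astype(np.int64)
    timesteps = (packed & np.uint64(0xFFFFFFFF)).astype(np.int64)
    return neuron_ids, timesteps

# Neuron 3 firing at step 17, neuron 0 at step 2.
packed = np.array([(3 << 32) | 17, (0 << 32) | 2], dtype=np.uint64)
nids, ts = decode_spike_events(packed)
# nids → [3, 0], ts → [17, 2]
```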

add(obj)

Register a simulation object by type.

Source code in src/sc_neurocore/network/network.py
def add(self, obj: Any) -> None:
    """Register a simulation object by type."""
    if isinstance(obj, Population):
        self.populations.append(obj)
    elif isinstance(obj, Projection):
        self.projections.append(obj)
    elif isinstance(obj, SpikeMonitor):
        self.spike_monitors.append(obj)
    elif isinstance(obj, StateMonitor):
        self.state_monitors.append(obj)
    elif isinstance(obj, RateMonitor):
        self.rate_monitors.append(obj)
    elif isinstance(obj, (TimedArray, PoissonInput, StepCurrent)):
        self.stimuli.append(obj)
    else:
        raise TypeError(f"Unknown object type: {type(obj).__name__}")
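For illustration, the same register-by-type dispatch with stand-in classes (placeholders, not the real sc_neurocore types), including the `TypeError` raised for unknown objects:

```python
# Stand-in classes mimicking add()'s register-by-type dispatch.
class Population: ...
class Projection: ...

class Net:
    def __init__(self) -> None:
        self.populations: list = []
        self.projections: list = []

    def add(self, obj) -> None:
        if isinstance(obj, Population):
            self.populations.append(obj)
        elif isinstance(obj, Projection):
            self.projections.append(obj)
        else:
            raise TypeError(f"Unknown object type: {type(obj).__name__}")

net = Net()
net.add(Population())
net.add(Projection())
# net.add(42) would raise TypeError("Unknown object type: int")
```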

run(duration, dt=0.001, progress=False, backend='auto', spike_gating=False)

Run the simulation for duration seconds at timestep dt.

backend selects execution: 'auto' picks Rust when available and all models are supported, 'rust' forces the Rust backend (raises if unavailable), 'python' forces pure-Python, 'mpi' runs MPI-distributed (requires mpi4py).

spike_gating: skip neurons with zero input and near-rest voltage. Makes compute roughly proportional to active neuron count. Python backend only.

Source code in src/sc_neurocore/network/network.py
def run(
    self,
    duration: float,
    dt: float = 0.001,
    progress: bool = False,
    backend: str = "auto",
    spike_gating: bool = False,
) -> None:
    """Run the simulation for *duration* seconds at timestep *dt*.

    *backend* selects execution: ``'auto'`` picks Rust when available
    and all models are supported, ``'rust'`` forces the Rust backend
    (raises if unavailable), ``'python'`` forces pure-Python,
    ``'mpi'`` runs MPI-distributed (requires mpi4py).

    *spike_gating*: skip neurons with zero input and near-rest voltage.
    Makes compute roughly proportional to active neuron count. Python
    backend only.
    """
    self._spike_gating = spike_gating
    if backend == "mpi":
        return self._run_mpi(duration, dt)
    if backend == "rust" or (backend == "auto" and self._can_use_rust()):
        return self._run_rust(duration, dt)
    return self._run_python(duration, dt, progress)
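The three-way dispatch at the bottom of `run()` can be condensed into a standalone predicate. A sketch in which the `_can_use_rust` conditions (engine available, all model names supported, no stimuli, no plasticity) are passed in as flags:

```python
def select_backend(backend: str, *, rust_available: bool,
                   models_supported: bool, has_stimuli: bool,
                   has_plasticity: bool) -> str:
    """Mirror run()'s backend dispatch (standalone sketch)."""
    if backend == "mpi":
        return "mpi"
    can_use_rust = (rust_available and models_supported
                    and not has_stimuli and not has_plasticity)
    if backend == "rust" or (backend == "auto" and can_use_rust):
        return "rust"
    return "python"

# 'auto' falls back to Python as soon as any condition fails:
select_backend("auto", rust_available=True, models_supported=True,
               has_stimuli=True, has_plasticity=False)   # → 'python'
```

Note that forcing `backend='rust'` always selects the Rust path here; in the real method that path raises if the engine wheel is absent.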

to_torch(surrogate_fn=None)

Build an explicit differentiable bridge without altering NumPy/Rust execution.

The returned module accepts an input current tensor of shape (T, batch, input_dim) and runs the graph with the same previous-spike projection semantics used by the NumPy backend.

Source code in src/sc_neurocore/network/network.py
def to_torch(
    self,
    surrogate_fn: Any | None = None,
) -> Any:
    """Build an explicit differentiable bridge without altering NumPy/Rust execution.

    The returned module accepts an input current tensor of shape
    ``(T, batch, input_dim)`` and runs the graph with the same
    previous-spike projection semantics used by the NumPy backend.
    """
    if self.stimuli:
        raise NotImplementedError(
            "Network.to_torch() does not support embedded stimuli; pass input currents "
            "through the returned module instead"
        )
    from ._torch_bridge import NetworkTorchBridge

    if surrogate_fn is None:
        from sc_neurocore.training.surrogate import atan_surrogate_custom_op

        surrogate_fn = atan_surrogate_custom_op

    return NetworkTorchBridge(
        self.populations,
        self.projections,
        surrogate_fn=surrogate_fn,
    )
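The "previous-spike projection semantics" can be illustrated with a toy dense NumPy loop (an illustrative integrate-and-fire sketch, not the bridge or library code): a spike emitted at step t only reaches its target's input at step t+1.

```python
import numpy as np

def run_prev_spike(weights: np.ndarray, inputs: np.ndarray,
                   thresh: float = 1.0) -> np.ndarray:
    """Toy dense IF loop: projections consume spikes from step t-1."""
    T, n = inputs.shape
    v = np.zeros(n)
    last_spikes = np.zeros(n)
    log = []
    for t in range(T):
        current = inputs[t] + weights.T @ last_spikes  # previous-step spikes
        v = v + current
        spikes = (v >= thresh).astype(float)
        v = np.where(spikes > 0, 0.0, v)               # reset fired neurons
        last_spikes = spikes
        log.append(spikes)
    return np.array(log)

# Neuron 0 is driven at t=0; its spike reaches neuron 1 one step later.
W = np.array([[0.0, 2.0], [0.0, 0.0]])
inp = np.zeros((3, 2)); inp[0, 0] = 1.0
# run_prev_spike(W, inp) → rows [1, 0], [0, 1], [0, 0]
```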

Population

A group of N identical neurons with vectorized state access.

Source code in src/sc_neurocore/network/population.py
class Population:
    """A group of N identical neurons with vectorized state access."""

    def __init__(
        self,
        model: type[Any] | str,
        n: int,
        params: dict[str, Any] | None = None,
        label: str | None = None,
    ) -> None:
        """Create *n* neurons of *model* (class or string name)."""
        cls = _resolve_model(model)
        kw = params or {}
        self.neurons = [cls(**kw) for _ in range(n)]
        self.n = n
        self.model_name = cls.__name__
        self.label = label or cls.__name__
        self._model_cls = cls
        self._voltages = np.zeros(n, dtype=np.float64)
        self._sync_voltages()

    def _sync_voltages(self) -> None:
        """Pull membrane voltage from each neuron into the flat array."""
        for i, neuron in enumerate(self.neurons):
            self._voltages[i] = getattr(neuron, "v", 0.0)

    def step_all(self, currents: np.ndarray, spike_gating: bool = False) -> np.ndarray:
        """Advance all neurons one timestep; return binary spike vector.

        If *spike_gating* is True, neurons with zero input current and
        voltage near rest (within 1% of threshold) are skipped. This
        makes compute roughly proportional to active neurons — useful for
        sparse-firing networks. Skipped neurons still decay via leak if
        their model tracks sub-threshold dynamics.
        """
        spikes = np.zeros(self.n, dtype=np.int8)
        if spike_gating:
            for i, neuron in enumerate(self.neurons):
                v = getattr(neuron, "v", 0.0)
                v_thresh = getattr(neuron, "v_threshold", 1.0)
                v_rest = getattr(neuron, "v_rest", 0.0)
                # Skip if no input AND voltage within 1% of rest
                if currents[i] == 0.0 and abs(v - v_rest) < 0.01 * abs(v_thresh - v_rest):
                    continue
                raw = neuron.step(float(currents[i]))
                spikes[i] = min(max(int(raw), 0), 1)
                self._voltages[i] = getattr(neuron, "v", 0.0)
        else:
            for i, neuron in enumerate(self.neurons):
                raw = neuron.step(float(currents[i]))
                spikes[i] = min(max(int(raw), 0), 1)
                self._voltages[i] = getattr(neuron, "v", 0.0)
        return spikes

    def reset_all(self) -> None:
        """Reset every neuron to its initial state."""
        for neuron in self.neurons:
            if hasattr(neuron, "reset"):
                neuron.reset()
            elif hasattr(neuron, "reset_state"):
                neuron.reset_state()
        self._sync_voltages()

    def get_states(self) -> dict[str, np.ndarray]:
        """Collect all neuron states into arrays keyed by variable name."""
        if self.n == 0:
            return {}
        sample = self.neurons[0]
        if hasattr(sample, "get_state"):
            keys = sample.get_state().keys()
        elif hasattr(sample, "__dataclass_fields__"):
            keys = [k for k in sample.__dataclass_fields__ if k not in ("dt",)]
        else:
            keys = ["v"]
        result = {}
        for k in keys:
            result[k] = np.array([getattr(n, k, 0.0) for n in self.neurons])
        return result

    def set_voltages(self, voltages: np.ndarray) -> None:
        """Sync voltages from an external source (e.g. Rust backend) into neurons."""
        for i, neuron in enumerate(self.neurons):
            if hasattr(neuron, "v"):
                neuron.v = float(voltages[i])
        self._voltages[:] = voltages[: self.n]

    @property
    def voltages(self) -> np.ndarray:
        """Current membrane voltages (read-only view)."""
        return self._voltages
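Population relies only on a duck-typed contract: each neuron exposes a `v` attribute and a `step(current)` method returning a spike flag. A minimal stand-in satisfying that contract (hypothetical dynamics and parameters, not the library's LIF model):

```python
import numpy as np

class TinyLIF:
    """Minimal duck-typed neuron: has .v, .v_rest, .v_threshold and a
    step(current) -> 0/1 method (hypothetical stand-in model)."""
    def __init__(self, tau: float = 10.0, v_threshold: float = 1.0) -> None:
        self.tau, self.v_threshold, self.v_rest = tau, v_threshold, 0.0
        self.v = 0.0

    def step(self, current: float) -> int:
        self.v += -(self.v - self.v_rest) / self.tau + current
        if self.v >= self.v_threshold:
            self.v = self.v_rest   # reset on spike
            return 1
        return 0

# What Population.step_all does per neuron, in miniature:
neurons = [TinyLIF() for _ in range(4)]
currents = [0.0, 0.0, 2.0, 0.5]
spikes = np.array([n.step(c) for n, c in zip(neurons, currents)], dtype=np.int8)
# only neuron 2 crosses threshold → spikes [0, 0, 1, 0]
```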

voltages property

Current membrane voltages (read-only view).

__init__(model, n, params=None, label=None)

Create n neurons of model (class or string name).

Source code in src/sc_neurocore/network/population.py
def __init__(
    self,
    model: type[Any] | str,
    n: int,
    params: dict[str, Any] | None = None,
    label: str | None = None,
) -> None:
    """Create *n* neurons of *model* (class or string name)."""
    cls = _resolve_model(model)
    kw = params or {}
    self.neurons = [cls(**kw) for _ in range(n)]
    self.n = n
    self.model_name = cls.__name__
    self.label = label or cls.__name__
    self._model_cls = cls
    self._voltages = np.zeros(n, dtype=np.float64)
    self._sync_voltages()

step_all(currents, spike_gating=False)

Advance all neurons one timestep; return binary spike vector.

If spike_gating is True, neurons with zero input current and voltage within 1% of the rest-to-threshold span of rest are skipped. This makes compute roughly proportional to the number of active neurons — useful for sparse-firing networks. Skipped neurons still decay via leak if their model tracks sub-threshold dynamics.

Source code in src/sc_neurocore/network/population.py
def step_all(self, currents: np.ndarray, spike_gating: bool = False) -> np.ndarray:
    """Advance all neurons one timestep; return binary spike vector.

    If *spike_gating* is True, neurons with zero input current and
    voltage near rest (within 1% of threshold) are skipped. This
    makes compute roughly proportional to active neurons — useful for
    sparse-firing networks. Skipped neurons still decay via leak if
    their model tracks sub-threshold dynamics.
    """
    spikes = np.zeros(self.n, dtype=np.int8)
    if spike_gating:
        for i, neuron in enumerate(self.neurons):
            v = getattr(neuron, "v", 0.0)
            v_thresh = getattr(neuron, "v_threshold", 1.0)
            v_rest = getattr(neuron, "v_rest", 0.0)
            # Skip if no input AND voltage within 1% of rest
            if currents[i] == 0.0 and abs(v - v_rest) < 0.01 * abs(v_thresh - v_rest):
                continue
            raw = neuron.step(float(currents[i]))
            spikes[i] = min(max(int(raw), 0), 1)
            self._voltages[i] = getattr(neuron, "v", 0.0)
    else:
        for i, neuron in enumerate(self.neurons):
            raw = neuron.step(float(currents[i]))
            spikes[i] = min(max(int(raw), 0), 1)
            self._voltages[i] = getattr(neuron, "v", 0.0)
    return spikes
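The gating condition reduces to a small predicate; a sketch of the same test applied inside the loop:

```python
def should_skip(current: float, v: float,
                v_rest: float = 0.0, v_threshold: float = 1.0) -> bool:
    """True when a neuron may be skipped: no input current AND voltage
    within 1% of the rest-to-threshold span of rest."""
    return current == 0.0 and abs(v - v_rest) < 0.01 * abs(v_threshold - v_rest)

should_skip(0.0, 0.005)   # True: idle and near rest
should_skip(0.0, 0.5)     # False: sub-threshold dynamics still evolving
should_skip(0.3, 0.0)     # False: has input
```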

reset_all()

Reset every neuron to its initial state.

Source code in src/sc_neurocore/network/population.py
def reset_all(self) -> None:
    """Reset every neuron to its initial state."""
    for neuron in self.neurons:
        if hasattr(neuron, "reset"):
            neuron.reset()
        elif hasattr(neuron, "reset_state"):
            neuron.reset_state()
    self._sync_voltages()

get_states()

Collect all neuron states into arrays keyed by variable name.

Source code in src/sc_neurocore/network/population.py
def get_states(self) -> dict[str, np.ndarray]:
    """Collect all neuron states into arrays keyed by variable name."""
    if self.n == 0:
        return {}
    sample = self.neurons[0]
    if hasattr(sample, "get_state"):
        keys = sample.get_state().keys()
    elif hasattr(sample, "__dataclass_fields__"):
        keys = [k for k in sample.__dataclass_fields__ if k not in ("dt",)]
    else:
        keys = ["v"]
    result = {}
    for k in keys:
        result[k] = np.array([getattr(n, k, 0.0) for n in self.neurons])
    return result

set_voltages(voltages)

Sync voltages from an external source (e.g. Rust backend) into neurons.

Source code in src/sc_neurocore/network/population.py
def set_voltages(self, voltages: np.ndarray) -> None:
    """Sync voltages from an external source (e.g. Rust backend) into neurons."""
    for i, neuron in enumerate(self.neurons):
        if hasattr(neuron, "v"):
            neuron.v = float(voltages[i])
    self._voltages[:] = voltages[: self.n]

Projection

Synaptic projection from source to target population.

Parameters

delay : float, array-like, or 0

- 0: no delay (default)
- scalar > 0: uniform axonal delay (all synapses share one delay)
- 1-D array of length n_synapses: per-synapse delay in timesteps. Enables heterogeneous axonal/synaptic delays.

Source code in src/sc_neurocore/network/projection.py
class Projection:
    """Synaptic projection from source to target population.

    Parameters
    ----------
    delay : float, array-like, or 0
        - 0: no delay (default)
        - scalar > 0: uniform axonal delay (all synapses share one delay)
        - 1-D array of length n_synapses: per-synapse delay in timesteps.
          Enables heterogeneous axonal/synaptic delays.
    """

    TOPOLOGY_MAP = {
        "random": _topo.random_connectivity,
        "all_to_all": _topo.all_to_all,
        "ring": _topo.ring_topology,
        "small_world": _topo.small_world,
        "scale_free": _topo.scale_free,
    }

    def __init__(
        self,
        source: Population,
        target: Population,
        weight: float,
        probability: float = 1.0,
        delay: float | np.ndarray = 0.0,
        topology: str | tuple[np.ndarray, np.ndarray, np.ndarray] = "random",
        plasticity: str | None = None,
        seed: int = 42,
        weight_threshold: float = 0.0,
    ) -> None:
        """Create projection with CSR connectivity and optional delay/plasticity.

        *weight_threshold*: skip synapses with |w| <= threshold during
        propagation. Set > 0 to exploit weight sparsity after pruning.
        """
        self.source = source
        self.target = target
        self.weight = weight
        self.plasticity = plasticity
        self.seed = seed
        self.weight_threshold = weight_threshold
        self.delay: float | np.ndarray = 0.0
        self._delay_mode: str = "none"
        self._delay_buf: np.ndarray | None = None
        self._per_syn_delays: np.ndarray | None = None

        self.indptr, self.indices, self.data = self._build_connectivity(topology, probability, seed)

        self._init_delays(delay)

        if plasticity == "stdp":
            self._pre_trace = np.zeros(source.n, dtype=np.float64)
            self._post_trace = np.zeros(target.n, dtype=np.float64)

    def _init_delays(self, delay: float | np.ndarray) -> None:
        """Set up delay buffers based on delay specification."""
        delay = np.atleast_1d(np.asarray(delay, dtype=np.float64)).flatten()
        n_synapses = len(self.data)

        if delay.size == 1 and delay[0] == 0.0:
            # No delay
            self._delay_mode = "none"
            self.delay = 0.0
            self._delay_buf = None
            self._per_syn_delays = None
            return

        if delay.size == 1:
            # Uniform axonal delay
            self._delay_mode = "uniform"
            self.delay = float(delay[0])
            steps = max(1, int(round(self.delay)))
            self._delay_buf = np.zeros((steps, self.target.n), dtype=np.float64)
            self._delay_idx = 0
            self._delay_steps_uniform = steps
            self._per_syn_delays = None
            return

        # Per-synapse delays
        if delay.size != n_synapses:
            raise ValueError(
                f"Per-synapse delay array length ({delay.size}) must match "
                f"number of connections ({n_synapses})"
            )
        self._delay_mode = "per_synapse"
        self.delay = delay
        self._per_syn_delays = np.round(delay).astype(np.int64)
        self._per_syn_delays = np.clip(self._per_syn_delays, 0, None)
        max_d = int(self._per_syn_delays.max()) + 1
        # Spike history ring buffer: (max_delay+1, n_source)
        self._spike_history = np.zeros((max_d, self.source.n), dtype=np.float64)
        self._hist_idx = 0
        self._delay_buf = None

    @property
    def n_synapses(self) -> int:
        """Number of synaptic connections."""
        return len(self.data)

    @property
    def delay_mode(self) -> str:
        """Delay mode: 'none', 'uniform', or 'per_synapse'."""
        return self._delay_mode

    @property
    def max_delay(self) -> int:
        """Maximum delay in timesteps across all synapses."""
        if self._delay_mode == "none":
            return 0
        if self._delay_mode == "uniform":
            return self._delay_steps_uniform
        if self._per_syn_delays is None:
            raise RuntimeError("Projection per-synapse delay state is not initialized")
        return int(self._per_syn_delays.max())

    def _build_connectivity(
        self,
        topology: str | tuple[np.ndarray, np.ndarray, np.ndarray],
        probability: float,
        seed: int,
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Build CSR arrays from topology name or pre-built tuple."""
        if isinstance(topology, tuple) and len(topology) == 3:
            return validate_csr_topology(
                topology[0],
                topology[1],
                topology[2],
                self.source.n,
                self.target.n,
            )
        if topology == "random":
            return _topo.random_connectivity(
                self.source.n, self.target.n, probability, self.weight, seed
            )
        if topology == "all_to_all":
            return _topo.all_to_all(self.source.n, self.target.n, self.weight)
        if topology in ("ring", "small_world", "scale_free"):
            raise ValueError(
                f"Topology '{topology}' requires same-size source/target; "
                "pass pre-built CSR tuple instead."
            )
        raise ValueError(f"Unknown topology '{topology}'")

    def propagate(self, source_spikes: np.ndarray) -> np.ndarray:
        """Compute target currents from source spikes through CSR connectivity.

        Handles three delay modes:
        - none: direct CSR matvec
        - uniform: aggregated current through circular buffer
        - per_synapse: each synapse reads from spike history at its own delay
        """
        wt = self.weight_threshold
        if self._delay_mode == "none":
            return _csr_matvec(
                self.indptr, self.indices, self.data, source_spikes, self.target.n, wt
            )

        if self._delay_mode == "uniform":
            delay_buf = self._delay_buf
            if delay_buf is None:
                raise RuntimeError("Projection uniform delay buffer is not initialized")
            current = _csr_matvec(
                self.indptr, self.indices, self.data, source_spikes, self.target.n, wt
            )
            output = cast(np.ndarray, delay_buf[self._delay_idx].copy())
            delay_buf[self._delay_idx] = current
            self._delay_idx = (self._delay_idx + 1) % self._delay_steps_uniform
            return output

        # Per-synapse delay
        if self._per_syn_delays is None:
            raise RuntimeError("Projection per-synapse delay state is not initialized")
        self._spike_history[self._hist_idx] = source_spikes.astype(np.float64)
        current = _csr_delayed_matvec(
            self.indptr,
            self.indices,
            self.data,
            self._per_syn_delays,
            self._spike_history,
            self._hist_idx,
            self.target.n,
        )
        self._hist_idx = (self._hist_idx + 1) % self._spike_history.shape[0]
        return current

    def update_plasticity(
        self,
        src_spikes: np.ndarray,
        tgt_spikes: np.ndarray,
        a_plus: float = 0.01,
        a_minus: float = 0.012,
        tau: float = 20.0,
        directional_bias: float = 1.0,
    ) -> None:
        """Trace-based STDP weight update.

        *directional_bias* scales a_plus for this projection. Set to 1.36
        for bottom-up (sensory → higher) projections per quantum-control
        NB19 (measured autonomic → cortical asymmetry = 0.36).
        Default 1.0 = symmetric learning.
        """
        if self.plasticity != "stdp":
            return
        decay = np.exp(-1.0 / tau)
        self._pre_trace = self._pre_trace * decay + src_spikes.astype(np.float64)
        self._post_trace = self._post_trace * decay + tgt_spikes.astype(np.float64)

        n_src = self.source.n
        for i in range(n_src):
            for k in range(self.indptr[i], self.indptr[i + 1]):
                j = self.indices[k]
                if src_spikes[i]:
                    self.data[k] -= a_minus * self._post_trace[j]
                if tgt_spikes[j]:
                    self.data[k] += a_plus * directional_bias * self._pre_trace[i]
                self.data[k] = max(0.0, self.data[k])

        # Enforce K symmetry for self-projections (same source and target).
        # Gradient/STDP updates break W = W^T after ~30 steps (SPO Finding #7).
        # Asymmetric coupling hurts sync by +12% (quantum-control NB24).
        if self.source is self.target:
            self._enforce_symmetry()

    def _enforce_symmetry(self) -> None:
        """Symmetrise CSR weight data: W_ij = W_ji = (W_ij + W_ji) / 2.

        Only meaningful for self-projections (source is target, square matrix).
        Iterates over non-zero entries and averages with their transpose.
        """
        n = self.source.n
        for i in range(n):
            for k in range(self.indptr[i], self.indptr[i + 1]):
                j = self.indices[k]
                if j <= i:
                    continue
                # Find reverse edge j→i
                for k2 in range(self.indptr[j], self.indptr[j + 1]):
                    if self.indices[k2] == i:
                        avg = (self.data[k] + self.data[k2]) / 2.0
                        self.data[k] = avg
                        self.data[k2] = avg
                        break
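A dense-matrix sketch of the same trace-based STDP rule (NumPy outer products standing in for the CSR loops; `directional_bias` omitted for brevity):

```python
import numpy as np

def stdp_step(W, pre_trace, post_trace, src_spikes, tgt_spikes,
              a_plus=0.01, a_minus=0.012, tau=20.0):
    """One STDP update on a dense weight matrix W of shape (n_src, n_tgt)."""
    decay = np.exp(-1.0 / tau)
    pre_trace = pre_trace * decay + src_spikes
    post_trace = post_trace * decay + tgt_spikes
    # Pre spike now: depress by the post trace (post fired earlier).
    W = W - a_minus * np.outer(src_spikes, post_trace)
    # Post spike now: potentiate by the pre trace (pre fired earlier).
    W = W + a_plus * np.outer(pre_trace, tgt_spikes)
    return np.maximum(0.0, W), pre_trace, post_trace

# Pre spike at step 0, post spike at step 1 → potentiation.
W = np.array([[0.5]]); pre = np.zeros(1); post = np.zeros(1)
W, pre, post = stdp_step(W, pre, post, np.array([1.0]), np.array([0.0]))
W, pre, post = stdp_step(W, pre, post, np.array([0.0]), np.array([1.0]))
# W[0, 0] == 0.5 + 0.01 * exp(-1/20) ≈ 0.5095
```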

n_synapses property

Number of synaptic connections.

delay_mode property

Delay mode: 'none', 'uniform', or 'per_synapse'.

max_delay property

Maximum delay in timesteps across all synapses.

__init__(source, target, weight, probability=1.0, delay=0.0, topology='random', plasticity=None, seed=42, weight_threshold=0.0)

Create projection with CSR connectivity and optional delay/plasticity.

weight_threshold: skip synapses with |w| <= threshold during propagation. Set > 0 to exploit weight sparsity after pruning.

Source code in src/sc_neurocore/network/projection.py
def __init__(
    self,
    source: Population,
    target: Population,
    weight: float,
    probability: float = 1.0,
    delay: float | np.ndarray = 0.0,
    topology: str | tuple[np.ndarray, np.ndarray, np.ndarray] = "random",
    plasticity: str | None = None,
    seed: int = 42,
    weight_threshold: float = 0.0,
) -> None:
    """Create projection with CSR connectivity and optional delay/plasticity.

    *weight_threshold*: skip synapses with |w| <= threshold during
    propagation. Set > 0 to exploit weight sparsity after pruning.
    """
    self.source = source
    self.target = target
    self.weight = weight
    self.plasticity = plasticity
    self.seed = seed
    self.weight_threshold = weight_threshold
    self.delay: float | np.ndarray = 0.0
    self._delay_mode: str = "none"
    self._delay_buf: np.ndarray | None = None
    self._per_syn_delays: np.ndarray | None = None

    self.indptr, self.indices, self.data = self._build_connectivity(topology, probability, seed)

    self._init_delays(delay)

    if plasticity == "stdp":
        self._pre_trace = np.zeros(source.n, dtype=np.float64)
        self._post_trace = np.zeros(target.n, dtype=np.float64)

propagate(source_spikes)

Compute target currents from source spikes through CSR connectivity.

Handles three delay modes:

- none: direct CSR matvec
- uniform: aggregated current through a circular buffer
- per_synapse: each synapse reads from spike history at its own delay

Source code in src/sc_neurocore/network/projection.py
def propagate(self, source_spikes: np.ndarray) -> np.ndarray:
    """Compute target currents from source spikes through CSR connectivity.

    Handles three delay modes:
    - none: direct CSR matvec
    - uniform: aggregated current through circular buffer
    - per_synapse: each synapse reads from spike history at its own delay
    """
    wt = self.weight_threshold
    if self._delay_mode == "none":
        return _csr_matvec(
            self.indptr, self.indices, self.data, source_spikes, self.target.n, wt
        )

    if self._delay_mode == "uniform":
        delay_buf = self._delay_buf
        if delay_buf is None:
            raise RuntimeError("Projection uniform delay buffer is not initialized")
        current = _csr_matvec(
            self.indptr, self.indices, self.data, source_spikes, self.target.n, wt
        )
        output = cast(np.ndarray, delay_buf[self._delay_idx].copy())
        delay_buf[self._delay_idx] = current
        self._delay_idx = (self._delay_idx + 1) % self._delay_steps_uniform
        return output

    # Per-synapse delay
    if self._per_syn_delays is None:
        raise RuntimeError("Projection per-synapse delay state is not initialized")
    self._spike_history[self._hist_idx] = source_spikes.astype(np.float64)
    current = _csr_delayed_matvec(
        self.indptr,
        self.indices,
        self.data,
        self._per_syn_delays,
        self._spike_history,
        self._hist_idx,
        self.target.n,
    )
    self._hist_idx = (self._hist_idx + 1) % self._spike_history.shape[0]
    return current
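
The uniform-delay path can be sketched standalone. This is a simplified illustration, not the library's API: a dense weight matrix stands in for the CSR arrays, and the ring buffer delays every synapse by the same number of steps.

```python
import numpy as np

# Hypothetical simplified uniform-delay propagation: a dense weight
# matrix replaces the CSR arrays; currents written at step t are read
# back delay_steps later from the circular buffer.
n_src, n_tgt, delay_steps = 3, 2, 2
W = np.ones((n_tgt, n_src))              # target x source weights
delay_buf = np.zeros((delay_steps, n_tgt))
delay_idx = 0

def propagate(spikes):
    """Return the current computed delay_steps ago; buffer this step's."""
    global delay_idx
    current = W @ spikes
    output = delay_buf[delay_idx].copy()
    delay_buf[delay_idx] = current
    delay_idx = (delay_idx + 1) % delay_steps
    return output

out0 = propagate(np.array([1.0, 0.0, 0.0]))  # buffer still empty
out1 = propagate(np.zeros(3))
out2 = propagate(np.zeros(3))                # step-0 spike arrives now
```

The spike injected at step 0 only reaches the output at step 2, i.e. after exactly `delay_steps` timesteps.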

update_plasticity(src_spikes, tgt_spikes, a_plus=0.01, a_minus=0.012, tau=20.0, directional_bias=1.0)

Trace-based STDP weight update.

directional_bias scales a_plus for this projection. Set to 1.36 for bottom-up (sensory → higher) projections per quantum-control NB19 (measured autonomic → cortical asymmetry = 0.36). Default 1.0 = symmetric learning.

Source code in src/sc_neurocore/network/projection.py
Python
def update_plasticity(
    self,
    src_spikes: np.ndarray,
    tgt_spikes: np.ndarray,
    a_plus: float = 0.01,
    a_minus: float = 0.012,
    tau: float = 20.0,
    directional_bias: float = 1.0,
) -> None:
    """Trace-based STDP weight update.

    *directional_bias* scales a_plus for this projection. Set to 1.36
    for bottom-up (sensory → higher) projections per quantum-control
    NB19 (measured autonomic → cortical asymmetry = 0.36).
    Default 1.0 = symmetric learning.
    """
    if self.plasticity != "stdp":
        return
    decay = np.exp(-1.0 / tau)
    self._pre_trace = self._pre_trace * decay + src_spikes.astype(np.float64)
    self._post_trace = self._post_trace * decay + tgt_spikes.astype(np.float64)

    n_src = self.source.n
    for i in range(n_src):
        for k in range(self.indptr[i], self.indptr[i + 1]):
            j = self.indices[k]
            if src_spikes[i]:
                self.data[k] -= a_minus * self._post_trace[j]
            if tgt_spikes[j]:
                self.data[k] += a_plus * directional_bias * self._pre_trace[i]
            self.data[k] = max(0.0, self.data[k])

    # Enforce K symmetry for self-projections (same source and target).
    # Gradient/STDP updates break W = W^T after ~30 steps (SPO Finding #7).
    # Asymmetric coupling hurts sync by +12% (quantum-control NB24).
    if self.source is self.target:
        self._enforce_symmetry()
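
The trace rule above can be restated on a dense weight matrix (an illustrative simplification; the real method walks the CSR arrays). Traces decay by exp(-1/tau) and bump on spikes; a post spike potentiates by the pre trace, a pre spike depresses by the post trace, and weights are clipped at zero.

```python
import numpy as np

# Hypothetical dense restatement of the trace-STDP update.
tau, a_plus, a_minus = 20.0, 0.01, 0.012
decay = np.exp(-1.0 / tau)

W = np.full((2, 2), 0.5)                 # W[i, j]: source i -> target j
pre_trace = np.zeros(2)
post_trace = np.zeros(2)

def stdp_step(src_spikes, tgt_spikes, directional_bias=1.0):
    global pre_trace, post_trace
    pre_trace = pre_trace * decay + src_spikes
    post_trace = post_trace * decay + tgt_spikes
    dw = (a_plus * directional_bias * np.outer(pre_trace, tgt_spikes)
          - a_minus * np.outer(src_spikes, post_trace))
    np.maximum(W + dw, 0.0, out=W)       # clip at zero, in place

stdp_step(np.array([1.0, 0.0]), np.array([0.0, 0.0]))  # pre spike only
stdp_step(np.array([0.0, 0.0]), np.array([1.0, 0.0]))  # post follows
```

Because the post spike arrives one step after the pre spike, W[0, 0] is potentiated by `a_plus * exp(-1/tau)`, while all other weights are untouched.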

SpikeMonitor

Records (neuron_idx, timestep) pairs from a population.

Source code in src/sc_neurocore/network/monitor.py
Python
class SpikeMonitor:
    """Records (neuron_idx, timestep) pairs from a population."""

    def __init__(self, population: Population, label: str | None = None) -> None:
        self.population = population
        self.label = label or f"spikes_{population.label}"
        self._neuron_ids: list[int] = []
        self._timesteps: list[int] = []

    def record(self, spikes: np.ndarray, t_step: int) -> None:
        """Store spike events for this timestep (from binary spike vector)."""
        idx = np.nonzero(spikes)[0]
        for i in idx:
            self._neuron_ids.append(int(i))
            self._timesteps.append(t_step)

    def record_event(self, neuron_id: int, t_step: int) -> None:
        """Store a single spike event directly (from Rust backend)."""
        self._neuron_ids.append(neuron_id)
        self._timesteps.append(t_step)

    @property
    def spike_times(self) -> np.ndarray:
        """All spike timesteps as 1-D array."""
        return np.array(self._timesteps, dtype=np.int64)

    @property
    def spike_trains(self) -> dict[int, np.ndarray]:
        """Per-neuron spike timestep arrays."""
        trains: dict[int, list[int]] = {}
        for nid, ts in zip(self._neuron_ids, self._timesteps):
            trains.setdefault(nid, []).append(ts)
        return {k: np.array(v, dtype=np.int64) for k, v in trains.items()}

    @property
    def count(self) -> int:
        """Total number of spikes recorded."""
        return len(self._neuron_ids)

    def raster_data(self) -> tuple[np.ndarray, np.ndarray]:
        """Return (timesteps, neuron_ids) arrays for raster plots."""
        return (
            np.array(self._timesteps, dtype=np.int64),
            np.array(self._neuron_ids, dtype=np.int64),
        )

    def firing_rates(self, n_steps: int, dt: float = 0.001) -> np.ndarray:
        """Mean firing rate (Hz) per neuron over the simulation."""
        duration = n_steps * dt
        rates = np.zeros(self.population.n, dtype=np.float64)
        if duration <= 0:
            return rates
        for nid in self._neuron_ids:
            rates[nid] += 1.0
        rates /= duration
        return rates

    def isi(self, neuron: int) -> np.ndarray:
        """Inter-spike intervals (timestep units) for a single neuron."""
        trains = self.spike_trains
        ts = trains.get(neuron, np.array([], dtype=np.int64))
        if ts.size < 2:
            return np.array([], dtype=np.int64)
        return np.diff(ts)

    def cross_correlation(self, i: int, j: int, max_lag: int = 50) -> tuple[np.ndarray, np.ndarray]:
        """Cross-correlogram between neurons i and j."""
        from sc_neurocore.analysis.spike_stats import cross_correlation as _cc

        trains = self.spike_trains
        ts_i = trains.get(i, np.array([], dtype=np.int64))
        ts_j = trains.get(j, np.array([], dtype=np.int64))
        if ts_i.size == 0 or ts_j.size == 0:
            lags = np.arange(-max_lag, max_lag + 1)
            return np.zeros(len(lags)), lags
        max_t = max(ts_i.max(), ts_j.max()) + 1
        bin_i = np.zeros(max_t, dtype=np.int8)
        bin_j = np.zeros(max_t, dtype=np.int8)
        bin_i[ts_i] = 1
        bin_j[ts_j] = 1
        return _cc(bin_i, bin_j, max_lag_ms=max_lag, dt=1.0)

spike_times property

All spike timesteps as 1-D array.

spike_trains property

Per-neuron spike timestep arrays.

count property

Total number of spikes recorded.

record(spikes, t_step)

Store spike events for this timestep (from binary spike vector).

Source code in src/sc_neurocore/network/monitor.py
Python
def record(self, spikes: np.ndarray, t_step: int) -> None:
    """Store spike events for this timestep (from binary spike vector)."""
    idx = np.nonzero(spikes)[0]
    for i in idx:
        self._neuron_ids.append(int(i))
        self._timesteps.append(t_step)

record_event(neuron_id, t_step)

Store a single spike event directly (from Rust backend).

Source code in src/sc_neurocore/network/monitor.py
Python
def record_event(self, neuron_id: int, t_step: int) -> None:
    """Store a single spike event directly (from Rust backend)."""
    self._neuron_ids.append(neuron_id)
    self._timesteps.append(t_step)

raster_data()

Return (timesteps, neuron_ids) arrays for raster plots.

Source code in src/sc_neurocore/network/monitor.py
Python
def raster_data(self) -> tuple[np.ndarray, np.ndarray]:
    """Return (timesteps, neuron_ids) arrays for raster plots."""
    return (
        np.array(self._timesteps, dtype=np.int64),
        np.array(self._neuron_ids, dtype=np.int64),
    )

firing_rates(n_steps, dt=0.001)

Mean firing rate (Hz) per neuron over the simulation.

Source code in src/sc_neurocore/network/monitor.py
Python
def firing_rates(self, n_steps: int, dt: float = 0.001) -> np.ndarray:
    """Mean firing rate (Hz) per neuron over the simulation."""
    duration = n_steps * dt
    rates = np.zeros(self.population.n, dtype=np.float64)
    if duration <= 0:
        return rates
    for nid in self._neuron_ids:
        rates[nid] += 1.0
    rates /= duration
    return rates

isi(neuron)

Inter-spike intervals (timestep units) for a single neuron.

Source code in src/sc_neurocore/network/monitor.py
Python
def isi(self, neuron: int) -> np.ndarray:
    """Inter-spike intervals (timestep units) for a single neuron."""
    trains = self.spike_trains
    ts = trains.get(neuron, np.array([], dtype=np.int64))
    if ts.size < 2:
        return np.array([], dtype=np.int64)
    return np.diff(ts)

cross_correlation(i, j, max_lag=50)

Cross-correlogram between neurons i and j.

Source code in src/sc_neurocore/network/monitor.py
Python
def cross_correlation(self, i: int, j: int, max_lag: int = 50) -> tuple[np.ndarray, np.ndarray]:
    """Cross-correlogram between neurons i and j."""
    from sc_neurocore.analysis.spike_stats import cross_correlation as _cc

    trains = self.spike_trains
    ts_i = trains.get(i, np.array([], dtype=np.int64))
    ts_j = trains.get(j, np.array([], dtype=np.int64))
    if ts_i.size == 0 or ts_j.size == 0:
        lags = np.arange(-max_lag, max_lag + 1)
        return np.zeros(len(lags)), lags
    max_t = max(ts_i.max(), ts_j.max()) + 1
    bin_i = np.zeros(max_t, dtype=np.int8)
    bin_j = np.zeros(max_t, dtype=np.int8)
    bin_i[ts_i] = 1
    bin_j[ts_j] = 1
    return _cc(bin_i, bin_j, max_lag_ms=max_lag, dt=1.0)
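
The bookkeeping behind `spike_trains` and `isi` is simple enough to restate on raw event lists (same grouping the monitor performs internally; the toy events here are illustrative):

```python
import numpy as np

# Group flat (neuron_id, timestep) event lists into per-neuron trains,
# then diff a train to get inter-spike intervals.
neuron_ids = [0, 1, 0, 0, 1]
timesteps = [2, 3, 5, 9, 7]

trains: dict[int, list[int]] = {}
for nid, ts in zip(neuron_ids, timesteps):
    trains.setdefault(nid, []).append(ts)
spike_trains = {k: np.array(v, dtype=np.int64) for k, v in trains.items()}

isi_0 = np.diff(spike_trains[0])   # ISIs for neuron 0, in timestep units
```

Note that trains preserve event order, so timesteps must be recorded monotonically (as they are during a simulation) for `np.diff` to yield non-negative intervals.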

StateMonitor

Records state variable traces from a population.

Source code in src/sc_neurocore/network/monitor.py
Python
class StateMonitor:
    """Records state variable traces from a population."""

    def __init__(
        self,
        population: Population,
        variables: list[str] | None = None,
        record: list[int] | None = None,
    ) -> None:
        self.population = population
        self.variables = variables or ["v"]
        self.record = record
        self._data: dict[str, list[np.ndarray]] = {v: [] for v in self.variables}
        self._t: list[int] = []

    def snapshot(self, t_step: int) -> None:
        """Capture current state variables."""
        self._t.append(t_step)
        states = self.population.get_states()
        for v in self.variables:
            arr = states.get(v, np.zeros(self.population.n))
            if self.record is not None:
                arr = arr[np.array(self.record)]
            self._data[v].append(arr.copy())

    @property
    def traces(self) -> dict[str, np.ndarray]:
        """Variable traces as {name: (n_steps, n_neurons)} arrays."""
        return {k: np.array(v) if v else np.empty((0, 0)) for k, v in self._data.items()}

    @property
    def t(self) -> np.ndarray:
        """Timestep array."""
        return np.array(self._t, dtype=np.int64)

traces property

Variable traces as {name: (n_steps, n_neurons)} arrays.

t property

Timestep array.

snapshot(t_step)

Capture current state variables.

Source code in src/sc_neurocore/network/monitor.py
Python
def snapshot(self, t_step: int) -> None:
    """Capture current state variables."""
    self._t.append(t_step)
    states = self.population.get_states()
    for v in self.variables:
        arr = states.get(v, np.zeros(self.population.n))
        if self.record is not None:
            arr = arr[np.array(self.record)]
        self._data[v].append(arr.copy())

RateMonitor

Population firing rate in time bins.

Source code in src/sc_neurocore/network/monitor.py
Python
class RateMonitor:
    """Population firing rate in time bins."""

    def __init__(self, population: Population, bin_ms: int = 10) -> None:
        self.population = population
        self.bin_ms = bin_ms
        self._spike_counts: list[int] = []
        self._bin_edges: list[int] = []
        self._current_count = 0
        self._steps_in_bin = 0

    def record(self, spikes: np.ndarray, t_step: int, dt: float = 0.001) -> None:
        """Accumulate spikes; flush when a bin completes."""
        self._current_count += int(spikes.sum())
        self._steps_in_bin += 1
        steps_per_bin = max(1, int(self.bin_ms / 1000.0 / dt))
        if self._steps_in_bin >= steps_per_bin:
            self._spike_counts.append(self._current_count)
            self._bin_edges.append(t_step)
            self._current_count = 0
            self._steps_in_bin = 0

    @property
    def rate(self) -> np.ndarray:
        """Firing rate (Hz) per bin."""
        if not self._spike_counts:
            return np.array([], dtype=np.float64)
        duration_s = self.bin_ms / 1000.0
        counts = np.array(self._spike_counts, dtype=np.float64)
        return counts / (duration_s * self.population.n)

    @property
    def t(self) -> np.ndarray:
        """Bin edge timestep array."""
        return np.array(self._bin_edges, dtype=np.int64)

rate property

Firing rate (Hz) per bin.

t property

Bin edge timestep array.

record(spikes, t_step, dt=0.001)

Accumulate spikes; flush when a bin completes.

Source code in src/sc_neurocore/network/monitor.py
Python
def record(self, spikes: np.ndarray, t_step: int, dt: float = 0.001) -> None:
    """Accumulate spikes; flush when a bin completes."""
    self._current_count += int(spikes.sum())
    self._steps_in_bin += 1
    steps_per_bin = max(1, int(self.bin_ms / 1000.0 / dt))
    if self._steps_in_bin >= steps_per_bin:
        self._spike_counts.append(self._current_count)
        self._bin_edges.append(t_step)
        self._current_count = 0
        self._steps_in_bin = 0
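
The unit conversion in the `rate` property is worth spelling out with toy numbers: counts are divided by both the bin width in seconds and the population size, so the result is a per-neuron rate in Hz.

```python
import numpy as np

# 25 spikes from 100 neurons in a 10 ms bin is 25 Hz per neuron:
# rate = counts / (bin_seconds * n_neurons).
bin_ms, n_neurons = 10, 100
counts = np.array([25.0, 40.0, 10.0])   # spikes per bin (illustrative)
rate_hz = counts / ((bin_ms / 1000.0) * n_neurons)
```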

TimedArray

Time-varying current from a pre-computed array.

Source code in src/sc_neurocore/network/stimulus.py
Python
class TimedArray:
    """Time-varying current from a pre-computed array."""

    def __init__(self, values: np.ndarray | list[float], dt: float = 0.001) -> None:
        self.values = np.asarray(values, dtype=np.float64)
        self.dt = dt
        self.target: Population | None = None

    def get_current(self, t_step: int) -> float:
        """Return the value at timestep t_step (clamps to last value)."""
        idx = min(t_step, len(self.values) - 1)
        return float(self.values[idx])

get_current(t_step)

Return the value at timestep t_step (clamps to last value).

Source code in src/sc_neurocore/network/stimulus.py
Python
def get_current(self, t_step: int) -> float:
    """Return the value at timestep t_step (clamps to last value)."""
    idx = min(t_step, len(self.values) - 1)
    return float(self.values[idx])

PoissonInput

Random Poisson spike input producing weighted current.

Source code in src/sc_neurocore/network/stimulus.py
Python
class PoissonInput:
    """Random Poisson spike input producing weighted current."""

    def __init__(
        self, n: int, rate_hz: float, weight: float, dt: float = 0.001, seed: int = 42
    ) -> None:
        self.n = n
        self.rate_hz = rate_hz
        self.weight = weight
        self.dt = dt
        self._rng = np.random.default_rng(seed)
        self.target: Population | None = None

    def get_current(self, t_step: int, dt: float | None = None) -> np.ndarray:
        """Generate Poisson spikes and return weighted current vector."""
        step_dt = dt if dt is not None else self.dt
        p_spike = self.rate_hz * step_dt
        spikes = (self._rng.random(self.n) < p_spike).astype(np.float64)
        return spikes * self.weight

get_current(t_step, dt=None)

Generate Poisson spikes and return weighted current vector.

Source code in src/sc_neurocore/network/stimulus.py
Python
def get_current(self, t_step: int, dt: float | None = None) -> np.ndarray:
    """Generate Poisson spikes and return weighted current vector."""
    step_dt = dt if dt is not None else self.dt
    p_spike = self.rate_hz * step_dt
    spikes = (self._rng.random(self.n) < p_spike).astype(np.float64)
    return spikes * self.weight
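
A quick sanity check on the Bernoulli approximation used here (accurate when `rate_hz * dt` is well below 1): with spike probability `p = rate_hz * dt` per neuron per step, the total spike count over a run concentrates around `n * p * n_steps`. The parameters below are illustrative.

```python
import numpy as np

# Bernoulli thinning of a Poisson process: expected total spike count
# over the run is n * rate_hz * dt * n_steps.
rng = np.random.default_rng(42)
n, rate_hz, dt, n_steps = 1000, 50.0, 0.001, 200
p_spike = rate_hz * dt                   # 0.05 per neuron per step
total = sum(int((rng.random(n) < p_spike).sum()) for _ in range(n_steps))
expected = n * p_spike * n_steps         # 10_000
```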

StepCurrent

Rectangular step current between onset and offset timesteps.

Source code in src/sc_neurocore/network/stimulus.py
Python
class StepCurrent:
    """Rectangular step current between onset and offset timesteps."""

    def __init__(self, onset: int, offset: int, amplitude: float) -> None:
        self.onset = onset
        self.offset = offset
        self.amplitude = amplitude
        self.target: Population | None = None

    def get_current(self, t_step: int, dt: float = 0.001) -> float:
        """Return amplitude if within [onset, offset), else 0."""
        if self.onset <= t_step < self.offset:
            return self.amplitude
        return 0.0

get_current(t_step, dt=0.001)

Return amplitude if within [onset, offset), else 0.

Source code in src/sc_neurocore/network/stimulus.py
Python
def get_current(self, t_step: int, dt: float = 0.001) -> float:
    """Return amplitude if within [onset, offset), else 0."""
    if self.onset <= t_step < self.offset:
        return self.amplitude
    return 0.0

MPIRunner

MPI-distributed network simulation.

Partitions populations across MPI ranks via round-robin assignment. Each rank steps only its local populations; spikes propagate via MPI_Allgatherv every timestep.

Each rank steps supported local populations through the Rust engine's step_population API when the extension is importable and every local model on the rank is supported. Otherwise the runner falls back to Population.step_all for CPU-only environments. spike_gating and fim_lambda are unsupported by this runner — the Network._run_mpi dispatcher raises NotImplementedError when either is requested with backend='mpi'.

Source code in src/sc_neurocore/network/mpi_runner.py
Python
class MPIRunner:
    """MPI-distributed network simulation.

    Partitions populations across MPI ranks via round-robin assignment.
    Each rank steps only its local populations; spikes propagate via
    ``MPI_Allgatherv`` every timestep.

    Each rank steps supported local populations through the Rust engine's
    ``step_population`` API when the extension is importable and every
    local model on the rank is supported. Otherwise the runner falls back
    to ``Population.step_all`` for CPU-only environments. ``spike_gating``
    and ``fim_lambda`` are unsupported by this runner — the
    ``Network._run_mpi`` dispatcher raises ``NotImplementedError`` when
    either is requested with ``backend='mpi'``.
    """

    def __init__(self, network: Network) -> None:
        _require_mpi()
        assert MPI is not None
        self.comm = MPI.COMM_WORLD
        self.rank: int = self.comm.Get_rank()
        self.size: int = self.comm.Get_size()
        self.network = network

        self._populations: list[Population] = network.populations
        self._projections: list[Projection] = network.projections
        self._local_indices: list[int] = []
        self._rank_of: dict[int, int] = {}
        self._partition_populations()

        self._cross_rank_projs: list[tuple[int, Projection]] = []
        self._local_projs: list[Projection] = []
        self._identify_cross_rank_projections()

        self._rust_runner: Any | None = None
        self._rust_local_pop_indices: dict[int, int] = {}
        self._rust_dispatch_enabled = False
        self._initialize_rust_dispatch()

    def _initialize_rust_dispatch(self) -> None:
        """Prepare a rank-local Rust runner when the installed engine supports it."""
        if not self._local_indices:
            return
        if not all(
            _rust_supports_model(self._populations[idx].model_name) for idx in self._local_indices
        ):
            return
        engine_cls = _get_rust_engine()
        if engine_cls is False:
            return
        runner = engine_cls()
        if not hasattr(runner, "step_population"):
            return
        for global_idx in self._local_indices:
            pop = self._populations[global_idx]
            rust_idx = runner.add_population(pop.model_name, pop.n)
            self._rust_local_pop_indices[global_idx] = int(rust_idx)
        self._rust_runner = runner
        self._rust_dispatch_enabled = True

    def _partition_populations(self) -> None:
        """Round-robin assignment of populations to ranks."""
        for i in range(len(self._populations)):
            owner = i % self.size
            self._rank_of[i] = owner
            if owner == self.rank:
                self._local_indices.append(i)

    def _identify_cross_rank_projections(self) -> None:
        """Separate projections into local and cross-rank."""
        pop_id_to_idx = {id(p): i for i, p in enumerate(self._populations)}
        for proj in self._projections:
            src_idx = pop_id_to_idx.get(id(proj.source), -1)
            tgt_idx = pop_id_to_idx.get(id(proj.target), -1)
            src_rank = self._rank_of.get(src_idx, -1)
            tgt_rank = self._rank_of.get(tgt_idx, -1)
            if src_rank != tgt_rank:
                self._cross_rank_projs.append((src_idx, proj))
            else:
                if tgt_rank == self.rank:
                    self._local_projs.append(proj)

    def _exchange_spikes(self, local_spikes: dict[int, np.ndarray]) -> dict[int, np.ndarray]:
        """Allgatherv spike vectors so every rank knows who spiked.

        Each rank sends spike vectors for its local populations packed
        as (pop_index, n_neurons, spike_data...). Returns a dict of
        pop_index -> spike array for all populations.
        """
        assert MPI is not None
        chunks: list[np.ndarray] = []
        for idx in self._local_indices:
            spikes = local_spikes.get(idx, np.zeros(self._populations[idx].n, dtype=np.int8))
            header = np.array([idx, spikes.shape[0]], dtype=np.int32)
            chunks.append(header.view(np.int8))
            chunks.append(spikes)

        send_buf = np.concatenate(chunks) if chunks else np.array([], dtype=np.int8)
        send_count = np.array(send_buf.shape[0], dtype=np.int32)
        recv_counts = np.empty(self.size, dtype=np.int32)
        self.comm.Allgather(send_count, recv_counts)

        total = int(recv_counts.sum())
        recv_buf = np.empty(total, dtype=np.int8)
        displacements = np.zeros(self.size, dtype=np.int32)
        for i in range(1, self.size):
            displacements[i] = displacements[i - 1] + recv_counts[i - 1]

        self.comm.Allgatherv(send_buf, [recv_buf, recv_counts, displacements, MPI.BYTE])

        all_spikes: dict[int, np.ndarray] = {}
        pos = 0
        while pos < total:
            header = recv_buf[pos : pos + 8].view(np.int32)
            pop_idx = int(header[0])
            n = int(header[1])
            pos += 8
            all_spikes[pop_idx] = recv_buf[pos : pos + n].copy()
            pos += n

        return all_spikes

    def _step_local(
        self,
        pop_to_currents: dict[int, np.ndarray],
        last_spikes: dict[int, np.ndarray],
    ) -> dict[int, np.ndarray]:
        """Step only this rank's populations, return local spike dict."""
        local_spikes: dict[int, np.ndarray] = {}
        for idx in self._local_indices:
            pop = self._populations[idx]
            currents = np.asarray(
                pop_to_currents.get(idx, np.zeros(pop.n, dtype=np.float64)),
                dtype=np.float64,
            )
            if currents.shape != (pop.n,):
                raise ValueError(
                    f"current vector for population {idx} has shape {currents.shape}, "
                    f"expected {(pop.n,)}"
                )
            if self._rust_dispatch_enabled:
                assert self._rust_runner is not None
                result = self._rust_runner.step_population(
                    self._rust_local_pop_indices[idx],
                    np.ascontiguousarray(currents, dtype=np.float64),
                )
                spikes = np.asarray(result["spikes"], dtype=np.int8)
                voltages = np.asarray(result["voltages"], dtype=np.float64)
                if spikes.shape != (pop.n,):
                    raise RuntimeError(
                        f"Rust spike vector for population {idx} has shape {spikes.shape}, "
                        f"expected {(pop.n,)}"
                    )
                if voltages.shape != (pop.n,):
                    raise RuntimeError(
                        f"Rust voltage vector for population {idx} has shape {voltages.shape}, "
                        f"expected {(pop.n,)}"
                    )
                pop.set_voltages(voltages)
                spikes = spikes.copy()
            else:
                spikes = pop.step_all(currents)
            local_spikes[idx] = spikes
        return local_spikes

    def run(self, n_steps: int, dt: float = 0.001) -> None:
        """Run the distributed simulation for *n_steps* timesteps.

        Results are recorded via the network's monitors. Global monitors
        aggregate on rank 0 only.
        """
        np.random.seed(self.network.seed + self.rank)
        pop_id_to_idx = {id(p): i for i, p in enumerate(self._populations)}
        all_spikes: dict[int, np.ndarray] = {
            i: np.zeros(p.n, dtype=np.int8) for i, p in enumerate(self._populations)
        }

        for t in range(n_steps):
            pop_to_currents: dict[int, np.ndarray] = {
                idx: np.zeros(self._populations[idx].n, dtype=np.float64)
                for idx in self._local_indices
            }

            for proj in self._local_projs:
                src_idx = pop_id_to_idx[id(proj.source)]
                tgt_idx = pop_id_to_idx[id(proj.target)]
                src_sp = all_spikes.get(src_idx, np.zeros(proj.source.n, dtype=np.int8))
                current = proj.propagate(src_sp)
                if tgt_idx in pop_to_currents:
                    pop_to_currents[tgt_idx] += current

            for src_idx, proj in self._cross_rank_projs:
                tgt_idx = pop_id_to_idx[id(proj.target)]
                src_sp = all_spikes.get(src_idx, np.zeros(proj.source.n, dtype=np.int8))
                current = proj.propagate(src_sp)
                if tgt_idx in pop_to_currents:
                    pop_to_currents[tgt_idx] += current

            local_spikes = self._step_local(pop_to_currents, all_spikes)
            all_spikes = self._exchange_spikes(local_spikes)

            if self.rank == 0:
                net = self.network
                for mon in net.spike_monitors:
                    idx = pop_id_to_idx.get(id(mon.population))
                    if idx is not None and idx in all_spikes:
                        mon.record(all_spikes[idx], t)
                for mon in net.rate_monitors:  # type: ignore[assignment]
                    idx = pop_id_to_idx.get(id(mon.population))
                    if idx is not None and idx in all_spikes:
                        mon.record(all_spikes[idx], t, dt)  # type: ignore[call-arg]

run(n_steps, dt=0.001)

Run the distributed simulation for n_steps timesteps.

Results are recorded via the network's monitors. Global monitors aggregate on rank 0 only.

Source code in src/sc_neurocore/network/mpi_runner.py
Python
def run(self, n_steps: int, dt: float = 0.001) -> None:
    """Run the distributed simulation for *n_steps* timesteps.

    Results are recorded via the network's monitors. Global monitors
    aggregate on rank 0 only.
    """
    np.random.seed(self.network.seed + self.rank)
    pop_id_to_idx = {id(p): i for i, p in enumerate(self._populations)}
    all_spikes: dict[int, np.ndarray] = {
        i: np.zeros(p.n, dtype=np.int8) for i, p in enumerate(self._populations)
    }

    for t in range(n_steps):
        pop_to_currents: dict[int, np.ndarray] = {
            idx: np.zeros(self._populations[idx].n, dtype=np.float64)
            for idx in self._local_indices
        }

        for proj in self._local_projs:
            src_idx = pop_id_to_idx[id(proj.source)]
            tgt_idx = pop_id_to_idx[id(proj.target)]
            src_sp = all_spikes.get(src_idx, np.zeros(proj.source.n, dtype=np.int8))
            current = proj.propagate(src_sp)
            if tgt_idx in pop_to_currents:
                pop_to_currents[tgt_idx] += current

        for src_idx, proj in self._cross_rank_projs:
            tgt_idx = pop_id_to_idx[id(proj.target)]
            src_sp = all_spikes.get(src_idx, np.zeros(proj.source.n, dtype=np.int8))
            current = proj.propagate(src_sp)
            if tgt_idx in pop_to_currents:
                pop_to_currents[tgt_idx] += current

        local_spikes = self._step_local(pop_to_currents, all_spikes)
        all_spikes = self._exchange_spikes(local_spikes)

        if self.rank == 0:
            net = self.network
            for mon in net.spike_monitors:
                idx = pop_id_to_idx.get(id(mon.population))
                if idx is not None and idx in all_spikes:
                    mon.record(all_spikes[idx], t)
            for mon in net.rate_monitors:  # type: ignore[assignment]
                idx = pop_id_to_idx.get(id(mon.population))
                if idx is not None and idx in all_spikes:
                    mon.record(all_spikes[idx], t, dt)  # type: ignore[call-arg]

random_connectivity(n_src, n_tgt, p, weight, seed=42)

Erdos-Renyi random connectivity.

Source code in src/sc_neurocore/network/topology.py
Python
def random_connectivity(n_src: int, n_tgt: int, p: float, weight: float, seed: int = 42) -> CSR:
    """Erdos-Renyi random connectivity."""
    rng = np.random.default_rng(seed)
    mask = rng.random((n_src, n_tgt)) < p
    rows, cols = np.nonzero(mask)
    weights = np.full(len(rows), weight, dtype=np.float64)
    return _to_csr(n_src, n_tgt, rows, cols, weights)
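
The generator's expected edge count is `p * n_src * n_tgt`; a quick check on the same masking step (sizes here are illustrative, and `_to_csr` is omitted since only the mask matters for the count):

```python
import numpy as np

# Erdos-Renyi sampling: each of the n_src * n_tgt potential edges is
# kept independently with probability p, so the edge count concentrates
# around p * n_src * n_tgt.
rng = np.random.default_rng(42)
n_src, n_tgt, p = 200, 300, 0.1
mask = rng.random((n_src, n_tgt)) < p
n_edges = int(mask.sum())
expected = p * n_src * n_tgt             # 6000
```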

small_world(n, k, p_rewire, weight, seed=42)

Watts-Strogatz small-world graph (n-by-n adjacency).

Source code in src/sc_neurocore/network/topology.py
Python
def small_world(n: int, k: int, p_rewire: float, weight: float, seed: int = 42) -> CSR:
    """Watts-Strogatz small-world graph (n-by-n adjacency)."""
    rng = np.random.default_rng(seed)
    half_k = k // 2
    row_list: list[int] = []
    col_list: list[int] = []
    for i in range(n):
        for j in range(1, half_k + 1):
            tgt = (i + j) % n
            if rng.random() < p_rewire:
                tgt = int(rng.integers(0, n))
                while tgt == i:
                    tgt = int(rng.integers(0, n))
            row_list.append(i)
            col_list.append(tgt)
            row_list.append(tgt)
            col_list.append(i)
    rows = np.array(row_list, dtype=np.int64)
    cols = np.array(col_list, dtype=np.int64)
    weights = np.full(len(rows), weight, dtype=np.float64)
    return _to_csr(n, n, rows, cols, weights)

scale_free(n, m, weight, seed=42)

Barabasi-Albert preferential attachment (n-by-n adjacency).

Source code in src/sc_neurocore/network/topology.py
Python
def scale_free(n: int, m: int, weight: float, seed: int = 42) -> CSR:
    """Barabasi-Albert preferential attachment (n-by-n adjacency)."""
    rng = np.random.default_rng(seed)
    degree = np.zeros(n, dtype=np.float64)
    row_list: list[int] = []
    col_list: list[int] = []
    targets = list(range(m))
    for t in targets:
        degree[t] = 1.0
    for src in range(m, n):
        probs = degree[:src].copy()
        total = probs.sum()
        if total > 0:
            probs /= total
        else:
            probs[:] = 1.0 / src
        chosen = rng.choice(src, size=min(m, src), replace=False, p=probs)
        for tgt in chosen:
            row_list.append(src)
            col_list.append(int(tgt))
            row_list.append(int(tgt))
            col_list.append(src)
            degree[src] += 1
            degree[int(tgt)] += 1
    rows = np.array(row_list, dtype=np.int64)
    cols = np.array(col_list, dtype=np.int64)
    weights = np.full(len(rows), weight, dtype=np.float64)
    return _to_csr(n, n, rows, cols, weights)
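Each arriving node src >= m draws exactly min(m, src) = m distinct targets, and every link is stored in both directions, so the generator always emits 2 · m · (n − m) COO pairs. A standalone replication with hypothetical sizes:

```python
import numpy as np

n, m = 30, 3
rng = np.random.default_rng(42)
degree = np.zeros(n)
degree[:m] = 1.0                       # seed nodes start with degree 1
pairs = []
for src in range(m, n):
    # Preferential attachment: pick targets proportionally to degree.
    probs = degree[:src] / degree[:src].sum()
    chosen = rng.choice(src, size=min(m, src), replace=False, p=probs)
    for tgt in chosen:
        pairs.append((src, int(tgt)))
        pairs.append((int(tgt), src))  # stored symmetrically
        degree[src] += 1
        degree[int(tgt)] += 1
print(len(pairs))  # 2 * m * (n - m) = 162
```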

ring_topology(n, k, weight)

Ring topology with k nearest neighbours in each direction.

Source code in src/sc_neurocore/network/topology.py
Python
def ring_topology(n: int, k: int, weight: float) -> CSR:
    """Ring topology with k nearest neighbours in each direction."""
    row_list: list[int] = []
    col_list: list[int] = []
    for i in range(n):
        for j in range(1, k + 1):
            row_list.append(i)
            col_list.append((i + j) % n)
            row_list.append(i)
            col_list.append((i - j) % n)
    rows = np.array(row_list, dtype=np.int64)
    cols = np.array(col_list, dtype=np.int64)
    weights = np.full(len(rows), weight, dtype=np.float64)
    return _to_csr(n, n, rows, cols, weights)
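With k < n/2 the clockwise and counter-clockwise offsets never collide, so every node ends up with exactly 2k out-edges and the matrix holds 2 · n · k entries. A quick standalone check with hypothetical sizes:

```python
n, k = 10, 2
pairs = set()
for i in range(n):
    for j in range(1, k + 1):
        pairs.add((i, (i + j) % n))   # clockwise neighbour
        pairs.add((i, (i - j) % n))   # counter-clockwise neighbour
print(len(pairs))  # 2 * n * k = 40 distinct edges
```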

grid_topology(rows_count, cols_count, radius, weight)

2D lattice connectivity within Manhattan radius.

Source code in src/sc_neurocore/network/topology.py
Python
def grid_topology(rows_count: int, cols_count: int, radius: int, weight: float) -> CSR:
    """2D lattice connectivity within Manhattan radius."""
    n = rows_count * cols_count
    row_list: list[int] = []
    col_list: list[int] = []
    for r in range(rows_count):
        for c in range(cols_count):
            idx = r * cols_count + c
            for dr in range(-radius, radius + 1):
                for dc in range(-radius, radius + 1):
                    if dr == 0 and dc == 0:
                        continue
                    if abs(dr) + abs(dc) > radius:
                        continue  # outside the Manhattan ball
                    nr, nc = r + dr, c + dc
                    if 0 <= nr < rows_count and 0 <= nc < cols_count:
                        row_list.append(idx)
                        col_list.append(nr * cols_count + nc)
    r_arr = np.array(row_list, dtype=np.int64)
    c_arr = np.array(col_list, dtype=np.int64)
    weights = np.full(len(r_arr), weight, dtype=np.float64)
    return _to_csr(n, n, r_arr, c_arr, weights)
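Under the documented Manhattan metric, a radius-1 cell has at most four neighbours (up, down, left, right); border and corner cells lose the out-of-bounds ones. A standalone sketch of the neighbourhood count on a hypothetical 5x5 grid (helper name is illustrative, not part of the API):

```python
def manhattan_neighbours(r, c, rows_count, cols_count, radius):
    """Count in-bounds cells within Manhattan distance `radius` of (r, c)."""
    count = 0
    for dr in range(-radius, radius + 1):
        for dc in range(-radius, radius + 1):
            if dr == 0 and dc == 0:
                continue
            if abs(dr) + abs(dc) > radius:   # Manhattan cutoff
                continue
            nr, nc = r + dr, c + dc
            if 0 <= nr < rows_count and 0 <= nc < cols_count:
                count += 1
    return count

print(manhattan_neighbours(2, 2, 5, 5, 1))  # interior cell: 4 neighbours
print(manhattan_neighbours(0, 0, 5, 5, 1))  # corner cell: 2 neighbours
```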

all_to_all(n_src, n_tgt, weight)

Full connectivity (every source to every target).

Source code in src/sc_neurocore/network/topology.py
Python
def all_to_all(n_src: int, n_tgt: int, weight: float) -> CSR:
    """Full connectivity (every source to every target)."""
    rows = np.repeat(np.arange(n_src, dtype=np.int64), n_tgt)
    cols = np.tile(np.arange(n_tgt, dtype=np.int64), n_src)
    weights = np.full(len(rows), weight, dtype=np.float64)
    return _to_csr(n_src, n_tgt, rows, cols, weights)
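The repeat/tile pairing enumerates the full Cartesian product in row-major order, giving exactly n_src · n_tgt entries. For a hypothetical 2x3 case:

```python
import numpy as np

n_src, n_tgt = 2, 3
rows = np.repeat(np.arange(n_src), n_tgt)   # [0, 0, 0, 1, 1, 1]
cols = np.tile(np.arange(n_tgt), n_src)     # [0, 1, 2, 0, 1, 2]
print(list(zip(rows.tolist(), cols.tolist())))
# → all 6 (src, tgt) pairs in row-major order
```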

export_verilog(network, output_dir, target='ice40')

Export a LIF-based network to Verilog files.

Source code in src/sc_neurocore/network/export.py
Python
def export_verilog(network: Network, output_dir: str, target: str = "ice40") -> str:
    """Export a LIF-based network to Verilog files."""
    _check_exportable(network)
    os.makedirs(output_dir, exist_ok=True)
    top_v = _emit_top(network, target)
    top_path = os.path.join(output_dir, "sc_network_top.v")
    with open(top_path, "w") as f:
        f.write(top_v)
    params_path = os.path.join(output_dir, "params.vh")
    with open(params_path, "w") as f:
        f.write(f"// SC-NeuroCore network parameters — target: {target}\n")
        for i, pop in enumerate(network.populations):
            f.write(f"`define POP_{i}_SIZE {pop.n}\n")
    return top_path
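Following the `f.write` calls above, `params.vh` is a one-`define`-per-population header. For a hypothetical network with two populations of 128 and 32 LIF neurons exported with the default `ice40` target, the emitted file would look like:

```verilog
// SC-NeuroCore network parameters — target: ice40
`define POP_0_SIZE 128
`define POP_1_SIZE 32
```

`sc_network_top.v` (from `_emit_top`) includes this header so population sizes can be retuned without regenerating the top module.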