
Neuromorphic Swarm Control

Module: sc_neurocore.swarm Source: src/sc_neurocore/swarm/ — 5 files, 1001 LOC Status (v3.14.0): all 9 public symbols wired; 73 tests pass across 3 test files; pure-Python (no Rust path) — performance falls steeply beyond ~100 agents (§8). The agent's "soft-LIF" naming overstates the mechanism; see §3.2.

This page covers the entire swarm subpackage: SwarmAgent (sigmoid-pseudo-spike SNN per agent), SwarmEnvironment (2-D arena with obstacles + targets), CollectiveFields (chemical / emotional / symbolic field layers), SwarmFitness (5 objectives plus a weighted composite), and SwarmEvolver (truncation-elite GA over flat weight vectors).


1. Public surface

sc_neurocore.swarm.__init__ re-exports 9 symbols from the 5 modules:

Symbol Source file Role
AgentConfig agent.py Hyper-parameters for one agent
SwarmAgent agent.py Per-agent SNN + kinematic state
EnvConfig swarm_env.py Arena hyper-parameters
SwarmEnvironment swarm_env.py 2-D continuous arena + step loop
FieldConfig collective_fields.py Field-layer hyper-parameters
CollectiveFields collective_fields.py Chemical / emotional / symbolic fields
SwarmFitness fitness.py Static objective + composite scoring
EvolverConfig neuroevolution_swarm.py GA hyper-parameters
SwarmEvolver neuroevolution_swarm.py GA over flat weight vectors

All 9 are dataclass-style or stateful classes; none are utility functions. The package has no module-level globals.


2. Architecture

Text Only
                ┌─────────────────────────────┐
                │ SwarmEvolver (GA, weights)  │
                │   pop_size × n_eval_steps   │
                └────────────┬────────────────┘
                             │ inject weights into every agent
                             ▼
   ┌────────────────────────────────────────────────────────┐
   │ SwarmEnvironment.step(dt, fields=None) — per timestep  │
   │                                                         │
   │   for each agent:                                       │
   │     build 20-channel sensory vector                     │
   │       (8 nbrs · 3 obs · 2 tgt · 2 chem · 2 sym · 2 emo) │
   │     speed, turn = agent.think(sensory)                  │
   │     agent.act(speed * dt, turn * dt)                    │
   │     boundary wrap / clamp                               │
   │     fields.deposit_chemical(agent.position, ...)        │
   │                                                         │
   │   target capture (Euclidean within capture_radius)      │
   │   fields.update(...) → diffuse, sync emotions, decay sym│
   └────────────────────────────────────────────────────────┘
                             │
                             ▼
                ┌─────────────────────────────┐
                │ SwarmFitness.composite(env) │
                │   0.30 cov + 0.20 coh +     │
                │   0.10 aln + 0.30 tgt -     │
                │   0.10 obstacle_penalty     │
                └─────────────────────────────┘

Agents have no direct awareness of the fields; the environment passes chemical gradients, symbolic field reads, and the agent's own emotion vector into the sensory channel slice on each step. Weight vectors flow from SwarmEvolver into SwarmAgent.weights (setter). Fitness flows back from SwarmFitness.composite(env) into SwarmEvolver.


3. SwarmAgent and AgentConfig

Python
@dataclass
class AgentConfig:
    n_sensory: int = 20
    n_hidden: int = 16
    n_motor: int = 2
    membrane_decay: float = 0.9
    threshold: float = 1.0
    max_speed: float = 2.0
    seed: Optional[int] = None

Per-agent SNN dimensions (defaults): 20-channel sensory in, 16 hidden units, 2 motor channels (speed, turn). Total trainable weights per agent:

Text Only
n_weights = n_hidden * n_sensory     # W_in   = 16 × 20 = 320
          + n_hidden * n_hidden      # W_rec  = 16 × 16 = 256
          + n_motor  * n_hidden      # W_out  = 2  × 16 =  32
                                     # total = 608

Weights are initialised with Xavier-style scaling σ = sqrt(2 / (n_in + n_out)) per matrix.

The weights property exposes a flat 1-D vector of length n_weights for the GA; the setter splits it back into W_in, W_rec, W_out.
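The flatten/split round-trip can be sketched standalone, assuming the default dimensions (20 sensory, 16 hidden, 2 motor); the offsets below mirror the fixed W_in, W_rec, W_out ordering described above.

```python
import numpy as np

# Standalone sketch of the flat-vector round-trip used by the GA.
n_sensory, n_hidden, n_motor = 20, 16, 2
rng = np.random.default_rng(0)

W_in = rng.normal(0, 1, (n_hidden, n_sensory))
W_rec = rng.normal(0, 1, (n_hidden, n_hidden))
W_out = rng.normal(0, 1, (n_motor, n_hidden))

# Flatten: concatenate the three matrices in a fixed order.
flat = np.concatenate([W_in.ravel(), W_rec.ravel(), W_out.ravel()])
assert flat.size == 608  # 320 + 256 + 32

# Split: recover the matrices from the same offsets.
o1 = n_hidden * n_sensory
o2 = o1 + n_hidden * n_hidden
W_in2 = flat[:o1].reshape(n_hidden, n_sensory)
W_rec2 = flat[o1:o2].reshape(n_hidden, n_hidden)
W_out2 = flat[o2:].reshape(n_motor, n_hidden)
assert np.array_equal(W_in, W_in2) and np.array_equal(W_rec, W_rec2)
```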

3.1 Sensory layout

The 20-channel sensory vector is filled by SwarmEnvironment.step:

Channels Source Range
0..7 nearest-neighbour distances (8 nearest) normalised by max(width, height)
8..10 nearest-obstacle surface distances (3 nearest) clipped to [-1, 1] after / 50
11..12 nearest-target distances (2 nearest) normalised by max(width, height)
13..14 chemical gradient (dx, dy) unit vector if fields present, zero otherwise
15..16 symbolic field at agent position 2-channel raw field value
17..18 agent's own emotion (valence, arousal) first 2 of 8-D emotion vector
19 the agent's own previous chemical output [0, 1]

If fields is None at step time, channels 13..19 are zero.
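The channel layout above can be illustrated with a hand-assembled vector; all the distance values here are hypothetical, and the real normalisation happens inside SwarmEnvironment.step.

```python
import numpy as np

# Illustrative assembly of the 20-channel sensory vector (hypothetical values).
width = height = 100.0
norm = max(width, height)

neighbours = np.array([4.2, 7.1, 9.0, 12.5, 15.0, 18.3, 22.0, 30.1]) / norm  # ch 0..7
obstacles = np.clip(np.array([6.0, 14.0, 40.0]) / 50.0, -1.0, 1.0)           # ch 8..10
targets = np.array([11.0, 26.0]) / norm                                      # ch 11..12
chem_grad = np.array([0.6, 0.8])   # unit gradient vector, ch 13..14
symbolic = np.array([0.0, 0.3])    # raw field values, ch 15..16
emotion = np.array([0.1, -0.2])    # valence, arousal, ch 17..18
prev_chem = np.array([0.5])        # previous chemical output, ch 19

sensory = np.concatenate(
    [neighbours, obstacles, targets, chem_grad, symbolic, emotion, prev_chem]
)
assert sensory.shape == (20,)
```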

3.2 The "soft-LIF" naming overstates the mechanism

agent.py:53 describes SwarmAgent as "Spiking-neural-network agent with soft-LIF dynamics". The actual think() body (agent.py:130-166) does not implement an LIF cell. There is no hard threshold, no spike emission as a binary event, and no membrane reset. Instead:

Python
# Membrane integration (LIF-shaped, fine so far)
self.membrane = (
    c.membrane_decay * self.membrane
    + self.W_in @ inp
    + self.W_rec @ self.firing_rate
)

# This is NOT a spike — it is a sigmoid pseudo-rate
spike_prob = 1.0 / (1.0 + np.exp(-(self.membrane - c.threshold)))

# EMA over rate, not over spikes
self.firing_rate = 0.8 * self.firing_rate + 0.2 * spike_prob

# Soft "reset" — proportional, not the hard reset of LIF
self.membrane *= 1.0 - spike_prob

This is closer to a mean-field rate model with a sigmoid non-linearity than to LIF. There is no spike vector at any point; firing_rate is a smoothed sigmoid output. The motor readout W_out @ firing_rate then runs through tanh and a linear scale.

Why it matters: anyone expecting LIF dynamics — sparse binary spikes, hard threshold, integer spike counts — will not find them here. The fitness landscape under this surrogate may differ qualitatively from a true SNN. For a true LIF surrogate gradient, see sc_neurocore.neurons.stochastic_lif or the model registry.

A more honest name would be SoftSigmoidRecurrentNet or ReLU-ish-sigmoid SNN surrogate. Tracked as task #25 (rename or implement).

3.3 think and act

think(sensory) -> (speed, turn) runs one tick of the recurrent sigmoid network and returns motor commands:

  • speed = (tanh(W_out[0] @ firing_rate) + 1) * 0.5 * max_speed — clipped to [0, max_speed]
  • turn = tanh(W_out[1] @ firing_rate) * π — clipped to [-π, π]

act(speed, turn) advances the agent's position and heading:

Python
heading = (heading + turn) % (2 * np.pi)
position += speed * np.array([np.cos(heading), np.sin(heading)])

reset(rng=None, width, height) zeroes the membrane / firing rate and re-randomises position and heading. Weights are not touched — useful for episodic resets within a training run.
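The readout and kinematic update can be sketched end-to-end with toy weights, assuming the default max_speed = 2.0; W_out and firing_rate here are stand-ins for real agent state.

```python
import numpy as np

# Sketch of the motor readout (think) followed by the kinematic update (act).
rng = np.random.default_rng(1)
max_speed = 2.0
W_out = rng.normal(0, 0.3, (2, 16))
firing_rate = rng.uniform(0, 1, 16)

motor = W_out @ firing_rate
speed = (np.tanh(motor[0]) + 1.0) * 0.5 * max_speed  # in [0, max_speed]
turn = np.tanh(motor[1]) * np.pi                     # in [-pi, pi]
assert 0.0 <= speed <= max_speed and -np.pi <= turn <= np.pi

# Kinematic update: rotate heading, then step along it.
heading, position = 0.0, np.zeros(2)
heading = (heading + turn) % (2 * np.pi)
position += speed * np.array([np.cos(heading), np.sin(heading)])
```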


4. SwarmEnvironment and EnvConfig

Python
@dataclass
class EnvConfig:
    width: float = 100.0
    height: float = 100.0
    n_agents: int = 20
    n_obstacles: int = 5
    n_targets: int = 3
    boundary_mode: str = "wrap"  # "wrap" or "clamp"
    capture_radius: float = 3.0
    respawn_targets: bool = True
    agent_config: Optional[AgentConfig] = None
    seed: Optional[int] = None

A 2-D continuous arena. Obstacles are circles (x, y, radius), targets are points (x, y). Boundary mode decides what happens at the edges (wrap-around toroidal vs hard clamp).
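The two boundary modes reduce to a modulo for the toroidal wrap and a clip for the hard clamp; a minimal sketch, assuming the default 100 × 100 arena:

```python
import numpy as np

width, height = 100.0, 100.0

def apply_boundary(pos: np.ndarray, mode: str) -> np.ndarray:
    if mode == "wrap":   # toroidal: positions wrap around each edge
        return np.mod(pos, [width, height])
    if mode == "clamp":  # hard walls: positions stop at the edges
        return np.clip(pos, [0.0, 0.0], [width, height])
    raise ValueError(mode)

assert np.allclose(apply_boundary(np.array([103.0, -2.0]), "wrap"), [3.0, 98.0])
assert np.allclose(apply_boundary(np.array([103.0, -2.0]), "clamp"), [100.0, 0.0])
```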

4.1 Neighbour / obstacle / target queries

Method Returns Cost
get_positions() (n_agents, 2) ndarray O(n)
get_headings() (n_agents,) ndarray O(n)
get_pairwise_distances() (n_agents, n_agents) Euclidean matrix O(n²)
get_neighbor_distances(i, k=8) sorted distances to k nearest others O(n log n) per agent
get_obstacle_distances(i, k=3) sorted surface distances to k nearest obstacles O(n_obs log n_obs)
get_target_distances(i, k=2) sorted distances to k nearest targets O(n_tgt log n_tgt)

The per-step cost is dominated by the per-agent neighbour query called inside the step loop: n × O(n log n) = O(n² log n). No KD-tree, no spatial hashing — see §8 for the resulting performance falloff.

4.2 step(dt, fields=None)

For each agent:

  1. Build the 20-channel sensory vector (§3.1).
  2. Call agent.think(sensory) → (speed, turn).
  3. Call agent.act(speed * dt, turn * dt).
  4. Apply boundary mode (wrap or clamp).
  5. If fields is not None, deposit agent.chemical_output * dt at the agent's position.

Then once per step:

  6. Target capture — for each target, find the nearest agent distance; if < capture_radius, increment targets_captured and (optionally) respawn the target at a fresh random position.
  7. fields.update(agents, env, dt) if fields are active.

step_count increments at the end of each call.

4.3 get_state()

Returns a JSON-serialisable snapshot:

Python
{"step": step_count,
 "positions": [[x, y], ...],
 "headings": [θ, ...],
 "obstacles": [[x, y, r], ...],
 "targets": [[x, y], ...],
 "targets_captured": int}

There is no inverse set_state — environments are reconstructed by re-seeding the same EnvConfig.


5. CollectiveFields and FieldConfig

Python
@dataclass
class FieldConfig:
    grid_size: int = 50
    diffusion_rate: float = 0.1
    decay_rate: float = 0.05
    emotional_coupling: float = 0.1
    symbolic_decay: float = 0.02
    seed: int | None = None

Three coupled field layers on a grid_size × grid_size lattice:

Field Storage Update
chemical_field (grid_size, grid_size) float Laplacian diffusion + exponential decay
emotional_field (n_agents, 8) float mean-field coupling toward swarm mean
symbolic_field (grid_size, grid_size, 2) float exponential decay, no diffusion

5.1 Chemical Laplacian diffusion

Text Only
∂C/∂t = D · ∇²C - λ · C

Implemented as _apply_laplacian (collective_fields.py:54) — a manual 3×3 stencil convolution with zero-padded edges, written as a double Python loop over (di, dj) ∈ {-1, 0, 1}² (Rustification candidate, see §8.3). After the Laplacian, the field is multiplied by (1 - decay_rate * dt) and clipped at 0 (no negative concentrations).
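For orientation, here is a vectorised sketch of the diffusion-plus-decay update using a zero-padded 5-point Laplacian; the source uses a 3×3 Python loop, so this is an illustrative equivalent rather than the actual implementation.

```python
import numpy as np

def diffuse_decay(C: np.ndarray, D: float, decay: float, dt: float) -> np.ndarray:
    """One update of dC/dt = D * lap(C) - decay * C on a zero-padded grid."""
    P = np.pad(C, 1)  # zero-padded edges
    lap = P[:-2, 1:-1] + P[2:, 1:-1] + P[1:-1, :-2] + P[1:-1, 2:] - 4.0 * C
    C = C + D * lap * dt
    C *= 1.0 - decay * dt        # exponential decay
    return np.clip(C, 0.0, None)  # no negative concentrations

C = np.zeros((50, 50))
C[25, 25] = 10.0  # a single chemical deposit
C = diffuse_decay(C, D=0.1, decay=0.05, dt=1.0)
assert C.min() >= 0.0 and C[24, 25] > 0.0  # mass has spread to neighbours
```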

get_chemical_gradient(x, y) returns a unit-vector (dx, dy) of central-difference partials, mapped from grid to world coordinates. The norm includes a +1e-12 guard against zero-gradient division.
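A minimal sketch of the guarded central-difference read (grid-to-world coordinate mapping omitted here for brevity):

```python
import numpy as np

def chemical_gradient(C: np.ndarray, i: int, j: int) -> np.ndarray:
    """Unit-vector central-difference gradient at grid cell (i, j)."""
    dx = C[i + 1, j] - C[i - 1, j]
    dy = C[i, j + 1] - C[i, j - 1]
    g = np.array([dx, dy])
    return g / (np.linalg.norm(g) + 1e-12)  # guard against zero gradient

C = np.zeros((50, 50))
C[26, 25] = 4.0  # concentration uphill in +x
assert np.allclose(chemical_gradient(C, 25, 25), [1.0, 0.0])
```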

5.2 Emotional mean-field coupling

Python
mean = emotional_field.mean(axis=0)              # (8,)
emotional_field += coupling * (mean - emotional_field)

Each agent's 8-D emotion vector is pulled toward the swarm mean by a fraction coupling per call. Default coupling = 0.1, so the half-life of any deviation from the mean is log(2) / log(1 / 0.9) ≈ 6.6 steps.
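The half-life claim can be checked numerically: the update preserves the swarm mean, so each call multiplies any deviation by (1 - coupling) = 0.9, and the deviation first halves on step 7.

```python
import numpy as np

coupling = 0.1
field = np.zeros((20, 8))
field[0, 0] = 1.0  # one agent deviates from the swarm mean
initial_dev = field[0, 0] - field[:, 0].mean()

steps, dev = 0, initial_dev
while dev > initial_dev / 2:
    mean = field.mean(axis=0)
    field += coupling * (mean - field)  # mean-field coupling step
    dev = field[0, 0] - field[:, 0].mean()
    steps += 1

assert steps == 7  # 0.9**6 > 0.5 > 0.9**7, consistent with ~6.6 steps
```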

5.3 Symbolic field

A 2-channel raster with no diffusion — only exponential decay (* (1 - symbolic_decay * dt)). Agents read with get_symbolic_at(x, y), write with deposit_symbolic(x, y, channel, amount). There is no spatial coupling beyond grid-cell granularity.


6. SwarmFitness

Static class with five per-objective scorers and one weighted composite. All inputs are NumPy arrays from SwarmEnvironment.

Method What it rewards Range
coverage_score(positions, area) fraction of a 10×10 bin grid that contains ≥1 agent [0, 1]
cohesion_score(positions) mean pairwise distance close to bbox-diagonal × 0.25 [0, 1] (Gaussian)
alignment_score(headings) Rayleigh statistic — mean resultant length of heading angles [0, 1]
target_score(positions, targets) inverse mean distance from each agent to its nearest target [0, 1] (1/(1+d/10))
obstacle_penalty(positions, obstacles) fraction of agents inside any obstacle [0, 1]
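The Rayleigh statistic in the alignment row reduces to the mean resultant length of the heading angles; a self-contained sketch:

```python
import numpy as np

def alignment_score(headings: np.ndarray) -> float:
    """Mean resultant length: 1 for identical headings, ~0 for uniform spread."""
    return float(np.abs(np.exp(1j * headings).mean()))

aligned = np.full(20, 1.3)                               # all agents share a heading
uniform = np.linspace(0, 2 * np.pi, 20, endpoint=False)  # evenly spread headings
assert alignment_score(aligned) > 0.999
assert alignment_score(uniform) < 0.01
```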

The composite at fitness.py:114-137:

Python
0.30 * coverage
+ 0.20 * cohesion
+ 0.10 * alignment
+ 0.30 * target_proximity
- 0.10 * obstacle_penalty

Weights are hard-coded; there is no composite(env, weights=...) overload. The score is unbounded below (because the obstacle penalty is subtracted) but bounded above at +0.90 (the sum of the positive weights) when all positive objectives saturate and no agent is inside an obstacle.
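A one-function sketch of the hard-coded mix, assuming the five sub-scores have already been computed:

```python
def composite(cov: float, coh: float, aln: float, tgt: float, obs_pen: float) -> float:
    """Weighted composite with the fixed weights from fitness.py."""
    return 0.30 * cov + 0.20 * coh + 0.10 * aln + 0.30 * tgt - 0.10 * obs_pen

# Upper bound: all positive objectives saturate, no agent inside an obstacle.
assert abs(composite(1.0, 1.0, 1.0, 1.0, 0.0) - 0.90) < 1e-9
```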


7. SwarmEvolver and EvolverConfig

Python
@dataclass
class EvolverConfig:
    pop_size: int = 20
    n_elite: int = 4
    mutation_rate: float = 0.1
    mutation_std: float = 0.3
    n_eval_steps: int = 200
    use_fields: bool = False
    env_config: Optional[EnvConfig] = None
    agent_config: Optional[AgentConfig] = None
    seed: Optional[int] = None

A textbook genetic algorithm over flat weight vectors:

  1. Population — pop_size flat vectors, each of length n_weights = template_agent.n_weights (608 for the default AgentConfig). Initialised from N(0, 0.5).
  2. Evaluation — for each individual, build a fresh SwarmEnvironment, inject the same weights into all agents (homogeneous swarm), run n_eval_steps, score with SwarmFitness.composite(env).
  3. Selection — truncation: take the top n_elite individuals by fitness.
  4. Reproduction — elite survive unchanged; remainder produced by uniform crossover of two random elite parents plus Gaussian mutation (mutation_rate of genes perturbed by N(0, mutation_std)).
  5. Iterate for n_generations.

run(n_generations) -> list[best_fitness_per_generation]. get_best_weights() returns the highest-fitness vector after the last evaluation.

The evolver re-seeds the environment per individual via int(self.rng.integers(0, 2**31)), so each evaluation sees a different obstacle/target layout — fitness becomes an average over seeds within a generation. There is no fixed-seed test-set evaluation.
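Steps 1–4 above can be sketched self-containedly with a toy fitness in place of the swarm rollout; the hyper-parameters match the EvolverConfig defaults, and the quadratic objective is purely illustrative.

```python
import numpy as np

rng = np.random.default_rng(0)
pop_size, n_elite, n_weights = 20, 4, 608
mutation_rate, mutation_std = 0.1, 0.3

pop = rng.normal(0, 0.5, (pop_size, n_weights))
fitness = -np.sum(pop ** 2, axis=1)  # toy objective: prefer small norms

order = np.argsort(fitness)[::-1]    # truncation selection: best first
elite = pop[order[:n_elite]]         # elite survive unchanged

children = []
for _ in range(pop_size - n_elite):
    pa, pb = elite[rng.integers(0, n_elite, 2)]   # two random elite parents
    mask = rng.random(n_weights) < 0.5            # uniform crossover
    child = np.where(mask, pa, pb)
    mut = rng.random(n_weights) < mutation_rate   # Gaussian mutation on a subset
    child = child + mut * rng.normal(0, mutation_std, n_weights)
    children.append(child)

next_pop = np.vstack([elite, np.array(children)])
assert next_pop.shape == (pop_size, n_weights)
```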


8. Performance — measured (this workstation)

Hardware: Intel i5-11600K, 32 GB DDR4, Python 3.12.3.

8.1 SwarmEnvironment.step (100 timesteps, dt=1.0)

n_agents n_obstacles env-only wall env+fields wall env steps/s env+fields steps/s
20 5 380.2 ms 476.6 ms 263.0 209.8
100 5 1 815.8 ms 2 459.8 ms 55.1 40.7
500 5 17 347.7 ms 21 005.2 ms 5.8 4.8

Step rate falls super-linearly with n_agents (≈4.8× drop from 20 → 100 agents, ≈9.5× from 100 → 500), consistent with the per-agent neighbour query dominating: n × O(n log n) = O(n² log n). The chemical-field Laplacian adds ~20 % overhead at small populations and shrinks in relative cost as the agent loop grows.

8.2 SwarmEvolver.evolve_generation (one generation, end-to-end)

pop_size n_eval_steps best_fitness wall
20 50 0.3300 3 022.8 ms
20 200 0.3063 11 845.1 ms
50 100 0.3524 18 783.4 ms

Per-generation cost is pop_size × n_eval_steps × per-step cost, plus a fresh-environment construction overhead per individual. The environment-step throughput is consistent at ~265–340 env-steps/s, matching the 20-agent env-only rate in §8.1.

8.3 No Rust path

agent.py, swarm_env.py, collective_fields.py, fitness.py, and neuroevolution_swarm.py are all pure Python / NumPy. The two hottest kernels — _apply_laplacian (Python double loop) and get_pairwise_distances (NumPy O(N²)) — are Rustification candidates. Tracked under task #13 (network/topology Rustification); the same effort would extend naturally to swarm.


9. Pipeline wiring

Surface How it's wired Verifier
from sc_neurocore.swarm import SwarmAgent, ... swarm/__init__.py:23-27 tests/test_swarm.py
SwarmEvolver injects weights into every agent evaluate_individual (neuroevolution_swarm.py:107) tests/test_swarm_control.py::test_evolver_*
SwarmEnvironment.step builds sensory vector + calls agent.think + boundary swarm_env.py:153 tests/test_swarm.py
CollectiveFields.update runs diffusion + emotion sync + symbolic decay collective_fields.py:195 tests/test_swarm_coverage.py
SwarmFitness.composite reads positions / headings / targets / obstacles fitness.py:114 tests/test_swarm_coverage.py

Every public symbol terminates in a tested call site; no orphan helpers.


10. Audit (7-point checklist)

# Dimension Status Detail
1 Pipeline wiring ✅ PASS All 9 symbols re-exported and used by the tests
2 Multi-angle tests ✅ PASS 73 tests across 3 files (test_swarm 52L, test_swarm_control 291L, test_swarm_coverage 184L)
3 Rust path ❌ FAIL Pure Python; _apply_laplacian and get_pairwise_distances dominate at n ≥ 100 (§8)
4 Benchmarks ✅ PASS §8.1 + §8.2 measured this session
5 Performance docs ✅ PASS §8
6 Documentation page ✅ PASS This page
7 Rules followed ⚠️ WARN SPDX header on every file ✅. "Soft-LIF" naming is misleading (§3.2) — the implementation is a sigmoid pseudo-rate model, not LIF. Three undocumented # type: ignore markers (agent.py:148, agent.py:153, swarm_env.py:153) without rationale. British English mostly clean (vectorise/synchronise consistent in docstrings).

Net: 1 WARN, 1 FAIL. The FAIL is performance-driven, not correctness-driven; the WARN is honesty-driven (the naming claim overstates what the code does).


11. Known issues

11.1 "Soft-LIF" naming overstates the mechanism

See §3.2. Either rename SwarmAgent to something that describes the sigmoid-pseudo-rate model honestly (e.g. SoftSigmoidRecurrentAgent) or replace think() with a real LIF (hard threshold + binary spike + hard reset, with surrogate gradient if differentiability is needed). Tracked as task #25.

11.2 No spatial index → O(n² log n) per step

get_neighbor_distances and get_obstacle_distances recompute the full pair-distance vector for every query. Adding a KD-tree (scipy.spatial.cKDTree) or a uniform spatial hash would drop the per-step cost from O(n² log n) to O(n × k) for k-nearest queries. At n=500 this would be ~20× faster. Tracked as task #26.
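A sketch of the proposed cKDTree replacement, checked against the brute-force O(n²) baseline (periodic/wrap boundaries are not handled here; cKDTree's boxsize parameter could cover that case):

```python
import numpy as np
from scipy.spatial import cKDTree

rng = np.random.default_rng(0)
pos = rng.uniform(0, 100, (500, 2))

tree = cKDTree(pos)
d_tree, _ = tree.query(pos, k=9)  # k=9: self plus 8 nearest neighbours
d_tree = d_tree[:, 1:]            # drop the zero self-distance

# Brute-force O(n²) baseline for parity checking.
diff = pos[:, None, :] - pos[None, :, :]
d_full = np.sqrt((diff ** 2).sum(-1))
d_brute = np.sort(d_full, axis=1)[:, 1:9]
assert np.allclose(d_tree, d_brute)
```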

11.3 Three undocumented # type: ignore markers

  • agent.py:148# type: ignore[assignment] on the membrane integration line
  • agent.py:153 — same on the EMA firing-rate update
  • swarm_env.py:153# type: ignore[no-untyped-def] on step(self, dt, fields=None) (because fields is a forward reference)

Per the global "no # type: ignore without reason" rule, each needs either a one-line rationale comment or a properly typed alternative. The agent.py cases stem from the np.ndarray mutation pattern and can be fixed by typing self.membrane and self.firing_rate as np.ndarray at construction. The swarm_env.py case can use a TYPE_CHECKING import of CollectiveFields.
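The TYPE_CHECKING fix can be sketched as follows; this is a standalone illustration (the class body is a stub), relying on postponed annotation evaluation so the import never runs at runtime.

```python
from __future__ import annotations  # annotations stay as strings at runtime

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by the type checker; no runtime import of the module.
    from sc_neurocore.swarm.collective_fields import CollectiveFields

class SwarmEnvironment:
    def step(self, dt: float, fields: CollectiveFields | None = None) -> None:
        """Stub illustrating the typed signature; body elided."""
        ...
```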

11.4 Composite fitness weights are hard-coded

SwarmFitness.composite does not accept user weights; the 0.30 / 0.20 / 0.10 / 0.30 / -0.10 mix is in the source. Useful for ranking within a single experiment, less so for multi-objective sweeps. Adding composite(env, weights: dict | None = None) would close the gap.

11.5 No fixed-seed evaluation in SwarmEvolver

Per §7, each individual sees a different environment seed. Fitness is therefore an average over seeds, not a deterministic test. To compare two runs you must either pin the evolver seed AND the generation count, or add a fixed-seed evaluation pass at the end.


12. Tests

Bash
PYTHONPATH=src python3 -m pytest \
    tests/test_swarm.py \
    tests/test_swarm_control.py \
    tests/test_swarm_coverage.py -q
# 73 passed in 6.32s (verified 2026-04-17)

What the existing tests cover:

  • test_swarm.py (3 test classes): basic agent construction, weight setter / getter round-trip, env step shape, evolver one-gen best-fitness > 0
  • test_swarm_control.py (291L, large): full GA loop, target capture, boundary modes (wrap + clamp), field deposit / decay, emotional sync, symbolic field, evolve_generation return type, evolver get_best_weights correctness, fitness composite weights
  • test_swarm_coverage.py (184L): individual fitness scorer edge cases (empty positions, single agent, all-same heading, etc.), symbolic field deposit + decay, obstacle penalty for agent on exact boundary

What is NOT tested:

  • Spatial-index correctness — there is no spatial index to test; task #26 would need new tests for cKDTree parity.
  • Long-horizon evolver convergence — every test uses ≤ 3 generations; nobody asserts that fitness improves over 100+ gens.
  • Field stability under high deposit rate — the chemical clip at 0 is tested, but not whether high-rate deposits ever overflow the float64 range. (Unlikely in practice, but undocumented.)

13. References

The swarm code does not cite specific publications, so the references below are the standard literature for each component:

  • Genetic algorithms over neural network weights — Stanley K. O., Miikkulainen R. "Evolving Neural Networks through Augmenting Topologies." Evolutionary Computation 10(2):99-127 (2002). NEAT, the canonical reference; the implementation here is much simpler (fixed topology, truncation selection, uniform crossover).
  • Sigmoid-pseudo-rate "soft-LIF" — closer to Wong K.-F., Wang X.-J. "A Recurrent Network Mechanism of Time Integration in Perceptual Decisions." J Neurosci 26(4):1314-1328 (2006), or any rate-coded RNN. For true LIF surrogate gradients see Neftci E., Mostafa H., Zenke F. "Surrogate Gradient Learning in Spiking Neural Networks." IEEE Signal Processing Magazine 36(6):51-63 (2019).
  • Chemical-field swarm communication — Bonabeau E., Dorigo M., Theraulaz G. Swarm Intelligence: From Natural to Artificial Systems (Oxford UP, 1999). The pheromone-deposit-and-diffuse pattern.
  • Rayleigh statistic for circular alignment — Mardia K. V., Jupp P. E. Directional Statistics (Wiley, 2000).


14. Auto-rendered API

sc_neurocore.swarm

Neuromorphic Swarm Control

Spiking-neural-network agents with neuroevolution for collective behaviour.

Modules

agent — SwarmAgent with soft-LIF SNN brain
swarm_env — Grid environment with obstacles and targets
collective_fields — Chemical, emotional, and symbolic field layers
fitness — Multi-objective swarm fitness evaluation
neuroevolution_swarm — Genetic algorithm over SNN weight vectors

AgentConfig dataclass

Hyper-parameters for a single swarm agent.

Source code in src/sc_neurocore/swarm/agent.py
Python
@dataclass
class AgentConfig:
    """Hyper-parameters for a single swarm agent."""

    n_sensory: int = 20
    n_hidden: int = 16
    n_motor: int = 2
    membrane_decay: float = 0.9
    threshold: float = 1.0
    max_speed: float = 2.0
    seed: Optional[int] = None

SwarmAgent

Spiking-neural-network agent with soft-LIF dynamics.

Parameters

cfg : AgentConfig
    Neuron and network parameters.
agent_id : int
    Unique identifier within the swarm.

Source code in src/sc_neurocore/swarm/agent.py
Python
class SwarmAgent:
    """Spiking-neural-network agent with soft-LIF dynamics.

    Parameters
    ----------
    cfg : AgentConfig
        Neuron and network parameters.
    agent_id : int
        Unique identifier within the swarm.
    """

    def __init__(self, cfg: AgentConfig, agent_id: int = 0) -> None:
        self.cfg = cfg
        self.agent_id = agent_id
        rng = np.random.default_rng(cfg.seed)

        # --- Weight matrices (Xavier-ish init) ---
        scale_in = np.sqrt(2.0 / (cfg.n_sensory + cfg.n_hidden))
        scale_rec = np.sqrt(2.0 / (cfg.n_hidden + cfg.n_hidden))
        scale_out = np.sqrt(2.0 / (cfg.n_hidden + cfg.n_motor))

        self.W_in = rng.normal(0, scale_in, (cfg.n_hidden, cfg.n_sensory))
        self.W_rec = rng.normal(0, scale_rec, (cfg.n_hidden, cfg.n_hidden))
        self.W_out = rng.normal(0, scale_out, (cfg.n_motor, cfg.n_hidden))

        # --- Neuron state ---
        self.membrane = np.zeros(cfg.n_hidden)
        self.firing_rate = np.zeros(cfg.n_hidden)

        # --- Kinematic state ---
        self.position = rng.uniform(0, 100, size=2).astype(np.float64)
        self.heading = rng.uniform(0, 2 * np.pi)

        # --- Emotional / chemical state ---
        self.emotions = np.zeros(8)
        self.chemical_output = 0.0

    # ------------------------------------------------------------------
    # Weight vector (flat) for genetic algorithm
    # ------------------------------------------------------------------

    @property
    def n_weights(self) -> int:
        c = self.cfg
        return c.n_hidden * c.n_sensory + c.n_hidden * c.n_hidden + c.n_motor * c.n_hidden

    @property
    def weights(self) -> np.ndarray[Any, Any]:
        """Return all trainable weights as a flat 1-D vector."""
        return np.concatenate(
            [
                self.W_in.ravel(),
                self.W_rec.ravel(),
                self.W_out.ravel(),
            ]
        )

    @weights.setter
    def weights(self, flat: np.ndarray[Any, Any]) -> None:
        c = self.cfg
        if flat.size != self.n_weights:
            raise ValueError(f"Expected {self.n_weights} weights, got {flat.size}")
        offset = 0
        size_in = c.n_hidden * c.n_sensory
        self.W_in = flat[offset : offset + size_in].reshape(c.n_hidden, c.n_sensory).copy()
        offset += size_in

        size_rec = c.n_hidden * c.n_hidden
        self.W_rec = flat[offset : offset + size_rec].reshape(c.n_hidden, c.n_hidden).copy()
        offset += size_rec

        size_out = c.n_motor * c.n_hidden
        self.W_out = flat[offset : offset + size_out].reshape(c.n_motor, c.n_hidden).copy()

    # ------------------------------------------------------------------
    # Neural forward pass (soft-LIF)
    # ------------------------------------------------------------------

    def think(self, sensory: np.ndarray[Any, Any]) -> tuple[float, float]:
        """Run one SNN tick and return ``(speed, turn_angle)``.

        Parameters
        ----------
        sensory : ndarray, shape (n_sensory,)
            Normalised sensory input vector.

        Returns
        -------
        speed : float  in [0, max_speed]
        turn  : float  in [-pi, pi]
        """
        c = self.cfg
        inp = np.asarray(sensory, dtype=np.float64).ravel()[: c.n_sensory]

        # Membrane integration
        self.membrane = (
            c.membrane_decay * self.membrane + self.W_in @ inp + self.W_rec @ self.firing_rate  # type: ignore[assignment]
        )

        # Soft spike (sigmoid pseudo-rate)
        spike_prob = 1.0 / (1.0 + np.exp(-(self.membrane - c.threshold)))
        self.firing_rate = 0.8 * self.firing_rate + 0.2 * spike_prob  # type: ignore[assignment]

        # Reset membrane where spike probability high
        self.membrane *= 1.0 - spike_prob

        # Motor readout
        motor = self.W_out @ self.firing_rate
        speed = (np.tanh(motor[0]) + 1.0) * 0.5 * c.max_speed  # [0, max_speed]
        turn = np.tanh(motor[1]) * np.pi  # [-pi, pi]

        # Side-effect: chemical output from last sensory channel
        self.chemical_output = float(np.clip(sensory[-1] if len(sensory) > 19 else 0.0, 0, 1))

        return float(speed), float(turn)

    # ------------------------------------------------------------------
    # Kinematic update
    # ------------------------------------------------------------------

    def act(self, speed: float, turn: float) -> None:
        """Update position and heading given motor commands."""
        self.heading = (self.heading + turn) % (2 * np.pi)
        dx = speed * np.cos(self.heading)
        dy = speed * np.sin(self.heading)
        self.position[0] += dx
        self.position[1] += dy

    # ------------------------------------------------------------------
    # Reset
    # ------------------------------------------------------------------

    def reset(
        self, rng: np.random.Generator | None = None, width: float = 100.0, height: float = 100.0
    ) -> None:
        """Reset kinematic and neural state (weights untouched)."""
        if rng is None:
            rng = np.random.default_rng()
        self.membrane[:] = 0.0
        self.firing_rate[:] = 0.0
        self.position = rng.uniform(0, [width, height]).astype(np.float64)
        self.heading = rng.uniform(0, 2 * np.pi)
        self.emotions[:] = 0.0
        self.chemical_output = 0.0

weights property writable

Return all trainable weights as a flat 1-D vector.

think(sensory)

Run one SNN tick and return (speed, turn_angle).

Parameters

sensory : ndarray, shape (n_sensory,)
    Normalised sensory input vector.

Returns

speed : float  in [0, max_speed]
turn  : float  in [-pi, pi]

Source code in src/sc_neurocore/swarm/agent.py
Python
def think(self, sensory: np.ndarray[Any, Any]) -> tuple[float, float]:
    """Run one SNN tick and return ``(speed, turn_angle)``.

    Parameters
    ----------
    sensory : ndarray, shape (n_sensory,)
        Normalised sensory input vector.

    Returns
    -------
    speed : float  in [0, max_speed]
    turn  : float  in [-pi, pi]
    """
    c = self.cfg
    inp = np.asarray(sensory, dtype=np.float64).ravel()[: c.n_sensory]

    # Membrane integration
    self.membrane = (
        c.membrane_decay * self.membrane + self.W_in @ inp + self.W_rec @ self.firing_rate  # type: ignore[assignment]
    )

    # Soft spike (sigmoid pseudo-rate)
    spike_prob = 1.0 / (1.0 + np.exp(-(self.membrane - c.threshold)))
    self.firing_rate = 0.8 * self.firing_rate + 0.2 * spike_prob  # type: ignore[assignment]

    # Reset membrane where spike probability high
    self.membrane *= 1.0 - spike_prob

    # Motor readout
    motor = self.W_out @ self.firing_rate
    speed = (np.tanh(motor[0]) + 1.0) * 0.5 * c.max_speed  # [0, max_speed]
    turn = np.tanh(motor[1]) * np.pi  # [-pi, pi]

    # Side-effect: chemical output from last sensory channel
    self.chemical_output = float(np.clip(sensory[-1] if len(sensory) > 19 else 0.0, 0, 1))

    return float(speed), float(turn)

act(speed, turn)

Update position and heading given motor commands.

Source code in src/sc_neurocore/swarm/agent.py
Python
def act(self, speed: float, turn: float) -> None:
    """Update position and heading given motor commands."""
    self.heading = (self.heading + turn) % (2 * np.pi)
    dx = speed * np.cos(self.heading)
    dy = speed * np.sin(self.heading)
    self.position[0] += dx
    self.position[1] += dy

reset(rng=None, width=100.0, height=100.0)

Reset kinematic and neural state (weights untouched).

Source code in src/sc_neurocore/swarm/agent.py
Python
def reset(
    self, rng: np.random.Generator | None = None, width: float = 100.0, height: float = 100.0
) -> None:
    """Reset kinematic and neural state (weights untouched)."""
    if rng is None:
        rng = np.random.default_rng()
    self.membrane[:] = 0.0
    self.firing_rate[:] = 0.0
    self.position = rng.uniform(0, [width, height]).astype(np.float64)
    self.heading = rng.uniform(0, 2 * np.pi)
    self.emotions[:] = 0.0
    self.chemical_output = 0.0

EnvConfig dataclass

Environment hyper-parameters.

Source code in src/sc_neurocore/swarm/swarm_env.py
Python
@dataclass
class EnvConfig:
    """Environment hyper-parameters."""

    width: float = 100.0
    height: float = 100.0
    n_agents: int = 20
    n_obstacles: int = 5
    n_targets: int = 3
    boundary_mode: str = "wrap"  # "wrap" or "clamp"
    capture_radius: float = 3.0
    respawn_targets: bool = True
    agent_config: Optional[AgentConfig] = None
    seed: Optional[int] = None

SwarmEnvironment

2-D continuous arena for swarm simulation.

Parameters

cfg : EnvConfig
    Environment configuration.

Source code in src/sc_neurocore/swarm/swarm_env.py
Python
class SwarmEnvironment:
    """2-D continuous arena for swarm simulation.

    Parameters
    ----------
    cfg : EnvConfig
        Environment configuration.
    """

    def __init__(self, cfg: EnvConfig) -> None:
        self.cfg = cfg
        self.rng = np.random.default_rng(cfg.seed)
        agent_cfg = cfg.agent_config or AgentConfig()

        # --- Agents ---
        self.agents: list[SwarmAgent] = []
        for i in range(cfg.n_agents):
            a = SwarmAgent(agent_cfg, agent_id=i)
            a.position = self.rng.uniform(0, [cfg.width, cfg.height]).astype(np.float64)
            a.heading = self.rng.uniform(0, 2 * np.pi)
            self.agents.append(a)

        # --- Obstacles (x, y, radius) ---
        self.obstacles = np.zeros((cfg.n_obstacles, 3))
        for i in range(cfg.n_obstacles):
            self.obstacles[i, 0] = self.rng.uniform(10, cfg.width - 10)
            self.obstacles[i, 1] = self.rng.uniform(10, cfg.height - 10)
            self.obstacles[i, 2] = self.rng.uniform(2, 8)

        # --- Targets (x, y) ---
        self.targets = np.zeros((cfg.n_targets, 2))
        for i in range(cfg.n_targets):
            self.targets[i] = self._random_target_pos()

        self.targets_captured = 0
        self.step_count = 0

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _random_target_pos(self) -> np.ndarray[Any, Any]:
        return self.rng.uniform([5, 5], [self.cfg.width - 5, self.cfg.height - 5])

    def _apply_boundary(self, agent: SwarmAgent) -> None:
        if self.cfg.boundary_mode == "wrap":
            agent.position[0] %= self.cfg.width
            agent.position[1] %= self.cfg.height
        else:  # clamp
            agent.position[0] = np.clip(agent.position[0], 0, self.cfg.width)
            agent.position[1] = np.clip(agent.position[1], 0, self.cfg.height)

    # ------------------------------------------------------------------
    # Pairwise / neighbour queries
    # ------------------------------------------------------------------

    def get_positions(self) -> np.ndarray[Any, Any]:
        """Return (n_agents, 2) position array."""
        return np.array([a.position for a in self.agents])

    def get_headings(self) -> np.ndarray[Any, Any]:
        """Return (n_agents,) heading array."""
        return np.array([a.heading for a in self.agents])

    def get_pairwise_distances(self) -> np.ndarray[Any, Any]:
        """Return (n_agents, n_agents) Euclidean distance matrix."""
        pos = self.get_positions()
        diff = pos[:, np.newaxis, :] - pos[np.newaxis, :, :]
        return np.sqrt((diff**2).sum(axis=-1))

    def get_neighbor_distances(self, agent_idx: int, k: int = 8) -> np.ndarray[Any, Any]:
        """Return sorted distances to the *k* nearest neighbours.

        If fewer than *k* other agents exist the array is zero-padded.
        """
        pos = self.get_positions()
        diff = pos - pos[agent_idx]
        dists = np.sqrt((diff**2).sum(axis=-1))
        dists[agent_idx] = np.inf  # exclude self
        sorted_d = np.sort(dists)
        out = np.zeros(k)
        n = min(k, len(sorted_d) - 1)
        out[:n] = sorted_d[:n]
        return out

    def get_obstacle_distances(self, agent_idx: int, k: int = 3) -> np.ndarray[Any, Any]:
        """Distances to the *k* nearest obstacle surfaces (negative = inside)."""
        pos = self.agents[agent_idx].position
        centers = self.obstacles[:, :2]
        radii = self.obstacles[:, 2]
        dists = np.sqrt(((centers - pos) ** 2).sum(axis=-1)) - radii
        sorted_d = np.sort(dists)
        out = np.zeros(k)
        n = min(k, len(sorted_d))
        out[:n] = sorted_d[:n]
        return out

    def get_target_distances(self, agent_idx: int, k: int = 2) -> np.ndarray[Any, Any]:
        """Distances to the *k* nearest targets."""
        pos = self.agents[agent_idx].position
        dists = np.sqrt(((self.targets - pos) ** 2).sum(axis=-1))
        sorted_d = np.sort(dists)
        out = np.zeros(k)
        n = min(k, len(sorted_d))
        out[:n] = sorted_d[:n]
        return out

    # ------------------------------------------------------------------
    # Step
    # ------------------------------------------------------------------

    def step(self, dt: float = 1.0, fields=None) -> None:  # type: ignore[no-untyped-def]
        """Advance the simulation by one tick.

        Parameters
        ----------
        dt : float
            Timestep (used by collective fields diffusion).
        fields : CollectiveFields, optional
            If provided, agents read/write collective fields.
        """
        cfg = self.cfg
        for idx, agent in enumerate(self.agents):
            # Build 20-channel sensory vector
            sensory = np.zeros(agent.cfg.n_sensory)
            nbr_dist = self.get_neighbor_distances(idx, k=8)
            sensory[0:8] = np.clip(nbr_dist / max(cfg.width, cfg.height), 0, 1)
            od = self.get_obstacle_distances(idx, k=3)
            sensory[8:11] = np.clip(od / 50.0, -1, 1)
            td = self.get_target_distances(idx, k=2)
            sensory[11:13] = np.clip(td / max(cfg.width, cfg.height), 0, 1)

            if fields is not None:
                gx, gy = fields.get_chemical_gradient(agent.position[0], agent.position[1])
                sensory[13:15] = [gx, gy]
                sym = fields.get_symbolic_at(agent.position[0], agent.position[1])
                sensory[15:17] = sym
                sensory[17:19] = agent.emotions[:2]
                sensory[19] = agent.chemical_output
            # else: zeros (safe defaults)

            speed, turn = agent.think(sensory)
            agent.act(speed * dt, turn * dt)
            self._apply_boundary(agent)

            # Chemical deposit
            if fields is not None:
                fields.deposit_chemical(
                    agent.position[0], agent.position[1], agent.chemical_output * dt
                )

        # --- Target capture ---
        positions = self.get_positions()
        for t_idx in range(len(self.targets)):
            dists = np.sqrt(((positions - self.targets[t_idx]) ** 2).sum(axis=-1))
            if dists.min() < cfg.capture_radius:
                self.targets_captured += 1
                if cfg.respawn_targets:
                    self.targets[t_idx] = self._random_target_pos()

        # --- Update fields ---
        if fields is not None:
            fields.update(self.agents, self, dt)

        self.step_count += 1

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def get_state(self) -> dict[str, Any]:
        """Return a JSON-serialisable snapshot."""
        return {
            "step": self.step_count,
            "positions": self.get_positions().tolist(),
            "headings": self.get_headings().tolist(),
            "obstacles": self.obstacles.tolist(),
            "targets": self.targets.tolist(),
            "targets_captured": self.targets_captured,
        }

get_positions()

Return (n_agents, 2) position array.

Source code in src/sc_neurocore/swarm/swarm_env.py
Python
def get_positions(self) -> np.ndarray[Any, Any]:
    """Return (n_agents, 2) position array."""
    return np.array([a.position for a in self.agents])

get_headings()

Return (n_agents,) heading array.

Source code in src/sc_neurocore/swarm/swarm_env.py
Python
def get_headings(self) -> np.ndarray[Any, Any]:
    """Return (n_agents,) heading array."""
    return np.array([a.heading for a in self.agents])

get_pairwise_distances()

Return (n_agents, n_agents) Euclidean distance matrix.

Source code in src/sc_neurocore/swarm/swarm_env.py
Python
def get_pairwise_distances(self) -> np.ndarray[Any, Any]:
    """Return (n_agents, n_agents) Euclidean distance matrix."""
    pos = self.get_positions()
    diff = pos[:, np.newaxis, :] - pos[np.newaxis, :, :]
    return np.sqrt((diff**2).sum(axis=-1))

get_neighbor_distances(agent_idx, k=8)

Return sorted distances to the k nearest neighbours.

If fewer than k other agents exist the array is zero-padded.

Source code in src/sc_neurocore/swarm/swarm_env.py
Python
def get_neighbor_distances(self, agent_idx: int, k: int = 8) -> np.ndarray[Any, Any]:
    """Return sorted distances to the *k* nearest neighbours.

    If fewer than *k* other agents exist the array is zero-padded.
    """
    pos = self.get_positions()
    diff = pos - pos[agent_idx]
    dists = np.sqrt((diff**2).sum(axis=-1))
    dists[agent_idx] = np.inf  # exclude self
    sorted_d = np.sort(dists)
    out = np.zeros(k)
    n = min(k, len(sorted_d) - 1)
    out[:n] = sorted_d[:n]
    return out
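The zero-padding behaviour is easiest to see on a tiny example. This standalone sketch (the helper name `k_nearest_distances` is ours, not part of the package) reproduces the same sort / exclude-self / pad logic with three agents on a 3-4-5 line:

```python
import numpy as np

def k_nearest_distances(pos: np.ndarray, idx: int, k: int = 8) -> np.ndarray:
    """Mirror of get_neighbor_distances: sort, exclude self, zero-pad to k."""
    diff = pos - pos[idx]
    d = np.sqrt((diff**2).sum(axis=-1))
    d[idx] = np.inf          # exclude self from the ranking
    s = np.sort(d)
    out = np.zeros(k)
    n = min(k, len(s) - 1)   # only n real neighbours exist; rest stay zero
    out[:n] = s[:n]
    return out

pos = np.array([[0.0, 0.0], [3.0, 4.0], [6.0, 8.0]])
out = k_nearest_distances(pos, 0, k=4)  # -> [5., 10., 0., 0.]
```

With only two other agents and `k=4`, the last two slots are padded with zeros rather than raising an error.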

get_obstacle_distances(agent_idx, k=3)

Distances to the k nearest obstacle surfaces (negative = inside).

Source code in src/sc_neurocore/swarm/swarm_env.py
Python
def get_obstacle_distances(self, agent_idx: int, k: int = 3) -> np.ndarray[Any, Any]:
    """Distances to the *k* nearest obstacle surfaces (negative = inside)."""
    pos = self.agents[agent_idx].position
    centers = self.obstacles[:, :2]
    radii = self.obstacles[:, 2]
    dists = np.sqrt(((centers - pos) ** 2).sum(axis=-1)) - radii
    sorted_d = np.sort(dists)
    out = np.zeros(k)
    n = min(k, len(sorted_d))
    out[:n] = sorted_d[:n]
    return out

get_target_distances(agent_idx, k=2)

Distances to the k nearest targets.

Source code in src/sc_neurocore/swarm/swarm_env.py
Python
def get_target_distances(self, agent_idx: int, k: int = 2) -> np.ndarray[Any, Any]:
    """Distances to the *k* nearest targets."""
    pos = self.agents[agent_idx].position
    dists = np.sqrt(((self.targets - pos) ** 2).sum(axis=-1))
    sorted_d = np.sort(dists)
    out = np.zeros(k)
    n = min(k, len(sorted_d))
    out[:n] = sorted_d[:n]
    return out

step(dt=1.0, fields=None)

Advance the simulation by one tick.

Parameters

dt : float
    Timestep (used by collective fields diffusion).
fields : CollectiveFields, optional
    If provided, agents read/write collective fields.

Source code in src/sc_neurocore/swarm/swarm_env.py
Python
def step(self, dt: float = 1.0, fields=None) -> None:  # type: ignore[no-untyped-def]
    """Advance the simulation by one tick.

    Parameters
    ----------
    dt : float
        Timestep (used by collective fields diffusion).
    fields : CollectiveFields, optional
        If provided, agents read/write collective fields.
    """
    cfg = self.cfg
    for idx, agent in enumerate(self.agents):
        # Build 20-channel sensory vector
        sensory = np.zeros(agent.cfg.n_sensory)
        nbr_dist = self.get_neighbor_distances(idx, k=8)
        sensory[0:8] = np.clip(nbr_dist / max(cfg.width, cfg.height), 0, 1)
        od = self.get_obstacle_distances(idx, k=3)
        sensory[8:11] = np.clip(od / 50.0, -1, 1)
        td = self.get_target_distances(idx, k=2)
        sensory[11:13] = np.clip(td / max(cfg.width, cfg.height), 0, 1)

        if fields is not None:
            gx, gy = fields.get_chemical_gradient(agent.position[0], agent.position[1])
            sensory[13:15] = [gx, gy]
            sym = fields.get_symbolic_at(agent.position[0], agent.position[1])
            sensory[15:17] = sym
            sensory[17:19] = agent.emotions[:2]
            sensory[19] = agent.chemical_output
        # else: zeros (safe defaults)

        speed, turn = agent.think(sensory)
        agent.act(speed * dt, turn * dt)
        self._apply_boundary(agent)

        # Chemical deposit
        if fields is not None:
            fields.deposit_chemical(
                agent.position[0], agent.position[1], agent.chemical_output * dt
            )

    # --- Target capture ---
    positions = self.get_positions()
    for t_idx in range(len(self.targets)):
        dists = np.sqrt(((positions - self.targets[t_idx]) ** 2).sum(axis=-1))
        if dists.min() < cfg.capture_radius:
            self.targets_captured += 1
            if cfg.respawn_targets:
                self.targets[t_idx] = self._random_target_pos()

    # --- Update fields ---
    if fields is not None:
        fields.update(self.agents, self, dt)

    self.step_count += 1

get_state()

Return a JSON-serialisable snapshot.

Source code in src/sc_neurocore/swarm/swarm_env.py
Python
def get_state(self) -> dict[str, Any]:
    """Return a JSON-serialisable snapshot."""
    return {
        "step": self.step_count,
        "positions": self.get_positions().tolist(),
        "headings": self.get_headings().tolist(),
        "obstacles": self.obstacles.tolist(),
        "targets": self.targets.tolist(),
        "targets_captured": self.targets_captured,
    }

FieldConfig dataclass

Field layer hyper-parameters.

Source code in src/sc_neurocore/swarm/collective_fields.py
Python
@dataclass
class FieldConfig:
    """Field layer hyper-parameters."""

    grid_size: int = 50
    diffusion_rate: float = 0.1
    decay_rate: float = 0.05
    emotional_coupling: float = 0.1
    symbolic_decay: float = 0.02
    seed: int | None = None

CollectiveFields

Chemical, emotional, and symbolic field layers for swarm communication.

Parameters

cfg : FieldConfig
    Field configuration.
env_width : float
    Physical width of the environment (for coordinate mapping).
env_height : float
    Physical height of the environment.
n_agents : int
    Number of agents (for emotional field sizing).

Source code in src/sc_neurocore/swarm/collective_fields.py
Python
class CollectiveFields:
    """Chemical, emotional, and symbolic field layers for swarm communication.

    Parameters
    ----------
    cfg : FieldConfig
        Field configuration.
    env_width : float
        Physical width of the environment (for coordinate mapping).
    env_height : float
        Physical height of the environment.
    n_agents : int
        Number of agents (for emotional field sizing).
    """

    def __init__(
        self,
        cfg: FieldConfig,
        env_width: float = 100.0,
        env_height: float = 100.0,
        n_agents: int = 20,
    ) -> None:
        self.cfg = cfg
        self.env_width = env_width
        self.env_height = env_height
        self.n_agents = n_agents
        self.rng = np.random.default_rng(cfg.seed)

        gs = cfg.grid_size
        self.chemical_field = np.zeros((gs, gs), dtype=np.float64)
        self.emotional_field = np.zeros((n_agents, 8), dtype=np.float64)
        self.symbolic_field = np.zeros((gs, gs, 2), dtype=np.float64)

    # ------------------------------------------------------------------
    # Coordinate mapping: continuous (x, y) -> grid (row, col)
    # ------------------------------------------------------------------

    def _to_grid(self, x: float, y: float) -> tuple[int, int]:
        gs = self.cfg.grid_size
        col = int(np.clip(x / self.env_width * gs, 0, gs - 1))
        row = int(np.clip(y / self.env_height * gs, 0, gs - 1))
        return row, col

    # ------------------------------------------------------------------
    # Chemical field
    # ------------------------------------------------------------------

    def diffuse(self, dt: float) -> None:
        """Apply Laplacian diffusion + exponential decay to the chemical field."""
        lap = _apply_laplacian(self.chemical_field)
        self.chemical_field += self.cfg.diffusion_rate * dt * lap
        self.chemical_field *= 1.0 - self.cfg.decay_rate * dt
        np.clip(self.chemical_field, 0, None, out=self.chemical_field)

    def deposit_chemical(self, x: float, y: float, amount: float) -> None:
        """Add *amount* of chemical at world coordinate ``(x, y)``."""
        if amount <= 0:
            return
        r, c = self._to_grid(x, y)
        self.chemical_field[r, c] += amount

    def get_chemical_gradient(self, x: float, y: float) -> tuple[float, float]:
        """Return normalised (dx, dy) chemical gradient at ``(x, y)``.

        Uses central differences on the grid, mapped back to world coords.
        """
        r, c = self._to_grid(x, y)
        gs = self.cfg.grid_size
        f = self.chemical_field

        # Central differences with boundary clamp
        dc = (f[r, min(c + 1, gs - 1)] - f[r, max(c - 1, 0)]) * 0.5
        dr = (f[min(r + 1, gs - 1), c] - f[max(r - 1, 0), c]) * 0.5

        # Map grid gradient -> world gradient direction
        dx = float(dc)
        dy = float(dr)
        norm = np.sqrt(dx * dx + dy * dy) + 1e-12
        return dx / norm, dy / norm

    # ------------------------------------------------------------------
    # Emotional field
    # ------------------------------------------------------------------

    def synchronize_emotions(self, coupling: float | None = None) -> None:
        """Pull each agent's emotional vector toward the swarm mean."""
        if coupling is None:
            coupling = self.cfg.emotional_coupling
        mean_emotion = self.emotional_field.mean(axis=0)
        self.emotional_field += coupling * (mean_emotion - self.emotional_field)

    # ------------------------------------------------------------------
    # Symbolic field
    # ------------------------------------------------------------------

    def get_symbolic_at(self, x: float, y: float) -> np.ndarray[Any, Any]:
        """Return the 2-channel symbolic vector at ``(x, y)``."""
        r, c = self._to_grid(x, y)
        return self.symbolic_field[r, c].copy()

    def deposit_symbolic(self, x: float, y: float, channel: int, amount: float) -> None:
        """Deposit into a symbolic channel at ``(x, y)``."""
        r, c = self._to_grid(x, y)
        self.symbolic_field[r, c, channel] += amount

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    def update(self, agents: list[SwarmAgent], env: SwarmEnvironment, dt: float) -> None:
        """Run one collective-field tick.

        1. Diffuse and decay chemical field.
        2. Synchronise emotional field.
        3. Decay symbolic field.
        4. Copy agent emotions into / out of emotional field.
        """
        # Push agent emotions into the field
        for idx, agent in enumerate(agents):
            if idx < self.n_agents:
                self.emotional_field[idx] = agent.emotions

        self.diffuse(dt)
        self.synchronize_emotions()

        # Symbolic decay
        self.symbolic_field *= 1.0 - self.cfg.symbolic_decay * dt

        # Pull updated emotions back to agents
        for idx, agent in enumerate(agents):
            if idx < self.n_agents:
                agent.emotions = self.emotional_field[idx].copy()

diffuse(dt)

Apply Laplacian diffusion + exponential decay to the chemical field.

Source code in src/sc_neurocore/swarm/collective_fields.py
Python
def diffuse(self, dt: float) -> None:
    """Apply Laplacian diffusion + exponential decay to the chemical field."""
    lap = _apply_laplacian(self.chemical_field)
    self.chemical_field += self.cfg.diffusion_rate * dt * lap
    self.chemical_field *= 1.0 - self.cfg.decay_rate * dt
    np.clip(self.chemical_field, 0, None, out=self.chemical_field)

deposit_chemical(x, y, amount)

Add amount of chemical at world coordinate (x, y).

Source code in src/sc_neurocore/swarm/collective_fields.py
Python
def deposit_chemical(self, x: float, y: float, amount: float) -> None:
    """Add *amount* of chemical at world coordinate ``(x, y)``."""
    if amount <= 0:
        return
    r, c = self._to_grid(x, y)
    self.chemical_field[r, c] += amount
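The world-to-grid mapping behind every deposit is the `_to_grid` scaling shown in the class listing. A standalone sketch with the default 50-cell grid over a 100×100 arena (the field array here is local, for illustration):

```python
import numpy as np

gs, env_w, env_h = 50, 100.0, 100.0
field = np.zeros((gs, gs))

x, y, amount = 25.0, 75.0, 1.0
col = int(np.clip(x / env_w * gs, 0, gs - 1))   # 25 -> cell 12
row = int(np.clip(y / env_h * gs, 0, gs - 1))   # 75 -> cell 37
field[row, col] += amount
```

Each grid cell covers 2×2 world units, and the clip keeps out-of-bounds coordinates on the edge cells instead of indexing past the array.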

get_chemical_gradient(x, y)

Return normalised (dx, dy) chemical gradient at (x, y).

Uses central differences on the grid, mapped back to world coords.

Source code in src/sc_neurocore/swarm/collective_fields.py
Python
def get_chemical_gradient(self, x: float, y: float) -> tuple[float, float]:
    """Return normalised (dx, dy) chemical gradient at ``(x, y)``.

    Uses central differences on the grid, mapped back to world coords.
    """
    r, c = self._to_grid(x, y)
    gs = self.cfg.grid_size
    f = self.chemical_field

    # Central differences with boundary clamp
    dc = (f[r, min(c + 1, gs - 1)] - f[r, max(c - 1, 0)]) * 0.5
    dr = (f[min(r + 1, gs - 1), c] - f[max(r - 1, 0), c]) * 0.5

    # Map grid gradient -> world gradient direction
    dx = float(dc)
    dy = float(dr)
    norm = np.sqrt(dx * dx + dy * dy) + 1e-12
    return dx / norm, dy / norm
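Because the return value is normalised, the gradient is a unit direction vector, not a magnitude. A standalone sketch on a tiny 5×5 field with a single deposit one cell to the east of the query cell (the array and indices are illustrative):

```python
import numpy as np

f = np.zeros((5, 5))
f[2, 3] = 1.0            # single chemical blob east of the query cell
r, c, gs = 2, 2, 5

# Same central differences with boundary clamp as get_chemical_gradient
dc = (f[r, min(c + 1, gs - 1)] - f[r, max(c - 1, 0)]) * 0.5  # 0.5
dr = (f[min(r + 1, gs - 1), c] - f[max(r - 1, 0), c]) * 0.5  # 0.0

norm = np.hypot(dc, dr) + 1e-12
gx, gy = dc / norm, dr / norm  # unit vector pointing toward the deposit
```

The `1e-12` term guards against division by zero on a perfectly flat field, where the returned direction is effectively arbitrary.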

synchronize_emotions(coupling=None)

Pull each agent's emotional vector toward the swarm mean.

Source code in src/sc_neurocore/swarm/collective_fields.py
Python
def synchronize_emotions(self, coupling: float | None = None) -> None:
    """Pull each agent's emotional vector toward the swarm mean."""
    if coupling is None:
        coupling = self.cfg.emotional_coupling
    mean_emotion = self.emotional_field.mean(axis=0)
    self.emotional_field += coupling * (mean_emotion - self.emotional_field)
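The coupling is a linear pull toward the swarm mean: each call moves every emotional vector a fixed fraction of the way there. A minimal standalone sketch with two agents and one emotion channel (values are hypothetical):

```python
import numpy as np

emotional_field = np.array([[0.0], [1.0]])  # two agents, one emotion channel
coupling = 0.1

mean_emotion = emotional_field.mean(axis=0)              # 0.5
emotional_field += coupling * (mean_emotion - emotional_field)
# -> [[0.05], [0.95]]: each agent moves 10% of the way to the mean
```

Repeated calls converge geometrically on the mean; the mean itself is preserved by the update.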

get_symbolic_at(x, y)

Return the 2-channel symbolic vector at (x, y).

Source code in src/sc_neurocore/swarm/collective_fields.py
Python
def get_symbolic_at(self, x: float, y: float) -> np.ndarray[Any, Any]:
    """Return the 2-channel symbolic vector at ``(x, y)``."""
    r, c = self._to_grid(x, y)
    return self.symbolic_field[r, c].copy()

deposit_symbolic(x, y, channel, amount)

Deposit into a symbolic channel at (x, y).

Source code in src/sc_neurocore/swarm/collective_fields.py
Python
def deposit_symbolic(self, x: float, y: float, channel: int, amount: float) -> None:
    """Deposit into a symbolic channel at ``(x, y)``."""
    r, c = self._to_grid(x, y)
    self.symbolic_field[r, c, channel] += amount

update(agents, env, dt)

Run one collective-field tick.

  1. Diffuse and decay chemical field.
  2. Synchronise emotional field.
  3. Decay symbolic field.
  4. Copy agent emotions into / out of emotional field.
Source code in src/sc_neurocore/swarm/collective_fields.py
Python
def update(self, agents: list[SwarmAgent], env: SwarmEnvironment, dt: float) -> None:
    """Run one collective-field tick.

    1. Diffuse and decay chemical field.
    2. Synchronise emotional field.
    3. Decay symbolic field.
    4. Copy agent emotions into / out of emotional field.
    """
    # Push agent emotions into the field
    for idx, agent in enumerate(agents):
        if idx < self.n_agents:
            self.emotional_field[idx] = agent.emotions

    self.diffuse(dt)
    self.synchronize_emotions()

    # Symbolic decay
    self.symbolic_field *= 1.0 - self.cfg.symbolic_decay * dt

    # Pull updated emotions back to agents
    for idx, agent in enumerate(agents):
        if idx < self.n_agents:
            agent.emotions = self.emotional_field[idx].copy()

SwarmFitness

Static fitness functions for swarm evaluation.

Source code in src/sc_neurocore/swarm/fitness.py
Python
class SwarmFitness:
    """Static fitness functions for swarm evaluation."""

    # ------------------------------------------------------------------
    # Individual objectives
    # ------------------------------------------------------------------

    @staticmethod
    def coverage_score(positions: np.ndarray[Any, Any], area: tuple[float, float]) -> float:
        """Fraction of the arena covered by the swarm.

        Divides the arena into a 10x10 grid and counts the fraction of
        cells that contain at least one agent.
        """
        grid_n = 10
        w, h = area
        cols = np.clip((positions[:, 0] / w * grid_n).astype(int), 0, grid_n - 1)
        rows = np.clip((positions[:, 1] / h * grid_n).astype(int), 0, grid_n - 1)
        occupied = set(zip(rows.tolist(), cols.tolist()))
        return len(occupied) / (grid_n * grid_n)

    @staticmethod
    def cohesion_score(positions: np.ndarray[Any, Any]) -> float:
        """Reward moderate inter-agent distance (not too spread, not too clumped).

        Returns a value in [0, 1] peaking when the mean pairwise distance
        equals one-quarter of the bounding-box diagonal.
        """
        if len(positions) < 2:
            return 0.0
        diff = positions[:, np.newaxis, :] - positions[np.newaxis, :, :]
        dists = np.sqrt((diff**2).sum(axis=-1))
        # Upper triangle only
        triu_idx = np.triu_indices(len(positions), k=1)
        mean_dist = dists[triu_idx].mean()
        x, y = positions[:, 0], positions[:, 1]
        bbox_diag = np.sqrt((x.max() - x.min()) ** 2 + (y.max() - y.min()) ** 2) + 1e-12
        ideal = bbox_diag * 0.25
        return float(np.exp(-(((mean_dist - ideal) / ideal) ** 2)))

    @staticmethod
    def alignment_score(headings: np.ndarray[Any, Any]) -> float:
        """Mean resultant length of heading angles (Rayleigh statistic).

        Returns 1.0 when all agents face the same direction, 0.0 when
        headings are uniformly distributed.
        """
        if len(headings) == 0:
            return 0.0
        cx = np.cos(headings).mean()
        cy = np.sin(headings).mean()
        return float(np.sqrt(cx**2 + cy**2))

    @staticmethod
    def target_score(positions: np.ndarray[Any, Any], targets: np.ndarray[Any, Any]) -> float:
        """Proximity reward: inverse mean distance to nearest target per agent.

        Normalised to [0, 1] via ``1 / (1 + mean_dist / 10)``.
        """
        if len(targets) == 0:
            return 0.0
        # (n_agents, n_targets)
        diff = positions[:, np.newaxis, :] - targets[np.newaxis, :, :]
        dists = np.sqrt((diff**2).sum(axis=-1))
        nearest = dists.min(axis=1)
        mean_nearest = nearest.mean()
        return float(1.0 / (1.0 + mean_nearest / 10.0))

    @staticmethod
    def obstacle_penalty(positions: np.ndarray[Any, Any], obstacles: np.ndarray[Any, Any]) -> float:
        """Fraction of agents inside any obstacle (surface penetration)."""
        if len(obstacles) == 0:
            return 0.0
        centers = obstacles[:, :2]
        radii = obstacles[:, 2]
        # (n_agents, n_obstacles)
        diff = positions[:, np.newaxis, :] - centers[np.newaxis, :, :]
        dists = np.sqrt((diff**2).sum(axis=-1))
        inside = (dists < radii[np.newaxis, :]).any(axis=1)
        return float(inside.mean())

    # ------------------------------------------------------------------
    # Composite
    # ------------------------------------------------------------------

    @staticmethod
    def composite(env: SwarmEnvironment) -> float:
        """Weighted sum of all objectives.

        Weights::

            0.30 * coverage
          + 0.20 * cohesion
          + 0.10 * alignment
          + 0.30 * target
          - 0.10 * obstacle_penalty

        Returns a scalar (higher is better).
        """
        positions = env.get_positions()
        headings = env.get_headings()
        area = (env.cfg.width, env.cfg.height)

        cov = SwarmFitness.coverage_score(positions, area)
        coh = SwarmFitness.cohesion_score(positions)
        aln = SwarmFitness.alignment_score(headings)
        tgt = SwarmFitness.target_score(positions, env.targets)
        obs = SwarmFitness.obstacle_penalty(positions, env.obstacles)

        return 0.30 * cov + 0.20 * coh + 0.10 * aln + 0.30 * tgt - 0.10 * obs

coverage_score(positions, area) staticmethod

Fraction of the arena covered by the swarm.

Divides the arena into a 10x10 grid and counts the fraction of cells that contain at least one agent.

Source code in src/sc_neurocore/swarm/fitness.py
Python
@staticmethod
def coverage_score(positions: np.ndarray[Any, Any], area: tuple[float, float]) -> float:
    """Fraction of the arena covered by the swarm.

    Divides the arena into a 10x10 grid and counts the fraction of
    cells that contain at least one agent.
    """
    grid_n = 10
    w, h = area
    cols = np.clip((positions[:, 0] / w * grid_n).astype(int), 0, grid_n - 1)
    rows = np.clip((positions[:, 1] / h * grid_n).astype(int), 0, grid_n - 1)
    occupied = set(zip(rows.tolist(), cols.tolist()))
    return len(occupied) / (grid_n * grid_n)
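The same bucketing arithmetic, run standalone on three hypothetical positions in a 100×100 arena: three agents in three distinct cells of the 10×10 grid give a coverage of 0.03.

```python
import numpy as np

grid_n = 10
w, h = 100.0, 100.0
positions = np.array([[5.0, 5.0], [55.0, 55.0], [95.0, 95.0]])

# Bucket each agent into a grid cell, then count distinct occupied cells
cols = np.clip((positions[:, 0] / w * grid_n).astype(int), 0, grid_n - 1)
rows = np.clip((positions[:, 1] / h * grid_n).astype(int), 0, grid_n - 1)
occupied = set(zip(rows.tolist(), cols.tolist()))
score = len(occupied) / (grid_n * grid_n)  # 3 cells / 100 -> 0.03
```

Note the score is capped by the number of agents: 20 agents can never cover more than 20 of the 100 cells, so 0.20 is the practical ceiling at the default population size.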

cohesion_score(positions) staticmethod

Reward moderate inter-agent distance (not too spread, not too clumped).

Returns a value in [0, 1] peaking when the mean pairwise distance equals one-quarter of the bounding-box diagonal.

Source code in src/sc_neurocore/swarm/fitness.py
Python
@staticmethod
def cohesion_score(positions: np.ndarray[Any, Any]) -> float:
    """Reward moderate inter-agent distance (not too spread, not too clumped).

    Returns a value in [0, 1] peaking when the mean pairwise distance
    equals one-quarter of the bounding-box diagonal.
    """
    if len(positions) < 2:
        return 0.0
    diff = positions[:, np.newaxis, :] - positions[np.newaxis, :, :]
    dists = np.sqrt((diff**2).sum(axis=-1))
    # Upper triangle only
    triu_idx = np.triu_indices(len(positions), k=1)
    mean_dist = dists[triu_idx].mean()
    x, y = positions[:, 0], positions[:, 1]
    bbox_diag = np.sqrt((x.max() - x.min()) ** 2 + (y.max() - y.min()) ** 2) + 1e-12
    ideal = bbox_diag * 0.25
    return float(np.exp(-(((mean_dist - ideal) / ideal) ** 2)))
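The final expression is a Gaussian bump centred on the ideal spacing. Two standalone evaluations (the distances are hypothetical) show the peak and the fall-off:

```python
import numpy as np

ideal = 25.0  # a quarter of some bounding-box diagonal

# Exactly at the ideal spacing the score peaks at 1.0
peak = float(np.exp(-(((25.0 - ideal) / ideal) ** 2)))

# One ideal-width away (mean distance 50) the score drops to exp(-1)
off = float(np.exp(-(((50.0 - ideal) / ideal) ** 2)))
```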

alignment_score(headings) staticmethod

Mean resultant length of heading angles (Rayleigh statistic).

Returns 1.0 when all agents face the same direction, 0.0 when headings are uniformly distributed.

Source code in src/sc_neurocore/swarm/fitness.py
Python
@staticmethod
def alignment_score(headings: np.ndarray[Any, Any]) -> float:
    """Mean resultant length of heading angles (Rayleigh statistic).

    Returns 1.0 when all agents face the same direction, 0.0 when
    headings are uniformly distributed.
    """
    if len(headings) == 0:
        return 0.0
    cx = np.cos(headings).mean()
    cy = np.sin(headings).mean()
    return float(np.sqrt(cx**2 + cy**2))
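The two limiting cases from the docstring are easy to verify with the same three lines of math:

```python
import numpy as np

def resultant_length(headings: np.ndarray) -> float:
    """Mean resultant length R of a set of angles (the statistic above)."""
    cx = np.cos(headings).mean()
    cy = np.sin(headings).mean()
    return float(np.hypot(cx, cy))

aligned = resultant_length(np.full(5, 0.3))                        # all same heading -> 1.0
uniform = resultant_length(np.array([0.0, np.pi/2, np.pi, 3*np.pi/2]))  # evenly spread -> ~0.0
```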

target_score(positions, targets) staticmethod

Proximity reward: inverse mean distance to nearest target per agent.

Normalised to [0, 1] via 1 / (1 + mean_dist / 10).

Source code in src/sc_neurocore/swarm/fitness.py
Python
@staticmethod
def target_score(positions: np.ndarray[Any, Any], targets: np.ndarray[Any, Any]) -> float:
    """Proximity reward: inverse mean distance to nearest target per agent.

    Normalised to [0, 1] via ``1 / (1 + mean_dist / 10)``.
    """
    if len(targets) == 0:
        return 0.0
    # (n_agents, n_targets)
    diff = positions[:, np.newaxis, :] - targets[np.newaxis, :, :]
    dists = np.sqrt((diff**2).sum(axis=-1))
    nearest = dists.min(axis=1)
    mean_nearest = nearest.mean()
    return float(1.0 / (1.0 + mean_nearest / 10.0))
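The normalisation constant of 10 means an agent sitting on a target scores 1.0, while a swarm whose mean nearest-target distance is 10 units scores exactly 0.5:

```python
import numpy as np

def target_proximity(positions: np.ndarray, targets: np.ndarray) -> float:
    # Same normalisation as the method above: 1 / (1 + mean_dist / 10)
    diff = positions[:, None, :] - targets[None, :, :]
    nearest = np.sqrt((diff ** 2).sum(axis=-1)).min(axis=1)
    return float(1.0 / (1.0 + nearest.mean() / 10.0))

on_target = target_proximity(np.array([[5.0, 5.0]]), np.array([[5.0, 5.0]]))   # -> 1.0
ten_away  = target_proximity(np.array([[0.0, 0.0]]), np.array([[10.0, 0.0]]))  # -> 0.5
```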

obstacle_penalty(positions, obstacles) staticmethod

Fraction of agents inside any obstacle (surface penetration).

Source code in src/sc_neurocore/swarm/fitness.py
Python
@staticmethod
def obstacle_penalty(positions: np.ndarray[Any, Any], obstacles: np.ndarray[Any, Any]) -> float:
    """Fraction of agents inside any obstacle (surface penetration)."""
    if len(obstacles) == 0:
        return 0.0
    centers = obstacles[:, :2]
    radii = obstacles[:, 2]
    # (n_agents, n_obstacles)
    diff = positions[:, np.newaxis, :] - centers[np.newaxis, :, :]
    dists = np.sqrt((diff**2).sum(axis=-1))
    inside = (dists < radii[np.newaxis, :]).any(axis=1)
    return float(inside.mean())
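Obstacle rows are `(cx, cy, radius)` triples; with one agent inside the disc and one outside, the penetration fraction is 0.5:

```python
import numpy as np

positions = np.array([[1.0, 1.0], [9.0, 9.0]])   # one inside, one outside
obstacles = np.array([[0.0, 0.0, 2.0]])          # disc of radius 2 at the origin

diff = positions[:, None, :] - obstacles[None, :, :2]
dists = np.sqrt((diff ** 2).sum(axis=-1))        # (n_agents, n_obstacles)
inside = (dists < obstacles[:, 2][None, :]).any(axis=1)
penalty = float(inside.mean())                   # -> 0.5
```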

composite(env) staticmethod

Weighted sum of all objectives.

Weights::

Text Only
    0.30 * coverage
  + 0.20 * cohesion
  + 0.10 * alignment
  + 0.30 * target
  - 0.10 * obstacle_penalty

Returns a scalar (higher is better).

Source code in src/sc_neurocore/swarm/fitness.py
Python
@staticmethod
def composite(env: SwarmEnvironment) -> float:
    """Weighted sum of all objectives.

    Weights::

        0.30 * coverage
      + 0.20 * cohesion
      + 0.10 * alignment
      + 0.30 * target
      - 0.10 * obstacle_penalty

    Returns a scalar (higher is better).
    """
    positions = env.get_positions()
    headings = env.get_headings()
    area = (env.cfg.width, env.cfg.height)

    cov = SwarmFitness.coverage_score(positions, area)
    coh = SwarmFitness.cohesion_score(positions)
    aln = SwarmFitness.alignment_score(headings)
    tgt = SwarmFitness.target_score(positions, env.targets)
    obs = SwarmFitness.obstacle_penalty(positions, env.obstacles)

    return 0.30 * cov + 0.20 * coh + 0.10 * aln + 0.30 * tgt - 0.10 * obs
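As an arithmetic check of the weighted sum, plugging in hypothetical per-objective scores (the values below are made up for illustration) gives:

```python
# Hypothetical per-objective scores, each in [0, 1]
scores = {"cov": 0.5, "coh": 0.8, "aln": 1.0, "tgt": 0.4, "obs": 0.0}

composite = (0.30 * scores["cov"] + 0.20 * scores["coh"]
             + 0.10 * scores["aln"] + 0.30 * scores["tgt"]
             - 0.10 * scores["obs"])
# 0.15 + 0.16 + 0.10 + 0.12 - 0.00 = 0.53
```

Note that `obstacle_penalty` enters with a negative weight, so a penetration-free swarm loses nothing.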

EvolverConfig dataclass

Neuroevolution hyper-parameters.

Source code in src/sc_neurocore/swarm/neuroevolution_swarm.py
Python
@dataclass
class EvolverConfig:
    """Neuroevolution hyper-parameters."""

    pop_size: int = 20
    n_elite: int = 4
    mutation_rate: float = 0.1
    mutation_std: float = 0.3
    n_eval_steps: int = 200
    use_fields: bool = False
    env_config: Optional[EnvConfig] = None
    agent_config: Optional[AgentConfig] = None
    seed: Optional[int] = None

SwarmEvolver

Genetic algorithm that evolves SNN weights for swarm control.

Parameters

cfg : EvolverConfig Evolution and evaluation parameters.

Source code in src/sc_neurocore/swarm/neuroevolution_swarm.py
Python
class SwarmEvolver:
    """Genetic algorithm that evolves SNN weights for swarm control.

    Parameters
    ----------
    cfg : EvolverConfig
        Evolution and evaluation parameters.
    """

    def __init__(self, cfg: EvolverConfig) -> None:
        self.cfg = cfg
        self.rng = np.random.default_rng(cfg.seed)
        self.agent_config = cfg.agent_config or AgentConfig()

        # Determine weight vector size from a template agent
        template = SwarmAgent(self.agent_config, agent_id=0)
        self.n_weights = template.n_weights

        # Initialise population with small random weights
        self.population = [self.rng.normal(0, 0.5, self.n_weights) for _ in range(cfg.pop_size)]
        self.fitnesses = np.zeros(cfg.pop_size)
        self.generation = 0
        self.best_fitness_history: list[float] = []

    # ------------------------------------------------------------------
    # Evaluation
    # ------------------------------------------------------------------

    def _make_env(self) -> SwarmEnvironment:
        """Build a fresh environment with the correct agent config."""
        env_cfg = self.cfg.env_config or EnvConfig()
        # Ensure the environment uses our agent_config so weight sizes match
        env_cfg = EnvConfig(
            width=env_cfg.width,
            height=env_cfg.height,
            n_agents=env_cfg.n_agents,
            n_obstacles=env_cfg.n_obstacles,
            n_targets=env_cfg.n_targets,
            boundary_mode=env_cfg.boundary_mode,
            capture_radius=env_cfg.capture_radius,
            respawn_targets=env_cfg.respawn_targets,
            agent_config=self.agent_config,
            seed=int(self.rng.integers(0, 2**31)),
        )
        return SwarmEnvironment(env_cfg)

    def evaluate_individual(self, weights: np.ndarray[Any, Any]) -> float:
        """Create environment, inject *weights* into every agent, run, score.

        Parameters
        ----------
        weights : ndarray, shape (n_weights,)

        Returns
        -------
        fitness : float
        """
        env = self._make_env()

        # Inject same weights into all agents (homogeneous swarm)
        for agent in env.agents:
            agent.weights = weights

        fields: CollectiveFields | None = None
        if self.cfg.use_fields:
            fields = CollectiveFields(
                FieldConfig(),
                env_width=env.cfg.width,
                env_height=env.cfg.height,
                n_agents=env.cfg.n_agents,
            )

        for _ in range(self.cfg.n_eval_steps):
            env.step(dt=1.0, fields=fields)

        return SwarmFitness.composite(env)

    # ------------------------------------------------------------------
    # Selection & reproduction
    # ------------------------------------------------------------------

    def _select_elite(self) -> list[np.ndarray[Any, Any]]:
        """Return the top-N weight vectors by fitness."""
        order = np.argsort(self.fitnesses)[::-1]
        return [self.population[i].copy() for i in order[: self.cfg.n_elite]]

    def _crossover(
        self, parent_a: np.ndarray[Any, Any], parent_b: np.ndarray[Any, Any]
    ) -> np.ndarray[Any, Any]:
        """Uniform crossover: each gene randomly from either parent."""
        mask = self.rng.random(self.n_weights) < 0.5
        child = np.where(mask, parent_a, parent_b)
        return child

    def _mutate(self, individual: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
        """Gaussian mutation applied to a random subset of genes."""
        mask = self.rng.random(self.n_weights) < self.cfg.mutation_rate
        noise = self.rng.normal(0, self.cfg.mutation_std, self.n_weights)
        individual[mask] += noise[mask]
        return individual

    # ------------------------------------------------------------------
    # Evolution
    # ------------------------------------------------------------------

    def evolve_generation(self) -> float:
        """Evaluate population, select, reproduce.  Return best fitness."""
        # Evaluate
        for i, w in enumerate(self.population):
            self.fitnesses[i] = self.evaluate_individual(w)

        best = float(self.fitnesses.max())
        self.best_fitness_history.append(best)

        # Select elite
        elite = self._select_elite()

        # Build next generation
        new_pop: list[np.ndarray[Any, Any]] = list(elite)  # elite survive unchanged
        while len(new_pop) < self.cfg.pop_size:
            pa = elite[self.rng.integers(0, len(elite))]
            pb = elite[self.rng.integers(0, len(elite))]
            child = self._crossover(pa, pb)
            child = self._mutate(child)
            new_pop.append(child)

        self.population = new_pop
        self.generation += 1
        return best

    # ------------------------------------------------------------------
    # Convenience
    # ------------------------------------------------------------------

    def get_best_weights(self) -> np.ndarray[Any, Any]:
        """Return the weight vector with the highest fitness."""
        idx = int(np.argmax(self.fitnesses))
        return self.population[idx].copy()

    def run(self, n_generations: int) -> list[float]:
        """Run *n_generations* of evolution.  Return list of best fitnesses."""
        for _ in range(n_generations):
            self.evolve_generation()
        return list(self.best_fitness_history)
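Uniform crossover and Gaussian mutation, as implemented by `_crossover` and `_mutate` above, can be sketched in isolation (the sizes and seed below are arbitrary). With parents of all-zeros and all-ones, every unmutated gene of the child must equal 0 or 1:

```python
import numpy as np

rng = np.random.default_rng(0)
n_weights, mutation_rate, mutation_std = 8, 0.1, 0.3

parent_a = np.zeros(n_weights)
parent_b = np.ones(n_weights)

# Uniform crossover: each gene drawn from either parent with p = 0.5
mask = rng.random(n_weights) < 0.5
child = np.where(mask, parent_a, parent_b)    # fresh array, parents untouched

# Gaussian mutation applied to a random subset of genes
mut_mask = rng.random(n_weights) < mutation_rate
child[mut_mask] += rng.normal(0, mutation_std, n_weights)[mut_mask]
```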

evaluate_individual(weights)

Create environment, inject weights into every agent, run, score.

Parameters

weights : ndarray, shape (n_weights,)

Returns

fitness : float

Source code in src/sc_neurocore/swarm/neuroevolution_swarm.py
Python
def evaluate_individual(self, weights: np.ndarray[Any, Any]) -> float:
    """Create environment, inject *weights* into every agent, run, score.

    Parameters
    ----------
    weights : ndarray, shape (n_weights,)

    Returns
    -------
    fitness : float
    """
    env = self._make_env()

    # Inject same weights into all agents (homogeneous swarm)
    for agent in env.agents:
        agent.weights = weights

    fields: CollectiveFields | None = None
    if self.cfg.use_fields:
        fields = CollectiveFields(
            FieldConfig(),
            env_width=env.cfg.width,
            env_height=env.cfg.height,
            n_agents=env.cfg.n_agents,
        )

    for _ in range(self.cfg.n_eval_steps):
        env.step(dt=1.0, fields=fields)

    return SwarmFitness.composite(env)

evolve_generation()

Evaluate population, select, reproduce. Return best fitness.

Source code in src/sc_neurocore/swarm/neuroevolution_swarm.py
Python
def evolve_generation(self) -> float:
    """Evaluate population, select, reproduce.  Return best fitness."""
    # Evaluate
    for i, w in enumerate(self.population):
        self.fitnesses[i] = self.evaluate_individual(w)

    best = float(self.fitnesses.max())
    self.best_fitness_history.append(best)

    # Select elite
    elite = self._select_elite()

    # Build next generation
    new_pop: list[np.ndarray[Any, Any]] = list(elite)  # elite survive unchanged
    while len(new_pop) < self.cfg.pop_size:
        pa = elite[self.rng.integers(0, len(elite))]
        pb = elite[self.rng.integers(0, len(elite))]
        child = self._crossover(pa, pb)
        child = self._mutate(child)
        new_pop.append(child)

    self.population = new_pop
    self.generation += 1
    return best
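The generation loop above can be mirrored on a toy problem. The environment rollout is replaced here by a stand-in fitness, maximise `-sum(w**2)`, purely for illustration; because the elite survive unchanged and the toy fitness is deterministic, the best fitness is non-decreasing across generations:

```python
import numpy as np

rng = np.random.default_rng(42)
pop_size, n_elite, n_weights = 20, 4, 10
mut_rate, mut_std = 0.1, 0.3

fitness = lambda w: -float((w ** 2).sum())     # stand-in for evaluate_individual
population = [rng.normal(0, 0.5, n_weights) for _ in range(pop_size)]

history = []
for _ in range(15):
    fits = np.array([fitness(w) for w in population])
    history.append(float(fits.max()))
    elite = [population[i].copy() for i in np.argsort(fits)[::-1][:n_elite]]
    new_pop = list(elite)                      # elite survive unchanged
    while len(new_pop) < pop_size:
        pa = elite[rng.integers(0, n_elite)]
        pb = elite[rng.integers(0, n_elite)]
        mask = rng.random(n_weights) < 0.5     # uniform crossover
        child = np.where(mask, pa, pb)
        mmask = rng.random(n_weights) < mut_rate
        child[mmask] += rng.normal(0, mut_std, n_weights)[mmask]
        new_pop.append(child)
    population = new_pop
```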

get_best_weights()

Return the weight vector with the highest fitness.

Source code in src/sc_neurocore/swarm/neuroevolution_swarm.py
Python
def get_best_weights(self) -> np.ndarray[Any, Any]:
    """Return the weight vector with the highest fitness."""
    idx = int(np.argmax(self.fitnesses))
    return self.population[idx].copy()

run(n_generations)

Run n_generations of evolution. Return list of best fitnesses.

Source code in src/sc_neurocore/swarm/neuroevolution_swarm.py
Python
def run(self, n_generations: int) -> list[float]:
    """Run *n_generations* of evolution.  Return list of best fitnesses."""
    for _ in range(n_generations):
        self.evolve_generation()
    return list(self.best_fitness_history)