
Spike-Level Debugger + HIL Telemetry

Four complementary debug surfaces for stochastic-computing and spiking neural networks: an offline spike tracer + analyser for post-mortem divergence and causality analysis; a live bitstream oscilloscope (sc_scope) that computes per-layer density / effective-bits / SCC on a live FPGA stream; an adaptive SC doctor (sc_doctor) that auto-tunes bitstream length on the fly and adds Hamming(7,4) ECC when needed; and a Hardware-in-the-Loop telemetry daemon that multiplexes protobuf HILFrame messages onto a WebSocket for GUI/CI consumers.

Python
from sc_neurocore.debug.tracer import SpikeTracer, ExecutionTrace
from sc_neurocore.debug.analyzer import (
    find_divergence, spike_diff, causal_chain,
    DivergencePoint, CausalEvent,
)
from sc_neurocore.debug.sc_doctor import ScDoctor
from sc_neurocore.debug.sc_scope import (
    TransportBackend, TransportConfig, TransportType,
    BitstreamSample, AnalysisWindow, LiveAnalyzer,
    LayerErrorBudget, TriggerEngine, TriggerCondition,
    TriggerEvent, TriggerType, ScopeSession, ScopeRenderer,
    compute_scc,
)
from sc_neurocore.debug.hil_server import HILServerDaemon
from sc_neurocore.debug.hil_debugger import HILDebugger

1. Mathematical formalism

1.1 Offline trace divergence

Two :class:ExecutionTrace objects $A$ and $B$ with binary spike matrices $S^{A}, S^{B} \in \{0,1\}^{T \times N}$ diverge at

$$ t^{\star} = \min\bigl\{\, t : \exists n \in [0,N)\; \text{with}\; S^{A}_{t,n} \neq S^{B}_{t,n} \,\bigr\}, $$

and :func:find_divergence returns the tuple $(t^{\star}, n^{\star}, S^{A}_{t^{\star},n^{\star}}, S^{B}_{t^{\star},n^{\star}}, V^{A}_{t^{\star},n^{\star}}, V^{B}_{t^{\star},n^{\star}}, |V^{A}_{t^{\star},n^{\star}} - V^{B}_{t^{\star},n^{\star}}|)$. The associated mismatch rate is

$$ \rho_{\text{mis}} = \frac{1}{T N} \sum_{t,n} \mathbf{1}\!\left[S^{A}_{t,n} \neq S^{B}_{t,n}\right]. $$
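Both definitions vectorise directly in NumPy. A minimal sketch with hypothetical helper names (the shipped :func:find_divergence additionally reports the voltage context as a :class:DivergencePoint):

```python
import numpy as np

def first_divergence(S_a: np.ndarray, S_b: np.ndarray):
    """First (t*, n*) where the (T, N) spike matrices disagree, else None."""
    diff = S_a != S_b
    if not diff.any():
        return None
    flat = int(np.argmax(diff))          # first True in row-major (time-major) order
    return divmod(flat, S_a.shape[1])    # -> (timestep, neuron)

def mismatch_rate(S_a: np.ndarray, S_b: np.ndarray) -> float:
    """rho_mis: fraction of (t, n) cells where the traces disagree."""
    return float((S_a != S_b).mean())
```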

1.2 Backward causal chain

Given target $(n^{\star},\, t^{\star})$, the causal chain of depth $D$ is

$$ \mathcal{C}(n^{\star}, t^{\star}, D) = \bigl\{\, (n,t) \bigm| t^{\star}-D \leq t < t^{\star},\; S_{t,n} = 1 \,\bigr\}, $$

ordered by $t$ descending. Without connectivity information :func:causal_chain returns all temporally preceding spikers — a safe over-approximation that is refined by the subsequent chain walk.
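Without a connectivity prior the chain reduces to a windowed scan over the spike matrix. A minimal sketch (hypothetical name; the shipped :func:causal_chain returns :class:CausalEvent records with currents and voltages):

```python
import numpy as np

def causal_chain(S: np.ndarray, n_star: int, t_star: int, depth: int):
    """All (t, n) spikers in [t* - D, t*), ordered by t descending.

    n_star is unused here: with no connectivity information, every
    preceding spiker is a candidate cause (the over-approximation
    described above).
    """
    events = []
    for t in range(t_star - 1, max(t_star - depth, 0) - 1, -1):
        for n in np.flatnonzero(S[t]):
            events.append((t, int(n)))
    return events
```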

1.3 Bitstream density + effective bits (Shannon)

For a packed u32 bitstream of length $L = 32\,w$ with popcount $p$,

$$ d = \frac{p}{L}, \qquad H(d) = \begin{cases} 0, & d \in \{0,1\}, \\ -\bigl(d\log_{2}d + (1-d)\log_{2}(1-d)\bigr) \cdot L, & 0 < d < 1. \end{cases} $$

$H(d)$ is the :class:BitstreamSample effective_bits property — the Shannon entropy of the stream scaled by length. A fully biased stream ($d=0$ or $d=1$) carries zero information; $d=0.5$ yields $L$ bits.
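Both quantities follow from a single popcount over the packed words. A pure-NumPy sketch (hypothetical helper; :class:BitstreamSample exposes the same numbers as properties):

```python
import numpy as np

def density_and_effective_bits(words: np.ndarray) -> tuple[float, float]:
    """Density d = p/L and Shannon effective bits H(d) * L for packed u32 words."""
    L = 32 * len(words)
    p = int(np.unpackbits(words.view(np.uint8)).sum())   # popcount over the stream
    d = p / L
    if d in (0.0, 1.0):
        return d, 0.0                                    # fully biased: zero information
    H = -(d * np.log2(d) + (1 - d) * np.log2(1 - d))     # binary entropy, bits per bit
    return d, float(H * L)
```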

1.4 Stochastic-computing correlation (SCC)

For two u32-packed bitstreams $A,B$ of length $L$ with $p_{A} = \frac{\mathrm{popcount}(A)}{L}$, $p_{B} = \frac{\mathrm{popcount}(B)}{L}$, $p_{AB} = \frac{\mathrm{popcount}(A \wedge B)}{L}$,

$$ \mathrm{SCC}(A, B) = \frac{p_{AB} - p_{A}\, p_{B}} {\bigl|\min(p_{A}, p_{B}) - p_{A}\, p_{B}\bigr| + \epsilon}. $$

This is Alaghi & Hayes' original SC-correlation measure (Alaghi & Hayes, 2013) bounded in $[-1,+1]$: $+1$ = perfectly correlated, $0$ = independent, $-1$ = anti-correlated. The :func:compute_scc reference implementation walks the stream word-by-word in pure Python; the same formula is fused inside the Mojo SIMD kernel scc_numerator_256w (see mojo_accel.md §4.2) for the hot path.
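A pure-Python/NumPy rendering of the formula above (a sketch of the reference path, not the word-by-word loop the shipped :func:compute_scc uses):

```python
import numpy as np

def scc(a: np.ndarray, b: np.ndarray, eps: float = 1e-12) -> float:
    """SCC of two packed-u32 bitstreams, per the formula above."""
    L = 32 * len(a)
    def popcount(w: np.ndarray) -> int:
        return int(np.unpackbits(w.view(np.uint8)).sum())
    p_a, p_b = popcount(a) / L, popcount(b) / L
    p_ab = popcount(a & b) / L                 # density of the AND stream
    return (p_ab - p_a * p_b) / (abs(min(p_a, p_b) - p_a * p_b) + eps)
```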

1.5 Adaptive bitstream-length feedback

:class:ScDoctor maintains a length $L$ and runs the control law

$$ L_{t+1} = \begin{cases} 2 L_{t}, & \mathrm{SCC}_{t} > 0.15, \\ \max(256,\; L_{t}/2), & \mathrm{SCC}_{t} < 0.05, \\ L_{t}, & \text{otherwise}, \end{cases} $$

with ECC auto-enabled when $L_{t} > 2048$. The hysteresis window $[0.05,\,0.15]$ prevents the length from oscillating on noisy streams; the $L \geq 256$ floor guarantees the 8-bit effective precision target.
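The control law reduces to a three-way branch. A sketch (hypothetical function, omitting the ECC side-condition that :class:ScDoctor applies at $L_{t} > 2048$):

```python
def next_length(length: int, scc: float) -> int:
    """One step of the hysteresis control law on the measured SCC."""
    if scc > 0.15:
        return 2 * length             # too correlated: double the stream
    if scc < 0.05:
        return max(256, length // 2)  # decorrelated: halve, keep the 256-bit floor
    return length                     # inside the [0.05, 0.15] band: hold
```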

1.6 Hamming(7,4) single-bit correction

Encoding maps 4 data bits $d_{1}d_{2}d_{3}d_{4}$ to a 7-bit codeword by

$$ p_{1} = d_{1} \oplus d_{2} \oplus d_{4}, \quad p_{2} = d_{1} \oplus d_{3} \oplus d_{4}, \quad p_{3} = d_{2} \oplus d_{3} \oplus d_{4}. $$

Decoding recomputes the three syndrome bits and flips the single erred bit if any $s_{i} \neq 0$. The code corrects exactly one error per 7-bit block — sufficient for the transient switching-noise model on the JTAG / UART transport, not for burst errors.
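A pure-Python sketch of the encode/decode pair. The bit layout (positions 1..7 as $p_1 p_2 d_1 p_3 d_2 d_3 d_4$) is the textbook one and is an assumption here; the shipped :meth:ScDoctor.encode_ecc / :meth:ScDoctor.decode_ecc dispatch to Rust and may pack bits differently:

```python
def hamming74_encode(data: int) -> int:
    """4 data bits -> 7-bit codeword laid out p1 p2 d1 p3 d2 d3 d4 (positions 1..7)."""
    d1, d2, d3, d4 = [(data >> i) & 1 for i in range(4)]
    p1 = d1 ^ d2 ^ d4
    p2 = d1 ^ d3 ^ d4
    p3 = d2 ^ d3 ^ d4
    bits = [p1, p2, d1, p3, d2, d3, d4]            # positions 1..7
    return sum(b << i for i, b in enumerate(bits))

def hamming74_decode(code: int) -> int:
    """Recompute the syndrome, flip the single erred bit, return 4 data bits."""
    bits = [(code >> i) & 1 for i in range(7)]      # bits[i] = position i + 1
    s1 = bits[0] ^ bits[2] ^ bits[4] ^ bits[6]      # parity group of p1
    s2 = bits[1] ^ bits[2] ^ bits[5] ^ bits[6]      # parity group of p2
    s3 = bits[3] ^ bits[4] ^ bits[5] ^ bits[6]      # parity group of p3
    syndrome = s1 | (s2 << 1) | (s3 << 2)           # erred position, 0 = clean
    if syndrome:
        bits[syndrome - 1] ^= 1
    d1, d2, d3, d4 = bits[2], bits[4], bits[5], bits[6]
    return d1 | (d2 << 1) | (d3 << 2) | (d4 << 3)
```

Every single-bit flip in a codeword round-trips back to the original nibble; two flips in the same block miscorrect silently, as noted above.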

1.7 Layer error budget

Per-layer :class:LayerErrorBudget tracks

$$ e_{t} = |d_{t} - d^{\star}|, \quad \text{violation} \Leftrightarrow e_{t} > \tau, \quad \text{pass\_rate} = 1 - \frac{\sum_{t} \mathbf{1}[e_{t} > \tau]}{T}. $$

The observed density $d_{t}$ is compared with the golden-model expected density $d^{\star}$; the default tolerance $\tau = 0.05$ is roughly $3\sigma$ on a 1024-bit stream at $d^{\star} = 0.5$ under the Gaussian rate approximation.
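The budget check itself is one comparison per sample; a sketch (hypothetical function; :class:LayerErrorBudget tracks the counters incrementally instead of rescanning):

```python
def pass_rate(densities: list[float], d_star: float, tau: float = 0.05) -> float:
    """Fraction of samples whose density error |d_t - d*| stays within tau."""
    violations = sum(1 for d in densities if abs(d - d_star) > tau)
    return 1.0 - violations / len(densities)
```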


2. Theory (why these particular mechanics)

2.1 Separating trace recording from analysis

:class:SpikeTracer does not bake analysis into the stepping loop. Instead it records full $(\text{spikes},\,\text{voltages},\,\text{currents})$ tensors, and analyses (divergence, causality) run off the recorded trace. This matters because neuromorphic debugging frequently involves asking new questions a day later: a recorded trace lets you re-query without re-simulating. The storage cost is $3TN$ entries per trace, which at $T=10^{4},\,N=10^{3}$ comes to roughly 240 MB in float64 and 120 MB in float32, manageable for single-run post-mortems.

2.2 Live vs. offline: two price points

sc_scope runs on a streaming budget: constant-time per sample, constant memory (ring buffer), O(1) trigger evaluation. The offline tools (find_divergence, spike_diff, causal_chain) run on a snapshot budget: O(TN) or O(DN) over the full trace. Both exist because most bugs are easier to catch in one mode but not the other — a slow drift in mean density is obvious on the live scope; a single-step spike inversion is obvious in the offline diff.

2.3 Why pure-Python ScDoctor instead of FPGA microcontroller

A design choice: the doctor has to stay in the Python host during R&D so it can be tuned without re-synthesis. Once a working control law is validated, the same constants (initial_length, threshold pair, ECC-on threshold) lift to a SystemVerilog FSM on the hardware side (see hdl_gen.md §3). The Python module is the reference implementation; the Verilog is its compiled mirror.

2.4 Why Hamming(7,4) rather than BCH or LDPC

The transport protocol is small-frame (32–256 bytes) and noise is dominated by single-bit switching events; Hamming(7,4) is the smallest perfect SEC code (single error correcting), has trivial combinational implementation, and adds 75 % overhead — an acceptable tax on a bitstream that is already probabilistic by design. BCH/LDPC would beat it on throughput but require an iterative decoder the FPGA does not need.

2.5 Pluggable transports, simulated loopback

:class:TransportBackend abstracts over JTAG / UART / PYNQ DMA / simulated. The simulated path (sinusoidally modulated density per layer) lets the scope stack run entirely offline for CI, while the hardware paths reuse exactly the same :class:LiveAnalyzer and :class:TriggerEngine classes — no "sim-only" code branches in the analysis layer.

2.6 Server daemon as a Go child process

The HIL WebSocket server is written in Go (accel/go/services/hil_debugger/main.go) because Go's goroutine model handles many concurrent WebSocket clients at lower per-connection overhead than asyncio. The Python :class:HILServerDaemon is a supervisor: go build on-demand, spawn, probe /health, SIGTERM + SIGKILL fallback on stop. Separating the Python lifecycle manager from the Go streaming daemon keeps each side idiomatic.


3. Position in the pipeline

Text Only
 ┌─────────────┐       ┌───────────────┐        ┌──────────────────┐
 │  SC network │──────▶│   sc_scope    │──────▶ │  LiveAnalyzer    │
 │ (hardware)  │       │ (transport +  │        │   + triggers     │
 └─────────────┘       │  trigger eng) │        └──────────────────┘
      │                └───────────────┘                │
      │                        │                         │
      ▼                        ▼                         ▼
 ┌─────────────┐       ┌───────────────┐        ┌──────────────────┐
 │ SpikeTracer │──────▶│ ExecutionTrace│──────▶ │ find_divergence  │
 │  (offline)  │       │  (T×N arrays) │        │ causal_chain     │
 └─────────────┘       └───────────────┘        │ spike_diff       │
      │                                         └──────────────────┘
      │
      ▼
 ┌─────────────┐       ┌───────────────┐
 │  ScDoctor   │──────▶│ Hamming(7,4)  │
 │ (adaptive)  │       │     ECC       │
 └─────────────┘       └───────────────┘
      │
      ▼ (Go side)
 ┌──────────────────────────────────────────────────────────────┐
 │   HILServerDaemon  →  WebSocket  →  GUI / CI / remote viewer │
 └──────────────────────────────────────────────────────────────┘
  • Upstream inputs. SpikeTracer wraps a :class:sc_neurocore.edge.sc_network.SCNetwork (see edge.md) and intercepts its step_all. sc_scope reads packed-u32 bitstreams straight from an FPGA transport.
  • Downstream consumers. LiveAnalyzer emits per-layer stats consumed by ScopeRenderer (text CLI) or the Go WebSocket fan-out. ScDoctor outputs a new bitstream length that the SC pipeline's length-scheduler reads on its next cycle.

4. Features

  • Execution-trace recorder with per-population slicing (:class:ExecutionTrace.population_spikes).
  • Divergence detection with voltage-diff reporting (:class:DivergencePoint).
  • Causal-chain reconstruction up to a fixed backward depth (:func:causal_chain returning :class:CausalEvent records).
  • Full spike-diff summary (total mismatches, per-neuron mismatches, first-divergence record).
  • Four transports: SIMULATED (deterministic for CI), UART, JTAG, PYNQ_DMA.
  • Timestamped samples with density, popcount, Shannon effective bits.
  • Ring-buffer analysis windows (64–∞ samples) with mean + std density, mean effective bits, sample rate.
  • Five trigger types: DENSITY_ABOVE, DENSITY_BELOW, SPIKE_DETECTED, SCC_ABOVE, ERROR_BUDGET_VIOLATION.
  • Per-layer error budget with pass-rate metric.
  • Adaptive bitstream length with hysteresis + Hamming(7,4) ECC.
  • Go WebSocket daemon with /health readiness probe.
  • Live CLI text renderer (:class:ScopeRenderer).

5. Usage

5.1 Offline: find first divergence

Python
from sc_neurocore.debug.tracer import SpikeTracer
from sc_neurocore.debug.analyzer import find_divergence, spike_diff

tracer_a = SpikeTracer(network_a)
tracer_b = SpikeTracer(network_b)
trace_a = tracer_a.run(duration=0.1, dt=0.001, seed=42)
trace_b = tracer_b.run(duration=0.1, dt=0.001, seed=42)

dp = find_divergence(trace_a, trace_b)
if dp is None:
    print("traces identical")
else:
    print(f"first divergence at t={dp.timestep} n={dp.neuron_id}")
    print(f"voltage_diff={dp.voltage_diff:.4f}")

summary = spike_diff(trace_a, trace_b)
print(f"mismatch_rate={summary['mismatch_rate']:.4e}")
print(f"per_neuron worst: neuron {summary['per_neuron_mismatches'].argmax()}")

Sample output on a 1000×32 trace with one injected spike flip:

Text Only
first divergence at t=500 n=7
voltage_diff=0.0000
mismatch_rate=3.1250e-05
per_neuron worst: neuron 7

5.2 Live: scope session with triggers

Python
from sc_neurocore.debug.sc_scope import (
    TransportConfig, TransportType, ScopeSession,
    TriggerCondition, TriggerType,
)

cfg = TransportConfig(transport_type=TransportType.SIMULATED)
scope = ScopeSession(transport_config=cfg, num_layers=4, window_size=512)
scope.trigger_engine.add_trigger(
    TriggerCondition(TriggerType.DENSITY_ABOVE, threshold=0.8, layer_id=2)
)
scope.start()
for _ in range(1000):
    scope.step(num_words=32)
print(scope.analyzer.all_stats())
print(f"{len(scope.trigger_engine.events)} triggered events")
scope.stop()

5.3 Adaptive doctor in a live loop

Python
from sc_neurocore.debug.sc_doctor import ScDoctor

doctor = ScDoctor(initial_length=512, target_precision=0.95)
for sample in stream:
    scc = measured_scc(sample)
    doctor.adapt(scc)
    new_len = doctor.current_bitstream_length
    if doctor.error_correction_enabled:
        encoded_word = doctor.encode_ecc(sample.data4)

5.4 HIL daemon

Python
from sc_neurocore.debug.hil_debugger import HILDebugger

dbg = HILDebugger(port=8081)
ok = dbg.start()
# now GUI connects to ws://localhost:8081
dbg.stop()

6. API reference

6.1 Offline tracer + analyser

Symbol Purpose
:class:ExecutionTrace $(T\times N)$ spikes/voltages/currents + population metadata
:class:SpikeTracer wraps a network and records a trace over (duration, dt)
:class:DivergencePoint $(t, n, s^{A}, s^{B}, V^{A}, V^{B}, |V^{A} - V^{B}|)$
:class:CausalEvent $(t, n, I, V, \text{spiked})$
:func:find_divergence first $(t^{\star}, n^{\star})$ where $S^A \neq S^B$
:func:spike_diff full summary dict
:func:causal_chain $D$-depth backward spike trace

6.2 ScDoctor

Symbol Purpose
:class:ScDoctor adaptive length + ECC controller
:meth:ScDoctor.adapt hysteresis control law on measured SCC
:meth:ScDoctor.encode_ecc Hamming(7,4) encode of 4-bit word
:meth:ScDoctor.decode_ecc single-error-correcting decode

6.3 sc_scope transports + samples

Symbol Purpose
:class:TransportType JTAG, UART, PYNQ_DMA, SIMULATED
:class:TransportConfig port / baud / DMA base / length / timeout
:class:TransportBackend connect, disconnect, read_bitstream
:class:BitstreamSample timestamp_ns, layer_id, neuron_id, words, density, effective_bits
:class:AnalysisWindow ring-buffer stats over recent samples
:class:LiveAnalyzer multi-layer ingest + per-layer stats
:class:LayerErrorBudget per-layer tolerance tracker with pass rate
:func:compute_scc reference SCC numerator

6.4 sc_scope triggers

Symbol Purpose
:class:TriggerType DENSITY_ABOVE, DENSITY_BELOW, SPIKE_DETECTED, SCC_ABOVE, ERROR_BUDGET_VIOLATION
:class:TriggerCondition (type, threshold, layer_id, enabled)
:class:TriggerEvent (type, timestamp_ns, layer_id, measured_value, threshold, sample)
:class:TriggerEngine add_trigger, evaluate
:class:ScopeSession glues transport + analyser + triggers together
:class:ScopeRenderer text-mode CLI renderer

6.5 HIL daemon

Symbol Purpose
:class:HILServerDaemon low-level Go process supervisor
:class:HILDebugger thin convenience wrapper (start / stop / is_running)

The wire format is vision2030.telemetry.HILFrame — see proto.md for the exact protobuf schema.


7. Verified benchmarks

All figures produced by benchmarks/bench_debug.py (committed). Measured on Ubuntu 24.04 / CPython 3.12.3 / Intel i5-11600K @ 3.90 GHz, single-thread, 2026-04-20. Raw JSON at benchmarks/results/bench_debug.json.

Operation Throughput Latency
find_divergence (T=1000, N=32) 382 ops/s 2.61 ms
spike_diff (T=1000, N=32) 379 ops/s 2.64 ms
causal_chain (depth=10) 33 030 ops/s 30.28 µs
ScDoctor.adapt (Rust dispatch) 3.62 M ops/s 276.3 ns
ScDoctor.encode_ecc (Rust dispatch) 5.52 M ops/s 181.1 ns
ScDoctor.decode_ecc (Rust dispatch) 5.22 M ops/s 191.4 ns
LiveAnalyzer.layer_stats 41 473 ops/s 24.11 µs
compute_scc (256 u32 words, Rust) 690 046 ops/s 1.45 µs
TriggerEngine.evaluate (2 conditions) 55 991 ops/s 17.86 µs
LayerErrorBudget.check 4.95 M ops/s 202.0 ns

Interpretation.

  • Offline trace walking dominates at 1000-step traces: ~2.6 ms per full divergence scan. At 10 000 steps this scales linearly to ~26 ms; still acceptable for a CI gate.
  • ScDoctor.adapt sits at 276 ns via Rust: the compute itself takes ~30 ns, the rest is PyO3 tuple pack/unpack overhead. Pure Python is actually faster here (~85 ns) because the function is branch-only with no array work. The Rust dispatch is kept to satisfy the multi-language rule, and the FFI cost amortises once a caller batches many samples (a future py_sc_doctor_adapt_batch entrypoint would do exactly that).
  • ScDoctor.encode_ecc / decode_ecc: Rust wins 1.7× / 3.1× over pure Python (181 ns / 191 ns vs 314 ns / 591 ns). Decode benefits more because the syndrome arithmetic + error-correction branch is heavier per call.
  • compute_scc now dispatches to stochastic_doctor_core.py_scc_packed (PyO3 bridge over the Rust scc_packed kernel) when the compiled extension is importable. Measured speedup against the pure-Python fallback on a 256-word input: 174× (676 085 ops/s vs 3 875 ops/s); results are bit-exact identical (Δ = 0.00e+00). The Mojo SIMD kernel scc_numerator_256w (mojo_accel.md §4.2) stays unwired for this code path because its subprocess startup dominates per-call cost; it remains the option of choice for bulk offline analyses.
  • Trigger evaluation at 17 µs scales with number of conditions; the per-condition cost is ~8 µs because the engine also checks layer_id and enabled per sample.

Figures above are time.perf_counter deltas from benchmarks/bench_debug.py.


8. Citations

  1. Alaghi A., Hayes J.P. (2013). Exploiting correlation in stochastic circuit design. ICCD-2013, 39–46. (SCC definition §1.4.)
  2. Hamming R.W. (1950). Error detecting and error correcting codes. Bell System Technical Journal 29(2):147–160. (Hamming(7,4) §1.6.)
  3. Cover T.M., Thomas J.A. (2006). Elements of Information Theory, 2nd ed. Wiley. (Shannon entropy §1.3.)
  4. Alaghi A., Qian W., Hayes J.P. (2018). The promise and challenge of stochastic computing. IEEE TCAD 37(8):1515–1531. (Adaptive length control §1.5.)
  5. Esposito R., Sbrolli M. et al. (2018). On-line debugging of neural network accelerators with self-referencing trace infrastructure. IEEE Embedded Systems Letters. (Trace-based HW debugging.)
  6. Šotek M. (2026). SC-NeuroCore: live bitstream oscilloscope and adaptive ECC controller. Internal report, ANULUM.

9. Known limitations

  • Offline divergence scan is O(TN). The current :func:find_divergence short-circuits on the first mismatch but walks the arrays in Python loops. For very large traces (T > $10^{5}$, N > $10^{3}$) the call reaches multi-hundred-millisecond territory. A numpy.argmax(diff) variant would trade the first-mismatch metadata for a ~40× speedup and is tracked as a future optimisation.
  • Causal chain has no connectivity prior. Without reading the network's weight matrix, :func:causal_chain returns all temporally preceding spikers at each depth step. A connectivity-aware variant would narrow the chain to neurons that actually project to the target; the arrays needed for that walk are already recorded by :class:SpikeTracer, but the routing table is not yet exposed.
  • Mojo SCC not auto-dispatched. The Mojo SIMD kernel scc_numerator_256w (1.07 µs standalone) is not wired into the live scope path because the pixi subprocess overhead per call exceeds the Python fallback for single-SCC computation. Use the Mojo kernel for bulk offline correlation matrices, not for streaming scope frames.
  • Hamming(7,4) is not burst-tolerant. A double-bit error inside one 7-bit codeword is silently miscorrected; the code detects neither. For UART links with higher burst rates, consider interleaving the encoded blocks before transmission.
  • HIL daemon single-port, single-process. One :class:HILServerDaemon binds one TCP port. Multi-FPGA setups need multiple daemons on distinct ports; there is no built-in multiplexer.
  • Offline tracer is synchronous. It steps the network serially on the main thread; no GIL release and no GPU tracing. For GPU-resident networks, record from :class:sc_neurocore.arcane_zenith.ArcaneZenithCognitiveCore which has its own event-hooked trace path.
  • No encrypted transport. The WebSocket feed is plaintext; fine for localhost + loopback, not for remote dashboards over untrusted networks. TLS wrap the port with a reverse proxy if needed.
  • Trigger engine is O(conditions × samples). Each incoming sample re-evaluates every enabled trigger linearly. With the typical ~10 conditions used in practice, throughput stays in the 100 k/s range; beyond ~100 conditions, group triggers by layer_id in a dict and dispatch per layer instead of scanning the full list.
  • Python-side tracer copies state. SpikeTracer.run allocates $(T \times N)$ int8/float64 arrays up-front. At 17 bytes per timestep-neuron entry across the three arrays, a 32 GB host accommodates roughly $T \cdot N \approx 1.9 \cdot 10^{9}$ entries; longer recordings need a chunking strategy. No streaming-to-disk writer ships in the current module.
  • Rust coverage complete. :func:compute_scc, :meth:ScDoctor.adapt, :meth:ScDoctor.encode_ecc, and :meth:ScDoctor.decode_ecc all now dispatch to the stochastic_doctor_core PyO3 extension with pure-Python fallback. Honest measured per-op result (see §7): compute_scc 174× faster, encode_ecc 1.7× faster, decode_ecc 3.1× faster, adapt ~3× slower because the FFI overhead dominates the trivial branch body. A future batch entrypoint will amortise the FFI for adapt.
  • Rust coverage gap on trace walking. find_divergence, spike_diff, and causal_chain are pure-NumPy Python. Moving them to spike_stats_core would deliver a ~40× speedup (per §9 above) and is tracked as a future task, not a current blocker.
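For the trigger-engine scaling note above, grouping conditions by layer_id turns the per-sample scan into a per-layer lookup. A sketch with hypothetical names (not the shipped :class:TriggerEngine API):

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable

@dataclass
class Condition:
    layer_id: int
    predicate: Callable[[float], bool]   # e.g. lambda d: d > 0.8
    enabled: bool = True

class GroupedTriggerEngine:
    """Bucket conditions by layer_id so each sample checks only its own
    layer's triggers: O(conditions-per-layer) instead of O(conditions)."""

    def __init__(self) -> None:
        self._by_layer: dict[int, list[Condition]] = defaultdict(list)

    def add_trigger(self, cond: Condition) -> None:
        self._by_layer[cond.layer_id].append(cond)

    def evaluate(self, layer_id: int, density: float) -> list[Condition]:
        return [c for c in self._by_layer.get(layer_id, [])
                if c.enabled and c.predicate(density)]
```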

Reference

  • Sources:
  • src/sc_neurocore/debug/tracer.py (155 LOC)
  • src/sc_neurocore/debug/analyzer.py (176 LOC)
  • src/sc_neurocore/debug/sc_doctor.py (101 LOC)
  • src/sc_neurocore/debug/sc_scope.py (538 LOC)
  • src/sc_neurocore/debug/hil_server.py (124 LOC)
  • src/sc_neurocore/debug/hil_debugger.py (34 LOC)
  • src/sc_neurocore/debug/hil_client.py (353 LOC)
  • Go daemon: src/sc_neurocore/accel/go/services/hil_debugger/ (main.go + main_test.go).
  • Tests: tests/test_debug/*.py (1 781 LOC across 5 files).
  • Benchmark: benchmarks/bench_debug.py.
  • Wire protocol: Protobuf Schemas.

sc_neurocore.debug.tracer

Record full SNN execution trace for post-hoc analysis.

Captures per-neuron per-timestep: voltage, spike, input current. Enables temporal debugging: find where spikes diverge, trace causal chains through synaptic connections, compare two runs.

ExecutionTrace dataclass

Complete execution trace of an SNN run.

Attributes

n_neurons : int
    Total neurons across all populations.
n_steps : int
    Number of simulation timesteps.
spikes : ndarray of shape (n_steps, n_neurons)
    Binary spike matrix.
voltages : ndarray of shape (n_steps, n_neurons)
    Membrane voltages.
currents : ndarray of shape (n_steps, n_neurons)
    Input currents.
population_labels : list of str
    Population names.
population_ranges : list of (start, end)
    Neuron index ranges per population.

Source code in src/sc_neurocore/debug/tracer.py
Python
@dataclass
class ExecutionTrace:
    """Complete execution trace of an SNN run.

    Attributes
    ----------
    n_neurons : int
        Total neurons across all populations.
    n_steps : int
        Number of simulation timesteps.
    spikes : ndarray of shape (n_steps, n_neurons)
        Binary spike matrix.
    voltages : ndarray of shape (n_steps, n_neurons)
        Membrane voltages.
    currents : ndarray of shape (n_steps, n_neurons)
        Input currents.
    population_labels : list of str
        Population names.
    population_ranges : list of (start, end)
        Neuron index ranges per population.
    """

    n_neurons: int
    n_steps: int
    spikes: np.ndarray
    voltages: np.ndarray
    currents: np.ndarray
    population_labels: list[str] = field(default_factory=list)
    population_ranges: list[tuple[int, int]] = field(default_factory=list)

    @property
    def spike_count(self) -> int:
        """Total spikes in the trace."""
        return int(self.spikes.sum())

    @property
    def firing_rates(self) -> np.ndarray:
        """Per-neuron firing rate (spikes per step)."""
        return self.spikes.mean(axis=0)

    def neuron_trace(self, neuron_id: int) -> dict:
        """Extract full trace for one neuron."""
        return {
            "spikes": self.spikes[:, neuron_id],
            "voltages": self.voltages[:, neuron_id],
            "currents": self.currents[:, neuron_id],
            "spike_times": np.where(self.spikes[:, neuron_id] > 0)[0],
        }

    def spike_times(self, neuron_id: int) -> np.ndarray:
        """Timesteps when a neuron spiked."""
        return np.where(self.spikes[:, neuron_id] > 0)[0]

    def population_spikes(self, pop_label: str) -> np.ndarray:
        """Spike matrix for one population."""
        for label, (start, end) in zip(self.population_labels, self.population_ranges):
            if label == pop_label:
                return self.spikes[:, start:end]
        raise ValueError(f"Population '{pop_label}' not found")

spike_count property

Total spikes in the trace.

firing_rates property

Per-neuron firing rate (spikes per step).

neuron_trace(neuron_id)

Extract full trace for one neuron.


spike_times(neuron_id)

Timesteps when a neuron spiked.


population_spikes(pop_label)

Spike matrix for one population.


SpikeTracer

Records execution trace during SNN simulation.

Wraps a Network and intercepts step_all to record spikes, voltages, and currents at every timestep.

Usage

tracer = SpikeTracer(network)
trace = tracer.run(duration=0.1, dt=0.001)
divergence = find_divergence(trace, expected_spikes)

Source code in src/sc_neurocore/debug/tracer.py
Python
class SpikeTracer:
    """Records execution trace during SNN simulation.

    Wraps a Network and intercepts step_all to record spikes,
    voltages, and currents at every timestep.

    Usage
    -----
    >>> tracer = SpikeTracer(network)
    >>> trace = tracer.run(duration=0.1, dt=0.001)
    >>> divergence = find_divergence(trace, expected_spikes)
    """

    def __init__(self, network):  # type: ignore[no-untyped-def]
        self.network = network

    def run(self, duration: float, dt: float = 0.001, seed: int = 42) -> ExecutionTrace:
        """Run the network and record full execution trace."""

        np.random.seed(seed)
        n_steps = int(round(duration / dt))

        # Map populations to global neuron indices
        pop_labels = []
        pop_ranges = []
        total_neurons = 0
        for pop in self.network.populations:
            start = total_neurons
            total_neurons += pop.n
            pop_ranges.append((start, start + pop.n))
            pop_labels.append(pop.label)

        # Allocate trace arrays
        all_spikes = np.zeros((n_steps, total_neurons), dtype=np.int8)
        all_voltages = np.zeros((n_steps, total_neurons), dtype=np.float64)
        all_currents = np.zeros((n_steps, total_neurons), dtype=np.float64)

        # Run simulation step by step
        pop_to_currents = {id(p): np.zeros(p.n, dtype=np.float64) for p in self.network.populations}
        last_spikes = {id(p): np.zeros(p.n, dtype=np.int8) for p in self.network.populations}

        for t in range(n_steps):
            for pid in pop_to_currents:
                pop_to_currents[pid][:] = 0.0

            self.network._apply_stimuli(pop_to_currents, t, dt)
            self.network._apply_projections(pop_to_currents, last_spikes)

            for pop, (start, end) in zip(self.network.populations, pop_ranges):
                pid = id(pop)
                currents = pop_to_currents[pid]
                spikes = pop.step_all(currents)
                last_spikes[pid] = spikes

                all_spikes[t, start:end] = spikes
                all_voltages[t, start:end] = pop.voltages
                all_currents[t, start:end] = currents

                # Record to monitors
                self.network._record(pop, spikes, t, dt)

            self.network._update_plasticity(last_spikes)

        return ExecutionTrace(
            n_neurons=total_neurons,
            n_steps=n_steps,
            spikes=all_spikes,
            voltages=all_voltages,
            currents=all_currents,
            population_labels=pop_labels,
            population_ranges=pop_ranges,
        )

run(duration, dt=0.001, seed=42)

Run the network and record full execution trace.


sc_neurocore.debug.analyzer

Analyze execution traces to debug SNN behavior.

  • find_divergence: compare two traces, find first timestep where spikes differ
  • causal_chain: trace backward from a spike to find which input spikes caused it
  • spike_diff: summary of differences between two traces

DivergencePoint dataclass

First point where two traces diverge.

Source code in src/sc_neurocore/debug/analyzer.py
Python
@dataclass
class DivergencePoint:
    """First point where two traces diverge."""

    timestep: int
    neuron_id: int
    trace_a_spike: int
    trace_b_spike: int
    trace_a_voltage: float
    trace_b_voltage: float
    voltage_diff: float

CausalEvent dataclass

One event in a causal spike chain.

Source code in src/sc_neurocore/debug/analyzer.py
Python
@dataclass
class CausalEvent:
    """One event in a causal spike chain."""

    timestep: int
    neuron_id: int
    input_current: float
    voltage: float
    spiked: bool

find_divergence(trace_a, trace_b)

Find the first timestep where two traces produce different spikes.

Useful for comparing ANN-converted SNN vs directly-trained SNN, or Python simulation vs hardware output.

Returns None if traces are identical.

Source code in src/sc_neurocore/debug/analyzer.py
Python
def find_divergence(
    trace_a: ExecutionTrace,
    trace_b: ExecutionTrace,
) -> DivergencePoint | None:
    """Find the first timestep where two traces produce different spikes.

    Useful for comparing ANN-converted SNN vs directly-trained SNN,
    or Python simulation vs hardware output.

    Returns None if traces are identical.
    """
    n_steps = min(trace_a.n_steps, trace_b.n_steps)
    n_neurons = min(trace_a.n_neurons, trace_b.n_neurons)

    for t in range(n_steps):
        for n in range(n_neurons):
            if trace_a.spikes[t, n] != trace_b.spikes[t, n]:
                return DivergencePoint(
                    timestep=t,
                    neuron_id=n,
                    trace_a_spike=int(trace_a.spikes[t, n]),
                    trace_b_spike=int(trace_b.spikes[t, n]),
                    trace_a_voltage=float(trace_a.voltages[t, n]),
                    trace_b_voltage=float(trace_b.voltages[t, n]),
                    voltage_diff=abs(float(trace_a.voltages[t, n]) - float(trace_b.voltages[t, n])),
                )
    return None

spike_diff(trace_a, trace_b)

Summary of spike differences between two traces.

Returns

dict with keys:

  • total_mismatches: int
  • mismatch_rate: float (fraction of timestep × neuron pairs)
  • first_divergence: DivergencePoint or None
  • per_neuron_mismatches: ndarray

Source code in src/sc_neurocore/debug/analyzer.py
Python
def spike_diff(
    trace_a: ExecutionTrace,
    trace_b: ExecutionTrace,
) -> dict:
    """Summary of spike differences between two traces.

    Returns
    -------
    dict with keys:
        total_mismatches: int
        mismatch_rate: float (fraction of timestep*neuron pairs)
        first_divergence: DivergencePoint or None
        per_neuron_mismatches: ndarray
    """
    n_steps = min(trace_a.n_steps, trace_b.n_steps)
    n_neurons = min(trace_a.n_neurons, trace_b.n_neurons)

    diff = trace_a.spikes[:n_steps, :n_neurons] != trace_b.spikes[:n_steps, :n_neurons]
    total = int(diff.sum())
    per_neuron = diff.sum(axis=0)

    return {
        "total_mismatches": total,
        "mismatch_rate": total / max(n_steps * n_neurons, 1),
        "first_divergence": find_divergence(trace_a, trace_b),
        "per_neuron_mismatches": per_neuron,
    }

causal_chain(trace, neuron_id, timestep, max_depth=10)

Trace backward from a spike to find causal input events.

Starting from neuron_id at timestep, finds the chain of spikes that contributed current to this neuron in preceding timesteps.

Parameters

trace : ExecutionTrace
neuron_id : int
    Target neuron.
timestep : int
    Timestep of the spike to explain.
max_depth : int
    Maximum backward steps to trace.

Returns

list of CausalEvent
    Causal chain from target backward to inputs.

Source code in src/sc_neurocore/debug/analyzer.py
Python
def causal_chain(
    trace: ExecutionTrace,
    neuron_id: int,
    timestep: int,
    max_depth: int = 10,
) -> list[CausalEvent]:
    """Trace backward from a spike to find causal input events.

    Starting from neuron_id at timestep, finds the chain of spikes
    that contributed current to this neuron in preceding timesteps.

    Parameters
    ----------
    trace : ExecutionTrace
    neuron_id : int
        Target neuron.
    timestep : int
        Timestep of the spike to explain.
    max_depth : int
        Maximum backward steps to trace.

    Returns
    -------
    list of CausalEvent
        Causal chain from target backward to inputs.
    """
    chain = []

    # Start with the target event
    chain.append(
        CausalEvent(
            timestep=timestep,
            neuron_id=neuron_id,
            input_current=float(trace.currents[timestep, neuron_id]),
            voltage=float(trace.voltages[timestep, neuron_id]),
            spiked=bool(trace.spikes[timestep, neuron_id]),
        )
    )

    # Trace backward: at each step, find neurons that spiked and
    # contributed current to the current target
    current_targets = {neuron_id}
    for depth in range(1, max_depth + 1):
        t = timestep - depth
        if t < 0:
            break

        # Find all neurons that spiked at time t
        spiking = np.where(trace.spikes[t] > 0)[0]
        if len(spiking) == 0:
            continue

        # Any spiking neuron could have contributed current to our targets
        # (we don't have the connectivity here, so we report all spikers
        # that temporally precede the target)
        for n in spiking:
            chain.append(
                CausalEvent(
                    timestep=t,
                    neuron_id=int(n),
                    input_current=float(trace.currents[t, n]),
                    voltage=float(trace.voltages[t, n]),
                    spiked=True,
                )
            )

        # Update targets for next depth
        current_targets = set(spiking.tolist())

    return chain
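The backward window matches the set $\mathcal{C}$ from the formalism above: without connectivity information, every spiker in the preceding `max_depth` steps is a candidate cause. A self-contained sketch of that window (function name is illustrative, not the library's):

```python
import numpy as np

def causal_candidates(spikes, timestep, max_depth=10):
    """All (t, n) pairs with a spike in [timestep - max_depth, timestep)."""
    t_lo = max(timestep - max_depth, 0)
    window = spikes[t_lo:timestep]
    return [(t_lo + t, int(n)) for t, n in np.argwhere(window > 0)]

spikes = np.array([[1, 0, 0],
                   [0, 1, 0],
                   [0, 0, 1]], dtype=np.int8)
print(causal_candidates(spikes, timestep=2, max_depth=2))  # [(0, 0), (1, 1)]
```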

sc_neurocore.debug.sc_doctor

Runtime dynamic SC adaptation with Hamming(7,4) ECC.

Monitors correlation metrics and auto-tunes bitstream length. Enables ECC when length exceeds threshold to protect against noise.

The three hot methods (:meth:ScDoctor.adapt, :meth:ScDoctor.encode_ecc, :meth:ScDoctor.decode_ecc) dispatch to the Rust stochastic_doctor_core PyO3 extension when the compiled .so is importable. When it is not, a bit-exact pure-Python fallback is used so behaviour is identical.

ScDoctor

Adaptive bitstream length controller with optional ECC.

Correlation-driven feedback loop:

  • High correlation (>0.15): double bitstream length
  • Low correlation (<0.05): halve bitstream length (floor 256)
  • ECC auto-enabled when length exceeds 2048

Source code in src/sc_neurocore/debug/sc_doctor.py
Python
class ScDoctor:
    """Adaptive bitstream length controller with optional ECC.

    Correlation-driven feedback loop:
    - High correlation (>0.15): double bitstream length
    - Low correlation (<0.05): halve bitstream length (floor 256)
    - ECC auto-enabled when length exceeds 2048
    """

    def __init__(self, initial_length: int = 256, target_precision: float = 0.95):
        self.current_bitstream_length = initial_length
        self.target_precision = target_precision
        self.error_correction_enabled = False

    def adapt(self, current_correlation: float, popcount: int = 0) -> None:
        """Analyze correlation and adjust bitstream length.

        Dispatches to the Rust ``stochastic_doctor_core.py_sc_doctor_adapt``
        when available; falls back to the bit-exact Python implementation.

        Parameters
        ----------
        current_correlation : float
            Current SC correlation metric (SCC estimate).
        popcount : int
            Current popcount (reserved for future use).
        """
        if _HAS_RUST_DOCTOR:
            new_length, ecc_enabled = _sdc.py_sc_doctor_adapt(
                int(self.current_bitstream_length),
                bool(self.error_correction_enabled),
                float(current_correlation),
            )
            self.current_bitstream_length = int(new_length)
            self.error_correction_enabled = bool(ecc_enabled)
            return

        if current_correlation > 0.15:
            self.current_bitstream_length *= 2
            if self.current_bitstream_length > 2048:
                self.error_correction_enabled = True
        elif current_correlation < 0.05 and self.current_bitstream_length > 256:
            self.current_bitstream_length //= 2
            self.error_correction_enabled = False

    def encode_ecc(self, data: int) -> int:
        """Hamming(7,4) encode a 4-bit chunk → 7-bit codeword.

        If ECC is disabled, returns lower 4 bits unchanged. Hot path
        dispatches to ``stochastic_doctor_core.py_hamming74_encode``.
        """
        if not self.error_correction_enabled:
            return data & 0x0F

        if _HAS_RUST_DOCTOR:
            return int(_sdc.py_hamming74_encode(int(data)))

        d1 = (data >> 3) & 1
        d2 = (data >> 2) & 1
        d3 = (data >> 1) & 1
        d4 = data & 1

        p1 = d1 ^ d2 ^ d4
        p2 = d1 ^ d3 ^ d4
        p3 = d2 ^ d3 ^ d4

        return (p1 << 6) | (p2 << 5) | (d1 << 4) | (p3 << 3) | (d2 << 2) | (d3 << 1) | d4

    def decode_ecc(self, encoded: int) -> int:
        """Hamming(7,4) decode with single-bit error correction.

        If ECC is disabled, returns lower 4 bits unchanged. Hot path
        dispatches to ``stochastic_doctor_core.py_hamming74_decode``.
        """
        if not self.error_correction_enabled:
            return encoded & 0x0F

        if _HAS_RUST_DOCTOR:
            return int(_sdc.py_hamming74_decode(int(encoded)))

        p1 = (encoded >> 6) & 1
        p2 = (encoded >> 5) & 1
        d1 = (encoded >> 4) & 1
        p3 = (encoded >> 3) & 1
        d2 = (encoded >> 2) & 1
        d3 = (encoded >> 1) & 1
        d4 = encoded & 1

        s1 = p1 ^ d1 ^ d2 ^ d4
        s2 = p2 ^ d1 ^ d3 ^ d4
        s3 = p3 ^ d2 ^ d3 ^ d4

        syndrome = (s3 << 2) | (s2 << 1) | s1

        corrected = encoded
        bit_positions = {1: 6, 2: 5, 3: 4, 4: 3, 5: 2, 6: 1, 7: 0}
        if syndrome in bit_positions:
            corrected ^= 1 << bit_positions[syndrome]

        cd1 = (corrected >> 4) & 1
        cd2 = (corrected >> 2) & 1
        cd3 = (corrected >> 1) & 1
        cd4 = corrected & 1

        return (cd1 << 3) | (cd2 << 2) | (cd3 << 1) | cd4
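The feedback rules can be exercised without the package; this sketch mirrors the documented thresholds (0.15 / 0.05, floor 256, ECC above 2048) as a standalone function:

```python
def adapt(length, ecc, correlation):
    """One step of the documented feedback loop (illustrative re-implementation)."""
    if correlation > 0.15:
        length *= 2
        if length > 2048:
            ecc = True          # ECC kicks in once length exceeds 2048
    elif correlation < 0.05 and length > 256:
        length //= 2            # floor of 256 enforced by the guard above
        ecc = False
    return length, ecc

length, ecc = 256, False
for corr in [0.2, 0.2, 0.2, 0.2]:          # sustained high correlation
    length, ecc = adapt(length, ecc, corr)
print(length, ecc)                          # 4096 True
length, ecc = adapt(length, ecc, 0.01)      # correlation drops
print(length, ecc)                          # 2048 False
```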

adapt(current_correlation, popcount=0)

Analyze correlation and adjust bitstream length.

Dispatches to the Rust stochastic_doctor_core.py_sc_doctor_adapt when available; falls back to the bit-exact Python implementation.

Parameters

current_correlation : float
    Current SC correlation metric (SCC estimate).
popcount : int
    Current popcount (reserved for future use).

Source code in src/sc_neurocore/debug/sc_doctor.py
Python
def adapt(self, current_correlation: float, popcount: int = 0) -> None:
    """Analyze correlation and adjust bitstream length.

    Dispatches to the Rust ``stochastic_doctor_core.py_sc_doctor_adapt``
    when available; falls back to the bit-exact Python implementation.

    Parameters
    ----------
    current_correlation : float
        Current SC correlation metric (SCC estimate).
    popcount : int
        Current popcount (reserved for future use).
    """
    if _HAS_RUST_DOCTOR:
        new_length, ecc_enabled = _sdc.py_sc_doctor_adapt(
            int(self.current_bitstream_length),
            bool(self.error_correction_enabled),
            float(current_correlation),
        )
        self.current_bitstream_length = int(new_length)
        self.error_correction_enabled = bool(ecc_enabled)
        return

    if current_correlation > 0.15:
        self.current_bitstream_length *= 2
        if self.current_bitstream_length > 2048:
            self.error_correction_enabled = True
    elif current_correlation < 0.05 and self.current_bitstream_length > 256:
        self.current_bitstream_length //= 2
        self.error_correction_enabled = False

encode_ecc(data)

Hamming(7,4) encode a 4-bit chunk → 7-bit codeword.

If ECC is disabled, returns lower 4 bits unchanged. Hot path dispatches to stochastic_doctor_core.py_hamming74_encode.

Source code in src/sc_neurocore/debug/sc_doctor.py
Python
def encode_ecc(self, data: int) -> int:
    """Hamming(7,4) encode a 4-bit chunk → 7-bit codeword.

    If ECC is disabled, returns lower 4 bits unchanged. Hot path
    dispatches to ``stochastic_doctor_core.py_hamming74_encode``.
    """
    if not self.error_correction_enabled:
        return data & 0x0F

    if _HAS_RUST_DOCTOR:
        return int(_sdc.py_hamming74_encode(int(data)))

    d1 = (data >> 3) & 1
    d2 = (data >> 2) & 1
    d3 = (data >> 1) & 1
    d4 = data & 1

    p1 = d1 ^ d2 ^ d4
    p2 = d1 ^ d3 ^ d4
    p3 = d2 ^ d3 ^ d4

    return (p1 << 6) | (p2 << 5) | (d1 << 4) | (p3 << 3) | (d2 << 2) | (d3 << 1) | d4

decode_ecc(encoded)

Hamming(7,4) decode with single-bit error correction.

If ECC is disabled, returns lower 4 bits unchanged. Hot path dispatches to stochastic_doctor_core.py_hamming74_decode.

Source code in src/sc_neurocore/debug/sc_doctor.py
Python
def decode_ecc(self, encoded: int) -> int:
    """Hamming(7,4) decode with single-bit error correction.

    If ECC is disabled, returns lower 4 bits unchanged. Hot path
    dispatches to ``stochastic_doctor_core.py_hamming74_decode``.
    """
    if not self.error_correction_enabled:
        return encoded & 0x0F

    if _HAS_RUST_DOCTOR:
        return int(_sdc.py_hamming74_decode(int(encoded)))

    p1 = (encoded >> 6) & 1
    p2 = (encoded >> 5) & 1
    d1 = (encoded >> 4) & 1
    p3 = (encoded >> 3) & 1
    d2 = (encoded >> 2) & 1
    d3 = (encoded >> 1) & 1
    d4 = encoded & 1

    s1 = p1 ^ d1 ^ d2 ^ d4
    s2 = p2 ^ d1 ^ d3 ^ d4
    s3 = p3 ^ d2 ^ d3 ^ d4

    syndrome = (s3 << 2) | (s2 << 1) | s1

    corrected = encoded
    bit_positions = {1: 6, 2: 5, 3: 4, 4: 3, 5: 2, 6: 1, 7: 0}
    if syndrome in bit_positions:
        corrected ^= 1 << bit_positions[syndrome]

    cd1 = (corrected >> 4) & 1
    cd2 = (corrected >> 2) & 1
    cd3 = (corrected >> 1) & 1
    cd4 = corrected & 1

    return (cd1 << 3) | (cd2 << 2) | (cd3 << 1) | cd4
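Copying the pure-Python fallback shown above, a round-trip check confirms that any single-bit flip in the 7-bit codeword is corrected (a sketch; per the docstrings, the Rust path is bit-exact with this):

```python
def hamming74_encode(data):
    d1, d2, d3, d4 = (data >> 3) & 1, (data >> 2) & 1, (data >> 1) & 1, data & 1
    p1, p2, p3 = d1 ^ d2 ^ d4, d1 ^ d3 ^ d4, d2 ^ d3 ^ d4
    return (p1 << 6) | (p2 << 5) | (d1 << 4) | (p3 << 3) | (d2 << 2) | (d3 << 1) | d4

def hamming74_decode(encoded):
    p1, p2, d1 = (encoded >> 6) & 1, (encoded >> 5) & 1, (encoded >> 4) & 1
    p3, d2, d3, d4 = (encoded >> 3) & 1, (encoded >> 2) & 1, (encoded >> 1) & 1, encoded & 1
    # Syndrome locates the flipped bit (0 means the codeword is clean)
    s1, s2, s3 = p1 ^ d1 ^ d2 ^ d4, p2 ^ d1 ^ d3 ^ d4, p3 ^ d2 ^ d3 ^ d4
    syndrome = (s3 << 2) | (s2 << 1) | s1
    bit_positions = {1: 6, 2: 5, 3: 4, 4: 3, 5: 2, 6: 1, 7: 0}
    if syndrome in bit_positions:
        encoded ^= 1 << bit_positions[syndrome]
    return (((encoded >> 4) & 1) << 3) | (((encoded >> 2) & 1) << 2) | \
           (((encoded >> 1) & 1) << 1) | (encoded & 1)

# Every nibble survives every possible single-bit error
for nibble in range(16):
    code = hamming74_encode(nibble)
    assert hamming74_decode(code) == nibble                       # clean codeword
    for bit in range(7):
        assert hamming74_decode(code ^ (1 << bit)) == nibble      # one flipped bit
print("all 16 nibbles x 7 flips corrected")
```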

sc_neurocore.debug.sc_scope

Live bitstream oscilloscope for SC hardware debugging.

Streams real-time bitstream activity from FPGA/ASIC targets (via JTAG, UART, or PYNQ DMA) and computes live correlation metrics, effective precision, and per-layer error budgets while the hardware runs.

Unlike post-mortem waveform viewers, this provides in-flight diagnostics:

  • TransportBackend: Pluggable adapters for JTAG, UART, PYNQ DMA, or simulated (loopback) bitstream sources.
  • BitstreamSample: Timestamped bitstream capture with metadata.
  • LiveAnalyzer: Windowed real-time computation of popcount, SCC, effective bits, density, and error budget.
  • LayerErrorBudget: Per-layer precision tracking against golden model.
  • TriggerEngine: Conditional capture triggers (spike, density, SCC).
  • ScopeSession: Manages streaming, analysis, and trigger evaluation.
  • ScopeRenderer: Text-mode (CLI) rendering of live scope data.

Compatible with:

  • debug/tracer.py — shares the ExecutionTrace schema
  • analysis/ — reuses spike_stats metrics where applicable
  • profiling/ — energy/spike profiling hooks
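`compute_scc` is exported by this module; its implementation is not shown in this excerpt, but the standard stochastic-computing correlation (SCC, Alaghi & Hayes) it estimates can be sketched as follows — note this is the textbook formula, not necessarily identical to the library's code:

```python
import numpy as np

def scc(x, y):
    """Textbook SCC of two binary bitstreams in [-1, 1]."""
    px, py = x.mean(), y.mean()
    p11 = (x & y).mean()              # overlap probability
    delta = p11 - px * py             # deviation from independence
    if delta > 0:
        denom = min(px, py) - px * py                 # max positive overlap
    else:
        denom = px * py - max(px + py - 1.0, 0.0)     # max negative overlap
    return float(delta / denom) if denom != 0 else 0.0

x = np.array([1, 1, 0, 0], dtype=np.int8)
print(scc(x, x.copy()))                               # 1.0 (maximally correlated)
print(scc(x, np.array([1, 0, 1, 0], dtype=np.int8)))  # 0.0 (uncorrelated)
```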

TransportConfig dataclass

Configuration for a transport backend.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
@dataclass
class TransportConfig:
    """Configuration for a transport backend."""

    transport_type: TransportType
    port: str = ""
    baud_rate: int = 115200
    dma_base_addr: int = 0x4000_0000
    dma_length: int = 4096
    timeout_ms: int = 100

TransportBackend dataclass

Pluggable transport adapter for bitstream acquisition.

Production backends (JTAG, UART, PYNQ DMA) require hardware; the SIMULATED backend generates synthetic data for testing and development.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
@dataclass
class TransportBackend:
    """Pluggable transport adapter for bitstream acquisition.

    Production backends (JTAG, UART, PYNQ DMA) require hardware;
    the ``SIMULATED`` backend generates synthetic data for testing
    and development.
    """

    config: TransportConfig
    is_connected: bool = False
    bytes_received: int = 0
    _sim_rng: Optional[np.random.Generator] = field(default=None, repr=False)
    _sim_step: int = 0

    def connect(self) -> bool:
        """Establish connection to the target."""
        if self.config.transport_type == TransportType.SIMULATED:
            self._sim_rng = np.random.default_rng(42)
            self.is_connected = True
            return True
        # Real backends would initialise JTAG/UART/DMA here
        self.is_connected = True
        return True

    def disconnect(self) -> None:
        self.is_connected = False
        self._sim_rng = None
        self._sim_step = 0

    def read_bitstream(self, num_words: int, layer_id: int = 0) -> Optional[np.ndarray]:
        """Read packed bitstream words from the target.

        Returns u32-packed words, or None on timeout/error.
        """
        if not self.is_connected:
            return None

        if self.config.transport_type == TransportType.SIMULATED:
            return self._sim_read(num_words, layer_id)

        # Placeholder for real backends
        return None

    def _sim_read(self, num_words: int, layer_id: int) -> np.ndarray:
        """Generate simulated bitstream data."""
        assert self._sim_rng is not None
        self._sim_step += 1

        # Simulate density that varies by layer and time
        base_density = 0.3 + 0.1 * layer_id
        time_mod = 0.1 * np.sin(self._sim_step * 0.05)
        density = np.clip(base_density + time_mod, 0.05, 0.95)

        threshold = int(density * 0xFFFF_FFFF)
        words = self._sim_rng.integers(0, 0xFFFF_FFFF, size=num_words, dtype=np.uint32)
        result = np.where(words < threshold, words | 0x8000_0000, words & 0x7FFF_FFFF)
        self.bytes_received += num_words * 4
        return result.astype(np.uint32)

connect()

Establish connection to the target.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
def connect(self) -> bool:
    """Establish connection to the target."""
    if self.config.transport_type == TransportType.SIMULATED:
        self._sim_rng = np.random.default_rng(42)
        self.is_connected = True
        return True
    # Real backends would initialise JTAG/UART/DMA here
    self.is_connected = True
    return True

read_bitstream(num_words, layer_id=0)

Read packed bitstream words from the target.

Returns u32-packed words, or None on timeout/error.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
def read_bitstream(self, num_words: int, layer_id: int = 0) -> Optional[np.ndarray]:
    """Read packed bitstream words from the target.

    Returns u32-packed words, or None on timeout/error.
    """
    if not self.is_connected:
        return None

    if self.config.transport_type == TransportType.SIMULATED:
        return self._sim_read(num_words, layer_id)

    # Placeholder for real backends
    return None

BitstreamSample dataclass

One timestamped bitstream capture.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
@dataclass
class BitstreamSample:
    """One timestamped bitstream capture."""

    timestamp_ns: int
    layer_id: int
    neuron_id: int
    words: np.ndarray  # u32-packed bitstream
    sample_index: int = 0

    @property
    def bit_length(self) -> int:
        return len(self.words) * 32

    @property
    def popcount(self) -> int:
        total = 0
        for w in self.words:
            total += bin(int(w)).count("1")
        return total

    @property
    def density(self) -> float:
        bl = self.bit_length
        return self.popcount / bl if bl > 0 else 0.0

    @property
    def effective_bits(self) -> float:
        """Shannon entropy-based effective precision."""
        p = self.density
        if p <= 0.0 or p >= 1.0:
            return 0.0
        return -(p * np.log2(p) + (1 - p) * np.log2(1 - p)) * self.bit_length

effective_bits property

Shannon entropy-based effective precision.
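The property is the binary entropy scaled by stream length, $H(p)\cdot L$: a density-0.5 stream carries a full bit per position, while a saturated stream carries none. A standalone check (the helper is hypothetical, mirroring `BitstreamSample.effective_bits`):

```python
import numpy as np

def effective_bits(words):
    """H(p) * bit_length for a u32-packed bitstream."""
    bits = len(words) * 32
    pop = sum(bin(int(w)).count("1") for w in words)
    p = pop / bits
    if p <= 0.0 or p >= 1.0:
        return 0.0  # degenerate stream: zero entropy
    return float(-(p * np.log2(p) + (1 - p) * np.log2(1 - p)) * bits)

print(effective_bits([0x5555_5555] * 4))  # 128.0 — density 0.5, full precision
print(effective_bits([0xFFFF_FFFF] * 4))  # 0.0 — density 1.0, no information
```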

AnalysisWindow dataclass

Windowed statistics from recent samples.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
@dataclass
class AnalysisWindow:
    """Windowed statistics from recent samples."""

    window_size: int = 64
    densities: Deque[float] = field(default_factory=lambda: deque(maxlen=64))
    popcounts: Deque[int] = field(default_factory=lambda: deque(maxlen=64))
    effective_bits: Deque[float] = field(default_factory=lambda: deque(maxlen=64))
    timestamps: Deque[int] = field(default_factory=lambda: deque(maxlen=64))

    def __post_init__(self) -> None:
        self.densities = deque(maxlen=self.window_size)
        self.popcounts = deque(maxlen=self.window_size)
        self.effective_bits = deque(maxlen=self.window_size)
        self.timestamps = deque(maxlen=self.window_size)

    def push(self, sample: BitstreamSample) -> None:
        self.densities.append(sample.density)
        self.popcounts.append(sample.popcount)
        self.effective_bits.append(sample.effective_bits)
        self.timestamps.append(sample.timestamp_ns)

    @property
    def count(self) -> int:
        return len(self.densities)

    @property
    def mean_density(self) -> float:
        return float(np.mean(self.densities)) if self.densities else 0.0

    @property
    def std_density(self) -> float:
        return float(np.std(self.densities)) if len(self.densities) > 1 else 0.0

    @property
    def mean_effective_bits(self) -> float:
        return float(np.mean(self.effective_bits)) if self.effective_bits else 0.0

    @property
    def total_popcount(self) -> int:
        return sum(self.popcounts)

    @property
    def sample_rate_hz(self) -> float:
        """Estimated sample rate from timestamps."""
        if len(self.timestamps) < 2:
            return 0.0
        dt_ns = self.timestamps[-1] - self.timestamps[0]
        if dt_ns <= 0:
            return 0.0
        return (len(self.timestamps) - 1) * 1e9 / dt_ns

sample_rate_hz property

Estimated sample rate from timestamps.
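The estimate divides (count − 1) intervals by the first-to-last span; e.g. five timestamps 1 ms apart give 1 kHz. The arithmetic, matching the property above:

```python
timestamps_ns = [0, 1_000_000, 2_000_000, 3_000_000, 4_000_000]  # 1 ms spacing

dt_ns = timestamps_ns[-1] - timestamps_ns[0]
rate_hz = (len(timestamps_ns) - 1) * 1e9 / dt_ns
print(rate_hz)  # 1000.0
```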

LiveAnalyzer

Real-time SC bitstream analyzer with per-layer windows.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
class LiveAnalyzer:
    """Real-time SC bitstream analyzer with per-layer windows."""

    def __init__(self, num_layers: int = 1, window_size: int = 64):
        self.num_layers = num_layers
        self.windows: Dict[int, AnalysisWindow] = {
            i: AnalysisWindow(window_size=window_size) for i in range(num_layers)
        }
        self.total_samples = 0

    def ingest(self, sample: BitstreamSample) -> None:
        """Process one incoming sample."""
        layer = sample.layer_id
        if layer not in self.windows:
            self.windows[layer] = AnalysisWindow()
        self.windows[layer].push(sample)
        self.total_samples += 1

    def layer_stats(self, layer_id: int) -> Dict[str, float]:
        """Get summary stats for one layer."""
        w = self.windows.get(layer_id)
        if w is None or w.count == 0:
            return {}
        return {
            "mean_density": w.mean_density,
            "std_density": w.std_density,
            "mean_effective_bits": w.mean_effective_bits,
            "total_popcount": w.total_popcount,
            "sample_count": w.count,
            "sample_rate_hz": w.sample_rate_hz,
        }

    def all_stats(self) -> Dict[int, Dict[str, float]]:
        return {lid: self.layer_stats(lid) for lid in self.windows}

ingest(sample)

Process one incoming sample.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
def ingest(self, sample: BitstreamSample) -> None:
    """Process one incoming sample."""
    layer = sample.layer_id
    if layer not in self.windows:
        self.windows[layer] = AnalysisWindow()
    self.windows[layer].push(sample)
    self.total_samples += 1

layer_stats(layer_id)

Get summary stats for one layer.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
def layer_stats(self, layer_id: int) -> Dict[str, float]:
    """Get summary stats for one layer."""
    w = self.windows.get(layer_id)
    if w is None or w.count == 0:
        return {}
    return {
        "mean_density": w.mean_density,
        "std_density": w.std_density,
        "mean_effective_bits": w.mean_effective_bits,
        "total_popcount": w.total_popcount,
        "sample_count": w.count,
        "sample_rate_hz": w.sample_rate_hz,
    }

LayerErrorBudget dataclass

Per-layer precision tracking against golden model expectations.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
@dataclass
class LayerErrorBudget:
    """Per-layer precision tracking against golden model expectations."""

    layer_id: int
    expected_density: float
    tolerance: float = 0.05
    history: List[float] = field(default_factory=list)

    def check(self, measured_density: float) -> bool:
        """Check if measured density is within tolerance."""
        self.history.append(measured_density)
        return abs(measured_density - self.expected_density) <= self.tolerance

    @property
    def current_error(self) -> float:
        if not self.history:
            return 0.0
        return abs(self.history[-1] - self.expected_density)

    @property
    def mean_error(self) -> float:
        if not self.history:
            return 0.0
        errors = [abs(h - self.expected_density) for h in self.history]
        return float(np.mean(errors))

    @property
    def max_error(self) -> float:
        if not self.history:
            return 0.0
        return max(abs(h - self.expected_density) for h in self.history)

    @property
    def violations(self) -> int:
        return sum(1 for h in self.history if abs(h - self.expected_density) > self.tolerance)

    @property
    def pass_rate(self) -> float:
        if not self.history:
            return 1.0
        return 1.0 - self.violations / len(self.history)

check(measured_density)

Check if measured density is within tolerance.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
def check(self, measured_density: float) -> bool:
    """Check if measured density is within tolerance."""
    self.history.append(measured_density)
    return abs(measured_density - self.expected_density) <= self.tolerance
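The budget's bookkeeping reduces to counting tolerance violations over the density history; a plain-Python sketch of the same arithmetic (values illustrative):

```python
expected, tolerance = 0.40, 0.05
history = [0.39, 0.42, 0.48, 0.41, 0.52]   # measured densities vs. golden model

errors = [abs(h - expected) for h in history]
violations = sum(e > tolerance for e in errors)   # 0.48 and 0.52 exceed tolerance
pass_rate = 1.0 - violations / len(history)

print(violations, pass_rate)  # 2 0.6
```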

TriggerCondition dataclass

Conditional capture trigger.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
@dataclass
class TriggerCondition:
    """Conditional capture trigger."""

    trigger_type: TriggerType
    threshold: float = 0.5
    layer_id: int = 0
    enabled: bool = True

TriggerEvent dataclass

A triggered capture event.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
@dataclass
class TriggerEvent:
    """A triggered capture event."""

    trigger_type: TriggerType
    timestamp_ns: int
    layer_id: int
    measured_value: float
    threshold: float
    sample: BitstreamSample

TriggerEngine

Evaluates capture triggers against incoming samples.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
class TriggerEngine:
    """Evaluates capture triggers against incoming samples."""

    def __init__(self) -> None:
        self.conditions: List[TriggerCondition] = []
        self.events: List[TriggerEvent] = []
        self.max_events: int = 1000

    def add_trigger(self, condition: TriggerCondition) -> None:
        self.conditions.append(condition)

    def evaluate(self, sample: BitstreamSample) -> List[TriggerEvent]:
        """Check all triggers against a sample. Returns fired events."""
        fired = []
        for cond in self.conditions:
            if not cond.enabled:
                continue
            if cond.layer_id != sample.layer_id:
                continue

            triggered = False
            measured = 0.0
            if cond.trigger_type == TriggerType.DENSITY_ABOVE:
                measured = sample.density
                triggered = measured > cond.threshold
            elif cond.trigger_type == TriggerType.DENSITY_BELOW:
                measured = sample.density
                triggered = measured < cond.threshold
            elif cond.trigger_type == TriggerType.SPIKE_DETECTED:
                measured = sample.density
                triggered = measured > 0.0

            if triggered:
                event = TriggerEvent(
                    cond.trigger_type,
                    sample.timestamp_ns,
                    sample.layer_id,
                    measured,
                    cond.threshold,
                    sample,
                )
                fired.append(event)
                if len(self.events) < self.max_events:
                    self.events.append(event)
        return fired

    @property
    def event_count(self) -> int:
        return len(self.events)

    def clear(self) -> None:
        self.events.clear()

evaluate(sample)

Check all triggers against a sample. Returns fired events.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
def evaluate(self, sample: BitstreamSample) -> List[TriggerEvent]:
    """Check all triggers against a sample. Returns fired events."""
    fired = []
    for cond in self.conditions:
        if not cond.enabled:
            continue
        if cond.layer_id != sample.layer_id:
            continue

        triggered = False
        measured = 0.0
        if cond.trigger_type == TriggerType.DENSITY_ABOVE:
            measured = sample.density
            triggered = measured > cond.threshold
        elif cond.trigger_type == TriggerType.DENSITY_BELOW:
            measured = sample.density
            triggered = measured < cond.threshold
        elif cond.trigger_type == TriggerType.SPIKE_DETECTED:
            measured = sample.density
            triggered = measured > 0.0

        if triggered:
            event = TriggerEvent(
                cond.trigger_type,
                sample.timestamp_ns,
                sample.layer_id,
                measured,
                cond.threshold,
                sample,
            )
            fired.append(event)
            if len(self.events) < self.max_events:
                self.events.append(event)
    return fired
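The evaluation loop is a per-condition threshold comparator: each enabled condition matching the sample's layer is tested against the measured density. A standalone sketch of the same pattern (the `Condition` dataclass and free-standing `evaluate` here are illustrative stand-ins, not the library's `TriggerCondition`/`TriggerEngine`):

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import List, Tuple


class TriggerType(Enum):
    DENSITY_ABOVE = auto()
    DENSITY_BELOW = auto()
    SPIKE_DETECTED = auto()


@dataclass
class Condition:
    trigger_type: TriggerType
    layer_id: int
    threshold: float = 0.0
    enabled: bool = True


def evaluate(conds: List[Condition], layer_id: int, density: float) -> List[Tuple[TriggerType, float]]:
    """Return (type, measured) for every condition that fires on this sample."""
    fired = []
    for cond in conds:
        if not cond.enabled or cond.layer_id != layer_id:
            continue
        if cond.trigger_type is TriggerType.DENSITY_ABOVE:
            hit = density > cond.threshold
        elif cond.trigger_type is TriggerType.DENSITY_BELOW:
            hit = density < cond.threshold
        else:  # SPIKE_DETECTED: any nonzero density counts as activity
            hit = density > 0.0
        if hit:
            fired.append((cond.trigger_type, density))
    return fired


conds = [
    Condition(TriggerType.DENSITY_ABOVE, layer_id=0, threshold=0.8),
    Condition(TriggerType.SPIKE_DETECTED, layer_id=1),
]
print(evaluate(conds, layer_id=0, density=0.9))  # DENSITY_ABOVE fires
print(evaluate(conds, layer_id=1, density=0.0))  # nothing fires
```

Note that, as in the real engine, conditions for other layers are skipped entirely rather than treated as failures.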

ScopeSession dataclass

Manages a live debugging session.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
@dataclass
class ScopeSession:
    """Manages a live debugging session."""

    transport: TransportBackend
    analyzer: LiveAnalyzer
    triggers: TriggerEngine = field(default_factory=TriggerEngine)
    error_budgets: Dict[int, LayerErrorBudget] = field(default_factory=dict)
    is_running: bool = False
    sample_count: int = 0
    _start_time_ns: int = 0

    def start(self) -> bool:
        """Start the scope session."""
        if not self.transport.connect():
            return False
        self.is_running = True
        self._start_time_ns = time.time_ns()
        return True

    def stop(self) -> None:
        self.is_running = False
        self.transport.disconnect()

    def add_error_budget(self, layer_id: int, expected_density: float, tol: float = 0.05) -> None:
        self.error_budgets[layer_id] = LayerErrorBudget(layer_id, expected_density, tol)

    def capture_one(
        self, layer_id: int = 0, neuron_id: int = 0, num_words: int = 8
    ) -> Optional[BitstreamSample]:
        """Capture one bitstream sample from the target."""
        if not self.is_running:
            return None
        words = self.transport.read_bitstream(num_words, layer_id)
        if words is None:
            return None
        ts = time.time_ns() - self._start_time_ns
        sample = BitstreamSample(
            timestamp_ns=ts,
            layer_id=layer_id,
            neuron_id=neuron_id,
            words=words,
            sample_index=self.sample_count,
        )
        self.sample_count += 1
        self.analyzer.ingest(sample)

        # Check error budgets
        if layer_id in self.error_budgets:
            self.error_budgets[layer_id].check(sample.density)

        # Evaluate triggers
        self.triggers.evaluate(sample)
        return sample

    def capture_sweep(self, num_layers: int, num_words: int = 8) -> List[BitstreamSample]:
        """Capture one sample from each layer."""
        samples = []
        for lid in range(num_layers):
            s = self.capture_one(layer_id=lid, num_words=num_words)
            if s is not None:
                samples.append(s)
        return samples

    def status(self) -> Dict[str, Any]:
        elapsed = (time.time_ns() - self._start_time_ns) / 1e9 if self._start_time_ns else 0
        return {
            "running": self.is_running,
            "samples": self.sample_count,
            "elapsed_s": round(elapsed, 3),
            "bytes_received": self.transport.bytes_received,
            "triggers_fired": self.triggers.event_count,
            "layers_tracked": len(self.analyzer.windows),
        }

start()

Start the scope session.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
def start(self) -> bool:
    """Start the scope session."""
    if not self.transport.connect():
        return False
    self.is_running = True
    self._start_time_ns = time.time_ns()
    return True

capture_one(layer_id=0, neuron_id=0, num_words=8)

Capture one bitstream sample from the target.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
def capture_one(
    self, layer_id: int = 0, neuron_id: int = 0, num_words: int = 8
) -> Optional[BitstreamSample]:
    """Capture one bitstream sample from the target."""
    if not self.is_running:
        return None
    words = self.transport.read_bitstream(num_words, layer_id)
    if words is None:
        return None
    ts = time.time_ns() - self._start_time_ns
    sample = BitstreamSample(
        timestamp_ns=ts,
        layer_id=layer_id,
        neuron_id=neuron_id,
        words=words,
        sample_index=self.sample_count,
    )
    self.sample_count += 1
    self.analyzer.ingest(sample)

    # Check error budgets
    if layer_id in self.error_budgets:
        self.error_budgets[layer_id].check(sample.density)

    # Evaluate triggers
    self.triggers.evaluate(sample)
    return sample

capture_sweep(num_layers, num_words=8)

Capture one sample from each layer.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
def capture_sweep(self, num_layers: int, num_words: int = 8) -> List[BitstreamSample]:
    """Capture one sample from each layer."""
    samples = []
    for lid in range(num_layers):
        s = self.capture_one(layer_id=lid, num_words=num_words)
        if s is not None:
            samples.append(s)
    return samples
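`capture_one` calls `self.error_budgets[layer_id].check(sample.density)` on every sample, and `ScopeRenderer` later reads `current_error`, `mean_error`, and `pass_rate` from each budget. The source of `LayerErrorBudget` is not reproduced on this page; the following is a hypothetical minimal tracker with a compatible surface, assuming a pass means |density − expected| ≤ tolerance:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class DensityBudget:
    """Hypothetical stand-in for LayerErrorBudget: pass iff |density - expected| <= tolerance."""

    layer_id: int
    expected_density: float
    tolerance: float = 0.05
    errors: List[float] = field(default_factory=list)
    passes: int = 0

    def check(self, density: float) -> bool:
        err = abs(density - self.expected_density)
        self.errors.append(err)
        ok = err <= self.tolerance
        if ok:
            self.passes += 1
        return ok

    @property
    def current_error(self) -> float:
        return self.errors[-1] if self.errors else 0.0

    @property
    def mean_error(self) -> float:
        return sum(self.errors) / len(self.errors) if self.errors else 0.0

    @property
    def pass_rate(self) -> float:
        return self.passes / len(self.errors) if self.errors else 1.0


eb = DensityBudget(layer_id=0, expected_density=0.5, tolerance=0.05)
for d in (0.49, 0.52, 0.70):  # last sample is out of budget
    eb.check(d)
print(f"pass_rate={eb.pass_rate:.2f} current_error={eb.current_error:.2f}")
```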

ScopeRenderer

Text-mode rendering of live scope data for CLI output.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
class ScopeRenderer:
    """Text-mode rendering of live scope data for CLI output."""

    BAR_WIDTH = 40

    @classmethod
    def render_density_bar(cls, density: float, width: int = 40) -> str:
        """Render a density as a text bar."""
        filled = int(density * width)
        return f"[{'█' * filled}{'░' * (width - filled)}] {density:.3f}"

    @classmethod
    def render_layer_summary(cls, layer_id: int, stats: Dict[str, float]) -> str:
        if not stats:
            return f"  L{layer_id}: (no data)"
        density = stats.get("mean_density", 0.0)
        eff = stats.get("mean_effective_bits", 0.0)
        n = int(stats.get("sample_count", 0))
        bar = cls.render_density_bar(density)
        return f"  L{layer_id}: {bar}  eff={eff:.1f}b  n={n}"

    @classmethod
    def render_session(cls, session: ScopeSession) -> str:
        """Render full session status as text."""
        lines = ["═══ SC Bitstream Scope ═══"]
        st = session.status()
        lines.append(f"  Status: {'● LIVE' if st['running'] else '○ STOPPED'}")
        lines.append(f"  Samples: {st['samples']}  Elapsed: {st['elapsed_s']}s")
        lines.append(f"  Bytes: {st['bytes_received']}  Triggers: {st['triggers_fired']}")
        lines.append("──────────────────────────")
        for lid in sorted(session.analyzer.windows.keys()):
            stats = session.analyzer.layer_stats(lid)
            lines.append(cls.render_layer_summary(lid, stats))
        if session.error_budgets:
            lines.append("── Error Budgets ────────")
            for lid, eb in sorted(session.error_budgets.items()):
                status = "✓" if eb.pass_rate >= 0.95 else "✗"
                lines.append(
                    f"  L{lid}: {status} err={eb.current_error:.4f} "
                    f"mean={eb.mean_error:.4f} pass={eb.pass_rate:.1%}"
                )
        return "\n".join(lines)

render_density_bar(density, width=40) classmethod

Render a density as a text bar.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
@classmethod
def render_density_bar(cls, density: float, width: int = 40) -> str:
    """Render a density as a text bar."""
    filled = int(density * width)
    return f"[{'█' * filled}{'░' * (width - filled)}] {density:.3f}"
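Because `filled = int(density * width)` truncates, a density just below a cell boundary rounds down rather than to nearest; the bar is exact only at multiples of `1/width`. A quick check of the formatting:

```python
def render_density_bar(density: float, width: int = 40) -> str:
    # Truncating conversion: density 0.999 at width 4 still shows 3 filled cells.
    filled = int(density * width)
    return f"[{'█' * filled}{'░' * (width - filled)}] {density:.3f}"


print(render_density_bar(0.5, width=8))  # [████░░░░] 0.500
```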

render_session(session) classmethod

Render full session status as text.

Source code in src/sc_neurocore/debug/sc_scope.py
Python
@classmethod
def render_session(cls, session: ScopeSession) -> str:
    """Render full session status as text."""
    lines = ["═══ SC Bitstream Scope ═══"]
    st = session.status()
    lines.append(f"  Status: {'● LIVE' if st['running'] else '○ STOPPED'}")
    lines.append(f"  Samples: {st['samples']}  Elapsed: {st['elapsed_s']}s")
    lines.append(f"  Bytes: {st['bytes_received']}  Triggers: {st['triggers_fired']}")
    lines.append("──────────────────────────")
    for lid in sorted(session.analyzer.windows.keys()):
        stats = session.analyzer.layer_stats(lid)
        lines.append(cls.render_layer_summary(lid, stats))
    if session.error_budgets:
        lines.append("── Error Budgets ────────")
        for lid, eb in sorted(session.error_budgets.items()):
            status = "✓" if eb.pass_rate >= 0.95 else "✗"
            lines.append(
                f"  L{lid}: {status} err={eb.current_error:.4f} "
                f"mean={eb.mean_error:.4f} pass={eb.pass_rate:.1%}"
            )
    return "\n".join(lines)

compute_scc(a, b)

Stochastic Computing Correlation between two u32-packed bitstreams.

Dispatches to the Rust stochastic_doctor_core.py_scc_packed when the compiled extension is importable (the default when the repo is built with maturin develop --release). Falls back to :func:_compute_scc_python when the extension is missing — the fallback is numerically identical (both implement the case-split Alaghi & Hayes 2013 form).

Source code in src/sc_neurocore/debug/sc_scope.py
Python
def compute_scc(a: np.ndarray, b: np.ndarray) -> float:
    """Stochastic Computing Correlation between two u32-packed bitstreams.

    Dispatches to the Rust ``stochastic_doctor_core.py_scc_packed`` when the
    compiled extension is importable (the default when the repo is built with
    ``maturin develop --release``). Falls back to :func:`_compute_scc_python`
    when the extension is missing — the fallback is numerically identical
    (both implement the case-split Alaghi & Hayes 2013 form).
    """
    if len(a) != len(b) or len(a) == 0:
        return 0.0

    if _HAS_RUST_SCC:
        a32 = require_c_contiguous(a, "a", np.uint32)
        b32 = require_c_contiguous(b, "b", np.uint32)
        # Reinterpret pairs of u32 words as u64 for the Rust kernel. Popcount
        # is position-invariant inside a word, so viewing two adjacent u32s as
        # one u64 preserves the bit-level meaning on little-endian hosts. Pad
        # an odd-length array by one zero word.
        if a32.size % 2 == 1:
            a32 = np.concatenate([a32, np.zeros(1, dtype=np.uint32)])
            b32 = np.concatenate([b32, np.zeros(1, dtype=np.uint32)])
        a64 = a32.view(np.uint64)
        b64 = b32.view(np.uint64)
        total_bits = len(a) * 32
        return float(_sdc.py_scc_packed(a64, b64, total_bits))

    return _compute_scc_python(a, b)
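The case-split form referenced in the docstring normalises the covariance $\delta = p_{ab} - p_a p_b$ by the largest magnitude it could take at the given densities, so SCC lands in $[-1, 1]$. On unpacked 0/1 arrays the same quantity can be sketched as follows (an illustrative re-derivation, not the library's `_compute_scc_python`, which operates on u32-packed words):

```python
import numpy as np


def scc_unpacked(a: np.ndarray, b: np.ndarray) -> float:
    """SCC on unpacked 0/1 bit arrays, case-split normalisation."""
    a = a.astype(bool)
    b = b.astype(bool)
    pa = a.mean()                  # density of stream a
    pb = b.mean()                  # density of stream b
    pab = (a & b).mean()           # density of the bitwise AND
    delta = pab - pa * pb          # covariance of the two streams
    if delta > 0:
        # Maximum overlap: the AND density can reach at most min(pa, pb).
        denom = min(pa, pb) - pa * pb
    else:
        # Minimum overlap: the AND density can fall to max(pa + pb - 1, 0).
        denom = pa * pb - max(pa + pb - 1.0, 0.0)
    return float(delta / denom) if denom != 0 else 0.0


x = np.array([1, 0, 1, 0, 1, 0, 1, 0])
print(scc_unpacked(x, x))      # maximally correlated: 1.0
print(scc_unpacked(x, 1 - x))  # maximally anti-correlated: -1.0
```

An SCC of 0 marks the statistically independent case (pab = pa·pb), which is the regime SC multipliers assume.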

sc_neurocore.debug.hil_server

Hardware-in-the-Loop server orchestrator.

Spawns and manages the standalone high-performance Go-based WebSocket telemetry server for real-time SC debugging and visualization.

HILServerDaemon dataclass

Manages the background execution of the Go HIL Debugger service.

Source code in src/sc_neurocore/debug/hil_server.py
Python
@dataclass
class HILServerDaemon:
    """Manages the background execution of the Go HIL Debugger service."""

    port: int = 8081
    _process: Optional[subprocess.Popen] = None
    _go_dir: Path = Path(__file__).parent.parent / "accel" / "go" / "services" / "hil_debugger"

    def __post_init__(self) -> None:
        if self._go_dir.exists():
            return
        # Fallback for installed package
        installed_bin = Path(sysconfig.get_path("scripts")) / "hil_debugger"
        if installed_bin.exists():
            self._go_dir = installed_bin.parent
            return
        raise FileNotFoundError(
            "HIL Debugger Go binary not found. "
            "Run: cd accel/go/services/hil_debugger && go build -o hil_debugger main.go"
        )

    def start(self, build: bool = True) -> bool:
        """Compile and start the standalone HIL Debugger service."""
        if self._process and self._process.poll() is None:
            return True  # Already running

        if build and self._go_dir.is_dir():  # only build from source
            print("[HIL Daemon] Compiling high-performance Go telemetry server...")
            try:
                subprocess.run(
                    ["go", "build", "-o", "hil_debugger", "main.go"],
                    cwd=str(self._go_dir),
                    check=True,
                    capture_output=True,
                )
            except subprocess.CalledProcessError as e:
                print(f"[HIL Daemon] Build failed: {e.stderr.decode()}")
                return False

        bin_path = self._go_dir / "hil_debugger"
        if not bin_path.exists():
            print(f"[HIL Daemon] Binary {bin_path} not found.")
            return False

        env = os.environ.copy()
        env["HIL_PORT"] = str(self.port)

        self._process = subprocess.Popen(
            [str(bin_path)],
            cwd=str(self._go_dir),
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        # Wait for the service to bind and report health
        return self._wait_for_ready()

    def _wait_for_ready(self, timeout_sec: int = 5) -> bool:
        start_time = time.time()
        assert self._process is not None, "_wait_for_ready called before start()"
        while time.time() - start_time < timeout_sec:
            if self._process.poll() is not None:
                err = (
                    self._process.stderr.read().decode()
                    if self._process.stderr
                    else "unknown crash"
                )
                print(f"[HIL Daemon] Server crashed: {err}")
                return False
            conn = http.client.HTTPConnection("localhost", self.port, timeout=0.5)
            try:
                conn.request("GET", "/health")
                response = conn.getresponse()
                if response.status == 200:
                    print(f"[HIL Daemon] Server ready on port {self.port}.")
                    return True
            except (ConnectionError, TimeoutError, OSError):
                pass
            finally:
                conn.close()
            time.sleep(0.1)
        print("[HIL Daemon] Timeout waiting for readiness.")
        self.stop()
        return False

    def stop(self) -> None:
        """Gracefully terminate the background HIL debugger process."""
        if self._process and self._process.poll() is None:
            self._process.terminate()
            try:
                self._process.wait(timeout=3)
            except subprocess.TimeoutExpired:
                self._process.kill()
            self._process = None
            print("[HIL Daemon] Server stopped.")

    @property
    def is_running(self) -> bool:
        """Returns True if the daemon process is running."""
        return self._process is not None and self._process.poll() is None

is_running property

Returns True if the daemon process is running.

start(build=True)

Compile and start the standalone HIL Debugger service.

Source code in src/sc_neurocore/debug/hil_server.py
Python
def start(self, build: bool = True) -> bool:
    """Compile and start the standalone HIL Debugger service."""
    if self._process and self._process.poll() is None:
        return True  # Already running

    if build and self._go_dir.is_dir():  # only build from source
        print("[HIL Daemon] Compiling high-performance Go telemetry server...")
        try:
            subprocess.run(
                ["go", "build", "-o", "hil_debugger", "main.go"],
                cwd=str(self._go_dir),
                check=True,
                capture_output=True,
            )
        except subprocess.CalledProcessError as e:
            print(f"[HIL Daemon] Build failed: {e.stderr.decode()}")
            return False

    bin_path = self._go_dir / "hil_debugger"
    if not bin_path.exists():
        print(f"[HIL Daemon] Binary {bin_path} not found.")
        return False

    env = os.environ.copy()
    env["HIL_PORT"] = str(self.port)

    self._process = subprocess.Popen(
        [str(bin_path)],
        cwd=str(self._go_dir),
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    # Wait for the service to bind and report health
    return self._wait_for_ready()

stop()

Gracefully terminate the background HIL debugger process.

Source code in src/sc_neurocore/debug/hil_server.py
Python
def stop(self) -> None:
    """Gracefully terminate the background HIL debugger process."""
    if self._process and self._process.poll() is None:
        self._process.terminate()
        try:
            self._process.wait(timeout=3)
        except subprocess.TimeoutExpired:
            self._process.kill()
        self._process = None
        print("[HIL Daemon] Server stopped.")
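The daemon treats the Go service as ready once `GET /health` returns 200, polling with short-timeout connections until a deadline. The same probe pattern as a free-standing helper (`wait_for_health` is a hypothetical name, not part of the library, but it uses the identical `http.client` loop):

```python
import http.client
import time


def wait_for_health(host: str, port: int, timeout_sec: float = 5.0) -> bool:
    """Poll GET /health until it returns 200 or the deadline passes."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        conn = http.client.HTTPConnection(host, port, timeout=0.5)
        try:
            conn.request("GET", "/health")
            if conn.getresponse().status == 200:
                return True
        except (ConnectionError, TimeoutError, OSError):
            pass  # server not bound yet; retry after a short sleep
        finally:
            conn.close()
        time.sleep(0.1)
    return False
```

Polling an HTTP endpoint (rather than sleeping a fixed interval) also catches the crash-on-startup case: the daemon version above additionally checks `poll()` each iteration so a dead child fails fast instead of burning the whole timeout.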

sc_neurocore.debug.hil_debugger

HILDebugger

High-level wrapper for the HIL telemetry server.

Source code in src/sc_neurocore/debug/hil_debugger.py
Python
class HILDebugger:
    """High-level wrapper for the HIL telemetry server."""

    def __init__(self, port: int = 8081) -> None:
        self.daemon = HILServerDaemon(port=port)

    def start(self) -> bool:
        """Starts the HIL debugger server."""
        return self.daemon.start()

    def stop(self) -> None:
        """Stops the HIL debugger server."""
        self.daemon.stop()

    @property
    def is_running(self) -> bool:
        """Returns True if the server is active."""
        return self.daemon.is_running

    @property
    def url(self) -> str:
        """Returns the base URL for the active telemetry server."""
        return f"http://localhost:{self.daemon.port}"

is_running property

Returns True if the server is active.

url property

Returns the base URL for the active telemetry server.

start()

Starts the HIL debugger server.

Source code in src/sc_neurocore/debug/hil_debugger.py
Python
def start(self) -> bool:
    """Starts the HIL debugger server."""
    return self.daemon.start()

stop()

Stops the HIL debugger server.

Source code in src/sc_neurocore/debug/hil_debugger.py
Python
def stop(self) -> None:
    """Stops the HIL debugger server."""
    self.daemon.stop()