Skip to content

Utilities

Core utility modules for bitstream encoding/decoding, random number generation, and signal analysis.

Bitstream Encoding

sc_neurocore.utils.bitstreams.generate_bernoulli_bitstream(p, length, rng=None)

Generate a Bernoulli bitstream of given length with probability p of '1'. This is the core SC primitive: a sequence of 0/1 bits where the proportion of 1s ~ p. Parameters


p : float Probability of 1 (unipolar encoding, 0 <= p <= 1). length : int Number of bits in the stream. rng : RNG, optional RNG instance. If None, a fresh RNG is created. Returns


np.ndarray Array of shape (length,) with dtype=uint8, values in {0,1}.

Source code in src/sc_neurocore/utils/bitstreams.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def generate_bernoulli_bitstream(
    p: float,
    length: int,
    rng: Optional[RNG] = None,
) -> np.ndarray[Any, Any]:
    """
    Draw a Bernoulli 0/1 bitstream of ``length`` bits with P(bit=1) = p.

    This is the fundamental stochastic-computing primitive: the fraction
    of ones in the returned stream approximates p (unipolar encoding).

    Parameters
    ----------
    p : float
        Probability of 1 (unipolar encoding, 0 <= p <= 1).
    length : int
        Number of bits in the stream.
    rng : RNG, optional
        RNG instance. If None, a fresh RNG is created.

    Returns
    -------
    np.ndarray
        Array of shape (length,) with dtype=uint8, values in {0,1}.

    Raises
    ------
    SCEncodingError
        If p lies outside [0, 1].
    """
    if not 0.0 <= p <= 1.0:
        raise SCEncodingError(f"Probability p must be in [0,1], got {p}.")
    source = RNG() if rng is None else rng
    raw_bits = source.bernoulli(p, size=length)
    # bernoulli() yields booleans; downcast to the package-wide uint8 format.
    return raw_bits.astype(np.uint8)

sc_neurocore.utils.bitstreams.generate_sobol_bitstream(p, length, seed=None)

Generate a bitstream using a Sobol sequence (Low Discrepancy Sequence). LDS provides faster convergence than random Bernoulli sequences (O(1/N) vs O(1/sqrt(N))).

Parameters

p : float Target probability. length : int Length of the bitstream. seed : int, optional Seed for the Sobol engine.

Returns

np.ndarray Array of shape (length,) with dtype=uint8, values in {0,1}.

Source code in src/sc_neurocore/utils/bitstreams.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def generate_sobol_bitstream(
    p: float,
    length: int,
    seed: Optional[int] = None,
) -> np.ndarray[Any, Any]:
    """
    Encode probability ``p`` as a bitstream driven by a Sobol sequence
    (Low Discrepancy Sequence).

    Compared with i.i.d. Bernoulli sampling, an LDS stream converges to the
    target probability at O(1/N) instead of O(1/sqrt(N)).

    Parameters
    ----------
    p : float
        Target probability.
    length : int
        Length of the bitstream.
    seed : int, optional
        Seed for the Sobol engine.

    Returns
    -------
    np.ndarray
        Array of shape (length,) with dtype=uint8, values in {0,1}.

    Raises
    ------
    SCEncodingError
        If p lies outside [0, 1].
    """
    if not 0.0 <= p <= 1.0:
        raise SCEncodingError(f"Probability p must be in [0,1], got {p}.")

    # Local import keeps scipy out of the hot path for callers that never
    # use LDS encoding.
    import scipy.stats.qmc as qmc

    # One-dimensional engine. Scrambling (scipy's default) randomizes the
    # sequence while preserving its low-discrepancy structure; `seed` makes
    # that scrambling reproducible.
    engine = qmc.Sobol(d=1, seed=seed)

    # NOTE: Sobol balance properties are best when `length` is a power of 2;
    # any length is accepted here regardless.
    uniforms = engine.random(n=length).flatten()

    # Standard SC thresholding: a U[0,1) draw `u` becomes a 1-bit iff u < p.
    return (uniforms < p).astype(np.uint8)

sc_neurocore.utils.bitstreams.bitstream_to_probability(bitstream)

Decode a unipolar bitstream back into a probability estimate. p_hat = (# of ones) / length

Source code in src/sc_neurocore/utils/bitstreams.py
 98
 99
100
101
102
103
104
105
def bitstream_to_probability(bitstream: np.ndarray[Any, Any]) -> float:
    """
    Estimate the probability encoded by a unipolar bitstream.

    The estimator is the empirical mean: p_hat = (# of ones) / length.

    Raises
    ------
    SCEncodingError
        If the bitstream contains no bits.
    """
    if bitstream.size == 0:
        raise SCEncodingError("Bitstream is empty.")
    total = bitstream.sum()
    return float(total) / float(bitstream.size)

sc_neurocore.utils.bitstreams.value_to_unipolar_prob(x, x_min, x_max, clip=True)

Map a scalar x from [x_min, x_max] into a unipolar probability [0,1]. Linear mapping: p = (x - x_min) / (x_max - x_min) If clip=True, x is clipped into [x_min, x_max].

Source code in src/sc_neurocore/utils/bitstreams.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def value_to_unipolar_prob(
    x: float,
    x_min: float,
    x_max: float,
    clip: bool = True,
) -> float:
    """
    Linearly map x from [x_min, x_max] onto a unipolar probability in [0, 1].

        p = (x - x_min) / (x_max - x_min)

    When ``clip`` is True, x is first clamped into [x_min, x_max], so the
    result is always a valid probability.

    Raises
    ------
    SCEncodingError
        If x_min is not strictly below x_max.
    """
    if x_min >= x_max:
        raise SCEncodingError("x_min must be < x_max.")
    value = x
    if clip:
        value = min(max(value, x_min), x_max)
    return float((value - x_min) / (x_max - x_min))

sc_neurocore.utils.bitstreams.unipolar_prob_to_value(p, x_min, x_max)

Map a unipolar probability p in [0,1] back to a scalar in [x_min, x_max]. Inverse of value_to_unipolar_prob.

Source code in src/sc_neurocore/utils/bitstreams.py
167
168
169
170
171
172
173
174
175
176
177
178
def unipolar_prob_to_value(
    p: float,
    x_min: float,
    x_max: float,
) -> float:
    """
    Map a unipolar probability p in [0, 1] back to a scalar in
    [x_min, x_max]; the inverse of value_to_unipolar_prob.

    Raises
    ------
    SCEncodingError
        If p lies outside [0, 1].
    """
    if not 0.0 <= p <= 1.0:
        raise SCEncodingError(f"Probability p must be in [0,1], got {p}.")
    span = x_max - x_min
    return float(x_min + p * span)

sc_neurocore.utils.bitstreams.BitstreamEncoder dataclass

Helper for encoding continuous scalar values into SC bitstreams using linear unipolar mapping. Example


encoder = BitstreamEncoder(x_min=0.0, x_max=0.1, length=1024, seed=123)
bitstream = encoder.encode(0.06)  # 60% ones
p_hat = bitstream_to_probability(bitstream)
x_rec = encoder.decode(bitstream)

Source code in src/sc_neurocore/utils/bitstreams.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
@dataclass
class BitstreamEncoder:
    """
    Encodes and decodes continuous scalar values to/from SC bitstreams
    using a linear unipolar (or bipolar) mapping.

    Example
    -------
    encoder = BitstreamEncoder(x_min=0.0, x_max=0.1, length=1024, seed=123)
    bitstream = encoder.encode(0.06)  # 60% ones
    p_hat = bitstream_to_probability(bitstream)
    x_rec = encoder.decode(bitstream)
    """

    x_min: float
    x_max: float
    length: int = 256
    seed: Optional[int] = None
    mode: str = "bernoulli"  # "bernoulli", "sobol", "bipolar", or "chaotic"

    def __post_init__(self) -> None:
        # Prepare the random source once. The "sobol" mode keeps no state
        # here: its engine is rebuilt from `seed` on every encode() call.
        if self.mode in ("bernoulli", "bipolar"):
            self._rng = RNG(self.seed)
        elif self.mode == "chaotic":
            from sc_neurocore.chaos.rng import ChaoticRNG

            # Derive a logistic-map initial state in (0, 1) from the seed.
            if self.seed is not None:
                x0 = (self.seed % 997) / 1000.0 + 0.001
            else:
                x0 = 0.5
            self._chaotic_rng = ChaoticRNG(r=4.0, x=x0)
        elif self.mode != "sobol":
            raise SCEncodingError(f"Unknown mode: {self.mode}")

    def encode(self, x: float) -> np.ndarray[Any, Any]:
        """Encode scalar x into a bitstream according to `mode`."""
        if self.mode == "bipolar":
            # Bipolar path: rescale x into [-1, 1] before encoding.
            if self.x_min >= self.x_max:
                raise SCEncodingError("x_min must be < x_max.")
            clamped = min(max(x, self.x_min), self.x_max)
            span = self.x_max - self.x_min
            signed = 2.0 * (clamped - self.x_min) / span - 1.0
            return generate_bipolar_bitstream(signed, self.length, rng=self._rng)
        prob = value_to_unipolar_prob(x, self.x_min, self.x_max, clip=True)
        if self.mode == "sobol":
            return generate_sobol_bitstream(prob, self.length, seed=self.seed)
        if self.mode == "chaotic":
            return self._chaotic_rng.generate_bitstream(prob, self.length)
        return generate_bernoulli_bitstream(prob, self.length, rng=self._rng)

    def decode(self, bitstream: np.ndarray[Any, Any]) -> float:
        """Recover the scalar encoded in `bitstream` (estimate)."""
        if self.mode == "bipolar":
            signed = bipolar_to_value(bitstream)
            # Rescale [-1, 1] back onto [x_min, x_max].
            return float(self.x_min + (signed + 1.0) / 2.0 * (self.x_max - self.x_min))
        p_hat = bitstream_to_probability(bitstream)
        return unipolar_prob_to_value(p_hat, self.x_min, self.x_max)

sc_neurocore.utils.bitstreams.BitstreamAverager dataclass

Sliding-window probability estimator for bitstreams.

Example

>>> avg = BitstreamAverager(window=100)
>>> for _ in range(100):
...     avg.push(1)
>>> avg.estimate()
1.0
>>> avg.push(0)
>>> avg.estimate() < 1.0
True

Source code in src/sc_neurocore/utils/bitstreams.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
@dataclass
class BitstreamAverager:
    """
    Sliding-window estimator of the probability carried by a bitstream.

    Bits are pushed one at a time into a circular buffer of size ``window``;
    ``estimate()`` returns the mean over that window (or over the bits seen
    so far while the window is still filling).

    Example
    -------
    >>> avg = BitstreamAverager(window=100)
    >>> for _ in range(100):
    ...     avg.push(1)
    >>> avg.estimate()
    1.0
    >>> avg.push(0)
    >>> avg.estimate() < 1.0
    True
    """

    window: int
    _buffer: Optional[np.ndarray[Any, Any]] = None  # circular bit storage
    _index: int = 0        # next write position
    _filled: bool = False  # True once the buffer has wrapped at least once
    _running_sum: int = 0  # count of ones currently in the window

    def __post_init__(self) -> None:
        self._buffer = np.zeros(self.window, dtype=np.uint8)
        self._running_sum = 0

    def push(self, bit: int) -> None:
        """Insert one bit, evicting the oldest bit once the window is full."""
        if bit not in (0, 1):
            raise SCEncodingError("Bit must be 0 or 1.")

        evicted = self._buffer[self._index]
        self._buffer[self._index] = bit

        # Maintain the count of ones incrementally; the evicted bit only
        # leaves the sum after the buffer has wrapped at least once.
        self._running_sum += bit - (evicted if self._filled else 0)

        self._index = (self._index + 1) % self.window
        if self._index == 0:
            self._filled = True

    def estimate(self) -> float:
        """Return the current probability estimate (0.0 before any push)."""
        if self._filled:
            return float(self._running_sum) / self.window
        if self._index == 0:
            return 0.0
        # Window not yet full: average over the bits seen so far.
        return float(self._running_sum) / self._index

    def reset(self) -> None:
        """Clear all state, as if freshly constructed."""
        self._buffer.fill(0)  # type: ignore
        self._index = 0
        self._filled = False
        self._running_sum = 0

Bipolar Encoding

sc_neurocore.utils.bitstreams.generate_bipolar_bitstream(x, length, rng=None)

Generate a bipolar SC bitstream encoding a value in [-1, +1].

Bipolar encoding: value x in [-1, 1] maps to probability p = (x + 1) / 2. Bit=1 with probability p, bit=0 with probability 1-p. Decoding: x = 2 * mean(bits) - 1.

Bipolar multiplication uses XNOR: P(A XNOR B) encodes A*B in bipolar.

Source code in src/sc_neurocore/utils/bitstreams.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def generate_bipolar_bitstream(
    x: float,
    length: int,
    rng: Optional[RNG] = None,
) -> np.ndarray[Any, Any]:
    """Encode a value x in [-1, +1] as a bipolar SC bitstream.

    Bipolar mapping: x -> p = (x + 1) / 2, then each bit is 1 with
    probability p. Decode with x = 2 * mean(bits) - 1.

    In bipolar format, multiplication is an XNOR gate:
    P(A XNOR B) encodes A*B.

    Raises
    ------
    SCEncodingError
        If x lies outside [-1, 1].
    """
    if not -1.0 <= x <= 1.0:
        raise SCEncodingError(f"Bipolar value must be in [-1,1], got {x}.")
    prob = 0.5 * (x + 1.0)
    return generate_bernoulli_bitstream(prob, length, rng)

sc_neurocore.utils.bitstreams.bipolar_to_value(bitstream)

Decode a bipolar bitstream to a value in [-1, +1].

x = 2 * mean(bits) - 1

Source code in src/sc_neurocore/utils/bitstreams.py
127
128
129
130
131
132
133
134
def bipolar_to_value(bitstream: np.ndarray[Any, Any]) -> float:
    """Decode a bipolar bitstream back to its value in [-1, +1].

    x = 2 * mean(bits) - 1

    Raises
    ------
    SCEncodingError
        If the bitstream contains no bits.
    """
    if bitstream.size == 0:
        raise SCEncodingError("Bitstream is empty.")
    p_hat = bitstream.mean()
    return float(2.0 * p_hat - 1.0)

SC Division (CORDIV, Li et al. 2014)

sc_neurocore.utils.bitstreams.sc_divide(numerator, denominator)

Stochastic computing division via CORDIV circuit.

Li, Qian, Riedel & Bazargan, IEEE Trans. Signal Process. 62(9), 2014.

at each bit position t,
  • x[t]=1 → z[t] = 1
  • x[t]=0, y[t]=1 → z[t] = 0
  • x[t]=0, y[t]=0 → z[t] = z[t-1] (hold)

Converges to P(z=1) ≈ P(x=1) / P(y=1) when P(x) ≤ P(y).

Parameters

numerator : np.ndarray Bitstream (uint8, {0,1}) of length L. denominator : np.ndarray Bitstream (uint8, {0,1}) of length L. Must have higher or equal density.

Returns

np.ndarray Quotient bitstream of length L.

Source code in src/sc_neurocore/utils/bitstreams.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def sc_divide(
    numerator: np.ndarray[Any, Any],
    denominator: np.ndarray[Any, Any],
) -> np.ndarray[Any, Any]:
    """Stochastic division via the CORDIV sequential circuit.

    Li, Qian, Riedel & Bazargan, IEEE Trans. Signal Process. 62(9), 2014.

    Per bit position t:
      - x[t] = 1            -> z[t] = 1
      - x[t] = 0, y[t] = 1  -> z[t] = 0
      - x[t] = 0, y[t] = 0  -> z[t] = z[t-1]  (hold previous output)

    The output density converges to P(x=1) / P(y=1) provided P(x) <= P(y).

    Parameters
    ----------
    numerator : np.ndarray
        Bitstream (uint8, {0,1}) of length L.
    denominator : np.ndarray
        Bitstream (uint8, {0,1}) of length L. Must have higher or equal density.

    Returns
    -------
    np.ndarray
        Quotient bitstream of length L.

    Raises
    ------
    ValueError
        If the two bitstreams differ in shape.
    """
    numerator = np.asarray(numerator, dtype=np.uint8)
    denominator = np.asarray(denominator, dtype=np.uint8)
    if numerator.shape != denominator.shape:
        raise ValueError("numerator and denominator must have the same shape")

    quotient = np.zeros_like(numerator)
    state = 0  # z[t-1]: the output bit held by the circuit
    for t, (x_bit, y_bit) in enumerate(zip(numerator, denominator)):
        if x_bit:
            state = 1
        elif y_bit:
            state = 0
        # else: both zero -> hold previous state
        quotient[t] = state
    return quotient

Adaptive Bitstream Length

Compute minimum bitstream length for target precision via Hoeffding, Chebyshev, or variance bounds.

sc_neurocore.utils.bitstreams.adaptive_length(p, epsilon=0.01, confidence=0.95, method='hoeffding', min_length=64, max_length=65536)

Compute minimum bitstream length for target precision.

Given probability p and error tolerance epsilon, returns the smallest L such that |p_hat - p| < epsilon with the given confidence.

Parameters

p : float Encoded probability in [0, 1]. epsilon : float Maximum acceptable absolute error. confidence : float Confidence level (e.g. 0.95 for 95%). method : str Bound type: "hoeffding" (tighter), "chebyshev", or "variance" (no confidence). min_length : int Minimum returned length. max_length : int Maximum returned length (hardware cap).

Returns

int Minimum bitstream length (rounded up to nearest power of 2 for Sobol compatibility).

Source code in src/sc_neurocore/utils/bitstreams.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def adaptive_length(
    p: float,
    epsilon: float = 0.01,
    confidence: float = 0.95,
    method: str = "hoeffding",
    min_length: int = 64,
    max_length: int = 65536,
) -> int:
    """Compute the minimum bitstream length for a target precision.

    Given probability p and error tolerance epsilon, returns the smallest L
    such that |p_hat - p| < epsilon with the given confidence.

    Parameters
    ----------
    p : float
        Encoded probability in [0, 1].
    epsilon : float
        Maximum acceptable absolute error.
    confidence : float
        Confidence level (e.g. 0.95 for 95%).
    method : str
        Bound type: "hoeffding" (tighter), "chebyshev", or "variance" (no confidence).
    min_length : int
        Minimum returned length.
    max_length : int
        Maximum returned length (hardware cap).

    Returns
    -------
    int
        Minimum bitstream length, rounded up to the nearest power of 2
        (for Sobol compatibility) and capped at max_length.

    Raises
    ------
    ValueError
        If epsilon <= 0, confidence >= 1.0 (probabilistic bounds only),
        or the method name is unknown.
    """
    if epsilon <= 0:
        raise ValueError(f"epsilon must be positive, got {epsilon}")

    import math

    if method == "hoeffding":
        # P(|p_hat - p| >= eps) <= 2*exp(-2*L*eps^2) <= 1 - confidence
        #   =>  L >= -ln((1-confidence)/2) / (2*eps^2)
        risk = 1.0 - confidence
        if risk <= 0:
            raise ValueError("confidence must be < 1.0")
        raw = -math.log(risk / 2.0) / (2.0 * epsilon**2)
    elif method == "chebyshev":
        # P(|p_hat - p| >= eps) <= Var/eps^2 <= 1 - confidence
        #   =>  L >= p(1-p) / (eps^2 * (1-confidence))
        risk = 1.0 - confidence
        if risk <= 0:
            raise ValueError("confidence must be < 1.0")
        raw = p * (1.0 - p) / (epsilon**2 * risk)
    elif method == "variance":
        # Var(p_hat) = p(1-p)/L < eps^2  =>  L > p(1-p)/eps^2
        # (no confidence level involved)
        raw = p * (1.0 - p) / (epsilon**2)
    else:
        raise ValueError(f"Unknown method: {method}. Use 'hoeffding', 'chebyshev', or 'variance'.")

    needed = max(min_length, int(math.ceil(raw)))
    # Round up to the next power of 2 for Sobol balance properties.
    rounded = 1 << (needed - 1).bit_length()
    return min(rounded, max_length)

LDS Decorrelation (Sobol/Halton)

Multi-dimensional low-discrepancy sequences for per-synapse decorrelation.

sc_neurocore.utils.lds_decorrelation.generate_decorrelated_bitstreams(probabilities, length=1024, method='sobol', seed=None)

Generate decorrelated bitstreams for a probability matrix.

Each element of the probability matrix gets its own LDS dimension, ensuring zero correlation between any pair of bitstreams.

Parameters

probabilities : np.ndarray Probability matrix, any shape. Values in [0, 1]. length : int Bitstream length per element. method : str "sobol" or "halton". seed : int or None Random seed for scrambling.

Returns

np.ndarray Shape (*probabilities.shape, length), dtype uint8.

Source code in src/sc_neurocore/utils/lds_decorrelation.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def generate_decorrelated_bitstreams(
    probabilities: np.ndarray,
    length: int = 1024,
    method: str = "sobol",
    seed: int | None = None,
) -> np.ndarray:
    """Generate decorrelated bitstreams for a probability matrix.

    Each element of the probability matrix gets its own LDS dimension,
    ensuring zero correlation between any pair of bitstreams.

    Parameters
    ----------
    probabilities : np.ndarray
        Probability matrix, any shape. Values in [0, 1].
    length : int
        Bitstream length per element.
    method : str
        "sobol" or "halton".
    seed : int or None
        Random seed for scrambling.

    Returns
    -------
    np.ndarray
        Shape (*probabilities.shape, length), dtype uint8.
    """
    probs = np.asarray(probabilities, dtype=np.float64)
    flat_probs = probs.flatten()
    n_dims = len(flat_probs)

    if n_dims == 0:
        return np.zeros((*probs.shape, length), dtype=np.uint8)

    if method == "sobol":
        sampler = qmc.Sobol(d=n_dims, seed=seed)
        samples = sampler.random(n=length)  # (length, n_dims)
    elif method == "halton":
        sampler = qmc.Halton(d=n_dims, seed=seed)
        samples = sampler.random(n=length)  # (length, n_dims)
    else:
        raise ValueError(f"Unknown method: {method}. Use 'sobol' or 'halton'.")

    # Threshold each dimension against its probability
    bits = np.zeros((n_dims, length), dtype=np.uint8)
    for d in range(n_dims):
        p = float(np.clip(flat_probs[d], 0.0, 1.0))
        bits[d] = (samples[:, d] < p).astype(np.uint8)

    return bits.reshape(*probs.shape, length)

sc_neurocore.utils.lds_decorrelation.star_discrepancy_estimate(samples, n_test=10000)

Estimate star discrepancy of a sample set (quality metric for LDS).

Lower discrepancy → more uniform coverage → better SC precision.

Parameters

samples : np.ndarray Shape (n_samples, d), values in [0, 1]. n_test : int Number of random test points.

Returns

float Estimated star discrepancy.

Source code in src/sc_neurocore/utils/lds_decorrelation.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def star_discrepancy_estimate(
    samples: np.ndarray,
    n_test: int = 10000,
) -> float:
    """Monte-Carlo estimate of the star discrepancy of a point set.

    Lower discrepancy -> more uniform coverage -> better SC precision.
    At each random anchor point the empirical fraction of samples inside
    the box [0, anchor] is compared with the box's volume; the maximum
    absolute difference over all anchors is returned.

    Parameters
    ----------
    samples : np.ndarray
        Shape (n_samples, d), values in [0, 1].
    n_test : int
        Number of random test points.

    Returns
    -------
    float
        Estimated star discrepancy (a lower bound on the true supremum).
    """
    _, dim = samples.shape
    # Fixed seed so the estimate is deterministic across calls.
    anchors = np.random.RandomState(42).uniform(0, 1, (n_test, dim))

    worst = 0.0
    for anchor in anchors:
        # Fraction of samples inside the [0, anchor] hyper-box.
        covered = np.mean(np.all(samples <= anchor, axis=1))
        gap = abs(covered - np.prod(anchor))
        if gap > worst:
            worst = gap
    return float(worst)

Random Number Generation

sc_neurocore.utils.rng.RNG

Thin wrapper around NumPy RNG for reproducible per-neuron streams.

Example

>>> rng = RNG(seed=42)
>>> vals = rng.random(5)
>>> vals.shape
(5,)
>>> RNG(seed=42).random(5) == vals  # deterministic
array([ True,  True,  True,  True,  True])

Source code in src/sc_neurocore/utils/rng.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class RNG:
    """
    Thin wrapper around NumPy's Generator providing reproducible,
    per-neuron random streams.

    Example
    -------
    >>> rng = RNG(seed=42)
    >>> vals = rng.random(5)
    >>> vals.shape
    (5,)
    >>> RNG(seed=42).random(5) == vals  # deterministic
    array([ True,  True,  True,  True,  True])
    """

    def __init__(self, seed: Optional[int] = None) -> None:
        # default_rng(None) seeds from OS entropy; an int seed is reproducible.
        self._rng = np.random.default_rng(seed)

    def normal(self, mean: float = 0.0, std: float = 1.0, size=None):
        """Gaussian draws with the given mean and standard deviation."""
        return self._rng.normal(mean, std, size)

    def uniform(self, low: float = 0.0, high: float = 1.0, size=None):
        """Uniform draws on [low, high)."""
        return self._rng.uniform(low, high, size)

    def bernoulli(self, p: float, size=None):
        """Boolean draws that are True with probability p."""
        return self._rng.random(size) < p

    def random(self, size=None):
        """Uniform draws on [0, 1)."""
        return self._rng.random(size)

    def shuffle(self, x):
        """Shuffle x in place; returns None (NumPy convention)."""
        return self._rng.shuffle(x)

Adaptive Utilities

sc_neurocore.utils.adaptive

AdaptiveInference dataclass

Manages Progressive Precision / Early Exit for SC.

Source code in src/sc_neurocore/utils/adaptive.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@dataclass
class AdaptiveInference:
    """
    Progressive-precision / early-exit controller for SC runs.

    Repeatedly invokes a step function and stops early once the estimate
    has stabilized, trading bitstream length for accuracy at runtime.
    """

    check_interval: int = 64  # steps between convergence checks
    tolerance: float = 0.05  # 5% stability
    min_length: int = 128  # never exit before this many steps
    max_length: int = 2048  # hard cap on the number of steps

    def run_adaptive(self, step_func: Callable[[], float]) -> float:
        """
        Runs the SC process step-by-step until convergence or max_length.

        Args:
            step_func: Function that executes one step and returns current estimate.
        """
        checkpoints: List[float] = []
        estimate = 0.0

        for step in range(self.max_length):
            estimate = step_func()

            # Only test for convergence periodically, and never before
            # min_length steps have run.
            if step < self.min_length or step % self.check_interval:
                continue
            checkpoints.append(estimate)
            if len(checkpoints) < 3:
                continue
            # Early exit once the last three checkpoints agree within tolerance.
            recent = checkpoints[-3:]
            if max(recent) - min(recent) < self.tolerance:
                return estimate

        return estimate

run_adaptive(step_func)

Runs the SC process step-by-step until convergence or max_length.

Parameters:

Name Type Description Default
step_func Callable[[], float]

Function that executes one step and returns current estimate.

required
Source code in src/sc_neurocore/utils/adaptive.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def run_adaptive(self, step_func: Callable[[], float]) -> float:
    """
    Runs the SC process step-by-step until convergence or max_length.

    Args:
        step_func: Function that executes one step and returns current estimate.
    """
    checkpoints: List[float] = []
    estimate = 0.0

    for step in range(self.max_length):
        estimate = step_func()

        # Only test for convergence periodically, and never before
        # min_length steps have run.
        if step < self.min_length or step % self.check_interval:
            continue
        checkpoints.append(estimate)
        if len(checkpoints) < 3:
            continue
        # Early exit once the last three checkpoints agree within tolerance.
        recent = checkpoints[-3:]
        if max(recent) - min(recent) < self.tolerance:
            return estimate

    return estimate

Connectome Generation

sc_neurocore.utils.connectomes

ConnectomeGenerator

Generates biologically plausible connectivity matrices.

Source code in src/sc_neurocore/utils/connectomes.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class ConnectomeGenerator:
    """
    Generates biologically plausible connectivity matrices.

    NOTE(review): both generators draw from the global ``np.random`` state,
    so results are reproducible only if the caller seeds it beforehand.
    """

    @staticmethod
    def generate_watts_strogatz(
        n_neurons: int, k_neighbors: int, p_rewire: float
    ) -> np.ndarray[Any, Any]:
        """
        Watts-Strogatz Small-World Model.

        1. Start with a regular ring lattice (connect to k neighbors).
        2. Randomly rewire edges with probability p.

        If k_neighbors >= n_neurons, the fully connected graph (minus
        self-loops) is returned directly.

        NOTE(review): the lattice is built symmetrically, but rewiring
        removes only the forward edge adj[i, target]; the reverse edge
        adj[target, i] stays in place, so the result mixes directed and
        undirected edges — confirm intended semantics.

        Returns:
            Adjacency Matrix (Binary)
        """
        if k_neighbors >= n_neurons:
            return np.ones((n_neurons, n_neurons)) - np.eye(n_neurons)

        adj = np.zeros((n_neurons, n_neurons), dtype=int)

        # 1. Ring lattice: link each node to its k//2 nearest forward
        # neighbors, and symmetrically back.
        for i in range(n_neurons):
            for j in range(1, k_neighbors // 2 + 1):
                # Connect forward (wraps around the ring)
                target = (i + j) % n_neurons
                adj[i, target] = 1
                adj[target, i] = 1  # symmetric back-edge (see NOTE above)

        # 2. Rewire each forward lattice edge with probability p_rewire.
        for i in range(n_neurons):
            for j in range(1, k_neighbors // 2 + 1):
                target = (i + j) % n_neurons

                if np.random.random() < p_rewire:
                    # Delete old (forward) edge
                    adj[i, target] = 0

                    # Draw a fresh target, avoiding self-loops and duplicates.
                    # NOTE(review): this rejection loop can spin for a long
                    # time if row i is nearly fully connected.
                    new_target = i
                    while new_target == i or adj[i, new_target] == 1:
                        new_target = np.random.randint(0, n_neurons)

                    adj[i, new_target] = 1

        return adj

    @staticmethod
    def generate_scale_free(n_neurons: int) -> np.ndarray[Any, Any]:
        """
        Barabasi-Albert Scale-Free Model (Preferential Attachment).

        Starts from a mutually connected pair and attaches each later node
        to exactly one existing node, chosen with probability proportional
        to its current degree (m = 1 attachment per new node).
        """
        # Seed graph: nodes 0 and 1, linked in both directions.
        adj = np.zeros((n_neurons, n_neurons), dtype=int)
        adj[0, 1] = 1
        adj[1, 0] = 1
        degrees = np.zeros(n_neurons)
        # NOTE(review): the seed edge is stored in both directions but each
        # endpoint's degree is counted as 1 — confirm that is intended.
        degrees[0] = 1
        degrees[1] = 1

        active_nodes = 2

        for i in range(2, n_neurons):
            # Preferential attachment: P(connect to j) = deg(j) / sum(deg).
            probs = degrees[:active_nodes] / np.sum(degrees[:active_nodes])

            # Select target among already-active nodes
            target = np.random.choice(np.arange(active_nodes), p=probs)

            # Directed edge from the newcomer to the chosen node.
            adj[i, target] = 1

            degrees[i] += 1
            degrees[target] += 1
            active_nodes += 1

        return adj

generate_watts_strogatz(n_neurons, k_neighbors, p_rewire) staticmethod

Watts-Strogatz Small-World Model.

  1. Start with a regular ring lattice (connect to k neighbors).
  2. Randomly rewire edges with probability p.

Returns:

Type Description
ndarray[Any, Any]

Adjacency Matrix (Binary)

Source code in src/sc_neurocore/utils/connectomes.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@staticmethod
def generate_watts_strogatz(
    n_neurons: int, k_neighbors: int, p_rewire: float
) -> np.ndarray[Any, Any]:
    """
    Watts-Strogatz Small-World Model.

    1. Start with a regular ring lattice (connect to k neighbors).
    2. Randomly rewire edges with probability p.

    If k_neighbors >= n_neurons, the fully connected graph (minus
    self-loops) is returned directly. Uses the global ``np.random`` state.

    NOTE(review): the lattice is built symmetrically, but rewiring removes
    only the forward edge adj[i, target]; the reverse edge adj[target, i]
    stays in place, so the result mixes directed and undirected edges —
    confirm intended semantics.

    Returns:
        Adjacency Matrix (Binary)
    """
    if k_neighbors >= n_neurons:
        return np.ones((n_neurons, n_neurons)) - np.eye(n_neurons)

    adj = np.zeros((n_neurons, n_neurons), dtype=int)

    # 1. Ring lattice: link each node to its k//2 nearest forward
    # neighbors, and symmetrically back.
    for i in range(n_neurons):
        for j in range(1, k_neighbors // 2 + 1):
            # Connect forward (wraps around the ring)
            target = (i + j) % n_neurons
            adj[i, target] = 1
            adj[target, i] = 1  # symmetric back-edge (see NOTE above)

    # 2. Rewire each forward lattice edge with probability p_rewire.
    for i in range(n_neurons):
        for j in range(1, k_neighbors // 2 + 1):
            target = (i + j) % n_neurons

            if np.random.random() < p_rewire:
                # Delete old (forward) edge
                adj[i, target] = 0

                # Draw a fresh target, avoiding self-loops and duplicates.
                # NOTE(review): this rejection loop can spin for a long
                # time if row i is nearly fully connected.
                new_target = i
                while new_target == i or adj[i, new_target] == 1:
                    new_target = np.random.randint(0, n_neurons)

                adj[i, new_target] = 1

    return adj

generate_scale_free(n_neurons) staticmethod

Barabasi-Albert Scale-Free Model (Preferential Attachment).

Source code in src/sc_neurocore/utils/connectomes.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@staticmethod
def generate_scale_free(n_neurons: int) -> np.ndarray[Any, Any]:
    """
    Barabasi-Albert Scale-Free Model (Preferential Attachment).

    Starts from a mutually connected pair and attaches each later node to
    exactly one existing node, chosen with probability proportional to its
    current degree (m = 1 attachment per new node). Uses the global
    ``np.random`` state.
    """
    # Seed graph: nodes 0 and 1, linked in both directions.
    adj = np.zeros((n_neurons, n_neurons), dtype=int)
    adj[0, 1] = 1
    adj[1, 0] = 1
    degrees = np.zeros(n_neurons)
    # NOTE(review): the seed edge is stored in both directions but each
    # endpoint's degree is counted as 1 — confirm that is intended.
    degrees[0] = 1
    degrees[1] = 1

    active_nodes = 2

    for i in range(2, n_neurons):
        # Preferential attachment: P(connect to j) = deg(j) / sum(deg).
        probs = degrees[:active_nodes] / np.sum(degrees[:active_nodes])

        # Select target among already-active nodes
        target = np.random.choice(np.arange(active_nodes), p=probs)

        # Directed edge from the newcomer to the chosen node.
        adj[i, target] = 1

        degrees[i] += 1
        degrees[target] += 1
        active_nodes += 1

    return adj

Decorrelators

sc_neurocore.utils.decorrelators

Decorrelator dataclass

Base class for bitstream decorrelators.

Source code in src/sc_neurocore/utils/decorrelators.py
14
15
16
17
18
19
20
21
@dataclass
class Decorrelator:
    """
    Abstract base for bitstream decorrelators.

    Subclasses override :meth:`process` to turn a 0/1 bitstream into a
    decorrelated one with the same encoded probability.
    """

    def process(self, bitstream: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
        """Return a decorrelated version of *bitstream*. Must be overridden."""
        raise NotImplementedError

ShufflingDecorrelator dataclass

Bases: Decorrelator

Decorrelates a bitstream by randomly shuffling bits within a window. This preserves the exact bit count (probability) but destroys temporal correlations.

Source code in src/sc_neurocore/utils/decorrelators.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@dataclass
class ShufflingDecorrelator(Decorrelator):
    """
    Decorrelates a bitstream by randomly shuffling bits within a window.
    This preserves the exact bit count (probability) but destroys temporal correlations.
    """

    window_size: int = 16  # bits shuffled together as one unit
    seed: Optional[int] = None  # RNG seed for reproducible shuffles

    def __post_init__(self) -> None:
        self._rng = RNG(self.seed)

    def process(self, bitstream: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
        """
        Shuffle each window of the stream on a copy of the input.

        BUG FIX: the previous implementation zero-padded the stream to a
        multiple of ``window_size``, shuffled, then truncated back. Pad
        zeros could be shuffled into the kept region while real 1-bits
        were cut off, silently changing the encoded probability. Shuffling
        the trailing partial window in isolation keeps the bit count exact,
        as the class docstring promises.
        """
        shuffled = bitstream.copy()
        for start in range(0, len(shuffled), self.window_size):
            # Shuffles the slice view in place (same in-place per-window
            # shuffle the original applied to each reshaped row; assumes
            # RNG.shuffle permutes its argument in place).
            self._rng.shuffle(shuffled[start:start + self.window_size])
        return shuffled

LFSRRegenDecorrelator dataclass

Bases: Decorrelator

Regenerates a new bitstream with the same probability estimate but using a different random source (LFSR-like or just new RNG).

Source code in src/sc_neurocore/utils/decorrelators.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@dataclass
class LFSRRegenDecorrelator(Decorrelator):
    """
    Regenerates a new bitstream with the same probability estimate
    but using a different random source (LFSR-like or just new RNG).
    """

    seed: Optional[int] = None  # seed for the regeneration RNG

    def __post_init__(self) -> None:
        self._rng = RNG(self.seed)

    def process(self, bitstream: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
        """
        Estimate p from the input and draw a fresh Bernoulli stream of
        the same length.

        An empty input is returned as-is (previously ``mean()`` of an
        empty array produced NaN plus a RuntimeWarning, which then
        propagated into the Bernoulli draw).
        """
        if len(bitstream) == 0:
            return bitstream.astype(np.uint8)
        p_est = bitstream.mean()
        # Regenerate from an independent random source.
        return self._rng.bernoulli(p_est, size=len(bitstream)).astype(np.uint8)

Fault Injection

sc_neurocore.utils.fault_injection

FaultInjector

Simulates hardware faults in Stochastic Computing bitstreams.

Source code in src/sc_neurocore/utils/fault_injection.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class FaultInjector:
    """
    Simulates hardware faults in Stochastic Computing bitstreams.
    """

    @staticmethod
    def inject_bit_flips(
        bitstream: np.ndarray[Any, Any], error_rate: float
    ) -> np.ndarray[Any, Any]:
        """
        Randomly flips bits with probability 'error_rate'.
        """
        # A non-positive rate is a no-op; return the input untouched.
        if error_rate <= 0:
            return bitstream

        # Boolean mask: True marks a position hit by a flip fault.
        flip_here = np.random.random(bitstream.shape) < error_rate

        # XOR toggles exactly the masked bits and keeps values in {0, 1}:
        # 0^1=1, 1^1=0, x^0=x.
        flipped = np.bitwise_xor(bitstream.astype(bool), flip_here)
        return flipped.astype(np.uint8)

    @staticmethod
    def inject_stuck_at(
        bitstream: np.ndarray[Any, Any], fault_rate: float, value: int
    ) -> np.ndarray[Any, Any]:
        """
        Simulates Stuck-At-0 or Stuck-At-1 faults.
        """
        # Each position is independently forced to `value` with
        # probability `fault_rate`; the rest keep their original bit.
        stuck_here = np.random.random(bitstream.shape) < fault_rate
        result = bitstream.copy()
        result[stuck_here] = value
        return result

inject_bit_flips(bitstream, error_rate) staticmethod

Randomly flips bits with probability 'error_rate'.

Source code in src/sc_neurocore/utils/fault_injection.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@staticmethod
def inject_bit_flips(
    bitstream: np.ndarray[Any, Any], error_rate: float
) -> np.ndarray[Any, Any]:
    """
    Randomly flips bits with probability 'error_rate'.
    """
    if error_rate <= 0:
        return bitstream

    # Generate error mask (1 where error occurs)
    # Using numpy for speed
    mask = np.random.random(bitstream.shape) < error_rate

    # XOR with mask flips the bits where mask is 1
    # bitstream is uint8 {0,1}
    # We need to ensure we don't go out of bounds (0/1)
    # 0 ^ 1 = 1
    # 1 ^ 1 = 0
    # 0 ^ 0 = 0
    # 1 ^ 0 = 1

    corrupted = np.bitwise_xor(bitstream.astype(bool), mask)
    return corrupted.astype(np.uint8)

inject_stuck_at(bitstream, fault_rate, value) staticmethod

Simulates Stuck-At-0 or Stuck-At-1 faults.

Source code in src/sc_neurocore/utils/fault_injection.py
42
43
44
45
46
47
48
49
50
51
52
@staticmethod
def inject_stuck_at(
    bitstream: np.ndarray[Any, Any], fault_rate: float, value: int
) -> np.ndarray[Any, Any]:
    """
    Simulates Stuck-At-0 or Stuck-At-1 faults.
    """
    mask = np.random.random(bitstream.shape) < fault_rate
    corrupted = bitstream.copy()
    corrupted[mask] = value
    return corrupted

FSM Activations

sc_neurocore.utils.fsm_activations

FSMActivation dataclass

Base class for FSM-based stochastic activation functions.

The FSM takes a bitstream input and transitions between states. The output bit is determined by the current state (e.g., if state > N/2, out=1). This implements saturating non-linearities like Tanh or Sigmoid efficiently.

Source code in src/sc_neurocore/utils/fsm_activations.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@dataclass
class FSMActivation:
    """
    Base class for FSM-based stochastic activation functions.

    The FSM takes a bitstream input and transitions between states.
    The output bit is determined by the current state (e.g., if
    state > N/2, out=1). This implements saturating non-linearities
    like Tanh or Sigmoid efficiently.
    """

    num_states: int      # total number of FSM states (0 .. num_states - 1)
    initial_state: int   # state the machine starts in

    def __post_init__(self) -> None:
        # Mutable run-time state, kept separate from the configured start.
        self.state = self.initial_state

    def step(self, bit: int) -> int:
        """Consume one input bit, update the state, return the output bit."""
        raise NotImplementedError

    def process(self, bitstream: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
        """Run the FSM over an entire bitstream, one bit at a time."""
        out = np.zeros_like(bitstream)
        for idx, value in enumerate(bitstream):
            out[idx] = self.step(value)
        return out

TanhFSM dataclass

Bases: FSMActivation

Implements a Tanh-like function using a linear FSM.

States: 0 to N-1. Input 0: state -> max(0, state - 1). Input 1: state -> min(N-1, state + 1). Output: 1 if state >= N/2, else 0.

Source code in src/sc_neurocore/utils/fsm_activations.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass
class TanhFSM(FSMActivation):
    """
    Implements a Tanh-like function using a linear FSM.

    States: 0 to N-1
    Input 0: state -> max(0, state - 1)
    Input 1: state -> min(N-1, state + 1)
    Output: 1 if state >= N/2 else 0
    """

    def __init__(self, states: int = 16):
        self.num_states = states
        self.initial_state = states // 2  # start centred in the state space
        super().__post_init__()  # type: ignore

    def step(self, bit: int) -> int:
        # Saturating up/down counter: a 1 pushes the state up,
        # anything else pushes it down; both directions clamp.
        if bit == 1:
            self.state = min(self.state + 1, self.num_states - 1)
        else:
            self.state = max(self.state - 1, 0)

        # Threshold output on the upper half of the state space.
        return 1 if self.state >= self.num_states // 2 else 0

ReLKFSM dataclass

Bases: FSMActivation

Implements a Rectified Linear (ReLU-like) behavior. Can be complex in SC, often approximated or used with bipolar coding. Here we implement a simple saturating counter.

Source code in src/sc_neurocore/utils/fsm_activations.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@dataclass
class ReLKFSM(FSMActivation):
    """
    Implements a Rectified Linear (ReLU-like) behavior.
    Can be complex in SC, often approximated or used with bipolar coding.
    Here we implement a simple saturating counter.
    """

    def __init__(self, states: int = 16):
        self.num_states = states
        self.initial_state = 0  # rectifier starts from the floor state
        super().__post_init__()  # type: ignore

    def step(self, bit: int) -> int:
        # Saturating up/down counter, clamped to [0, num_states - 1].
        if bit == 1:
            self.state = min(self.state + 1, self.num_states - 1)
        else:
            self.state = max(self.state - 1, 0)

        # Acts as a stochastic integrator: emit 1 whenever any
        # "charge" has accumulated (state strictly above zero).
        return 1 if self.state > 0 else 0

Model Bridge

sc_neurocore.utils.model_bridge

SCBridge

Bridge between standard DL frameworks (like PyTorch) and SC-NeuroCore.

Source code in src/sc_neurocore/utils/model_bridge.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class SCBridge:
    """
    Bridge between standard DL frameworks (like PyTorch) and SC-NeuroCore.
    """

    @staticmethod
    def load_from_state_dict(state_dict: Dict[str, Any], layer_mapping: Dict[str, Any]) -> None:
        """
        Load weights from a state_dict (numpy or torch tensors) into SC layers.

        Args:
            state_dict: Dictionary mapping "layer_name.weight" to arrays.
            layer_mapping: Dictionary mapping "layer_name" to SCLayer instances.
        """
        logger.info("SCBridge: Loading model weights...")

        for name, layer in layer_mapping.items():
            key = f"{name}.weight"
            if key not in state_dict:
                logger.debug("  No weights found for %s", name)
                continue

            raw = np.array(state_dict[key])
            logger.info("  Found weights for %s: shape %s", name, raw.shape)

            if not hasattr(layer, "weights"):
                logger.warning("  Layer %s does not have 'weights' attribute.", name)
                continue

            # Map into [0, 1] for unipolar SC encoding.
            scaled = normalize_weights(raw)

            if layer.weights.shape != scaled.shape:
                logger.warning(
                    "  Shape mismatch for %s. SC: %s, Dict: %s",
                    name,
                    layer.weights.shape,
                    raw.shape,
                )
                continue

            layer.weights = scaled
            # Vectorized layers cache a packed form; keep it in sync.
            if hasattr(layer, "_refresh_packed_weights"):
                layer._refresh_packed_weights()
            # Learning layers mirror weights in per-synapse objects.
            if hasattr(layer, "synapses"):
                for r in range(scaled.shape[0]):
                    for c in range(scaled.shape[1]):
                        layer.synapses[r][c].update_weight(scaled[r, c])

    @staticmethod
    def export_to_numpy(layers: Dict[str, Any]) -> Dict[str, np.ndarray[Any, Any]]:
        """
        Export SC weights back to numpy dictionary.
        """
        exported = {}
        for name, layer in layers.items():
            # Prefer an explicit accessor; fall back to the raw attribute.
            if hasattr(layer, "get_weights"):
                exported[f"{name}.weight"] = layer.get_weights()
            elif hasattr(layer, "weights"):
                exported[f"{name}.weight"] = layer.weights
        return exported

load_from_state_dict(state_dict, layer_mapping) staticmethod

Load weights from a state_dict (numpy or torch tensors) into SC layers.

Parameters:

Name Type Description Default
state_dict Dict[str, Any]

Dictionary mapping "layer_name.weight" to arrays.

required
layer_mapping Dict[str, Any]

Dictionary mapping "layer_name" to SCLayer instances.

required
Source code in src/sc_neurocore/utils/model_bridge.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@staticmethod
def load_from_state_dict(state_dict: Dict[str, Any], layer_mapping: Dict[str, Any]) -> None:
    """
    Load weights from a state_dict (numpy or torch tensors) into SC layers.

    Args:
        state_dict: Dictionary mapping "layer_name.weight" to arrays.
        layer_mapping: Dictionary mapping "layer_name" to SCLayer instances.
    """
    logger.info("SCBridge: Loading model weights...")

    for layer_name, sc_layer in layer_mapping.items():
        weight_key = f"{layer_name}.weight"
        if weight_key not in state_dict:
            logger.debug("  No weights found for %s", layer_name)
            continue

        source = np.array(state_dict[weight_key])
        logger.info("  Found weights for %s: shape %s", layer_name, source.shape)

        if not hasattr(sc_layer, "weights"):
            logger.warning("  Layer %s does not have 'weights' attribute.", layer_name)
            continue

        # Rescale into [0, 1] for unipolar SC encoding.
        rescaled = normalize_weights(source)

        if sc_layer.weights.shape != rescaled.shape:
            logger.warning(
                "  Shape mismatch for %s. SC: %s, Dict: %s",
                layer_name,
                sc_layer.weights.shape,
                source.shape,
            )
            continue

        sc_layer.weights = rescaled
        # Vectorized layers keep a packed cache; refresh it if present.
        if hasattr(sc_layer, "_refresh_packed_weights"):
            sc_layer._refresh_packed_weights()
        # Learning layers duplicate weights in per-synapse objects.
        if hasattr(sc_layer, "synapses"):
            for row in range(rescaled.shape[0]):
                for col in range(rescaled.shape[1]):
                    sc_layer.synapses[row][col].update_weight(rescaled[row, col])

export_to_numpy(layers) staticmethod

Export SC weights back to numpy dictionary.

Source code in src/sc_neurocore/utils/model_bridge.py
83
84
85
86
87
88
89
90
91
92
93
94
@staticmethod
def export_to_numpy(layers: Dict[str, Any]) -> Dict[str, np.ndarray[Any, Any]]:
    """
    Export SC weights back to numpy dictionary.
    """
    state = {}
    for name, layer in layers.items():
        if hasattr(layer, "get_weights"):
            state[f"{name}.weight"] = layer.get_weights()
        elif hasattr(layer, "weights"):
            state[f"{name}.weight"] = layer.weights
    return state

normalize_weights(weights)

Normalizes weights to [0, 1] range for unipolar SC.

Source code in src/sc_neurocore/utils/model_bridge.py
20
21
22
23
24
25
26
27
28
def normalize_weights(weights: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
    """
    Normalizes weights to [0, 1] range for unipolar SC.

    A constant array (max == min) maps to all 0.5, since min-max scaling
    is undefined in that case.
    """
    lo = weights.min()
    hi = weights.max()
    if hi == lo:
        # Degenerate case: every weight identical -> centre of the range.
        return np.ones_like(weights) * 0.5
    return (weights - lo) / (hi - lo)