Utilities
Core utility modules for bitstream encoding/decoding, random number
generation, and signal analysis.
Bitstream Encoding
sc_neurocore.utils.bitstreams.generate_bernoulli_bitstream(p, length, rng=None)
Generate a Bernoulli bitstream of given length with probability p of '1'.
This is the core SC primitive: a sequence of 0/1 bits where the
proportion of 1s ~ p.
Parameters
p : float
Probability of 1 (unipolar encoding, 0 <= p <= 1).
length : int
Number of bits in the stream.
rng : RNG, optional
RNG instance. If None, a fresh RNG is created.
Returns
np.ndarray
Array of shape (length,) with dtype=uint8, values in {0,1}.
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 | def generate_bernoulli_bitstream(
p: float,
length: int,
rng: Optional[RNG] = None,
) -> np.ndarray[Any, Any]:
"""
Generate a Bernoulli bitstream of given length with probability p of '1'.
This is the core SC primitive: a sequence of 0/1 bits where the
proportion of 1s ~ p.
Parameters
----------
p : float
Probability of 1 (unipolar encoding, 0 <= p <= 1).
length : int
Number of bits in the stream.
rng : RNG, optional
RNG instance. If None, a fresh RNG is created.
Returns
-------
np.ndarray
Array of shape (length,) with dtype=uint8, values in {0,1}.
"""
if not 0.0 <= p <= 1.0:
raise SCEncodingError(f"Probability p must be in [0,1], got {p}.")
if rng is None:
rng = RNG()
bits = rng.bernoulli(p, size=length)
encoded: np.ndarray[Any, Any] = bits.astype(np.uint8)
return encoded
|
sc_neurocore.utils.bitstreams.generate_sobol_bitstream(p, length, seed=None)
Generate a bitstream using a Sobol sequence (Low Discrepancy Sequence).
LDS provides faster convergence than random Bernoulli sequences (O(1/N) vs O(1/sqrt(N))).
Parameters
p : float
Target probability.
length : int
Length of the bitstream.
seed : int, optional
Seed for the Sobol engine.
Returns
np.ndarray
Array of shape (length,) with dtype=uint8, values in {0,1}.
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 | def generate_sobol_bitstream(
p: float,
length: int,
seed: Optional[int] = None,
) -> np.ndarray[Any, Any]:
"""
Generate a bitstream using a Sobol sequence (Low Discrepancy Sequence).
LDS provides faster convergence than random Bernoulli sequences (O(1/N) vs O(1/sqrt(N))).
Parameters
----------
p : float
Target probability.
length : int
Length of the bitstream.
seed : int, optional
Seed for the Sobol engine.
Returns
-------
np.ndarray
Array of shape (length,) with dtype=uint8, values in {0,1}.
"""
if not 0.0 <= p <= 1.0:
raise SCEncodingError(f"Probability p must be in [0,1], got {p}.")
# Create Sobol engine (1 dimension)
import scipy.stats.qmc as qmc
sampler = qmc.Sobol(d=1, seed=seed)
# Generate samples. Sobol works best with powers of 2,
# but we can take 'length' samples.
# Note: For strict determinism, one should manage the sampler state,
# but here we create a fresh one or seek could be used if persisting.
# To avoid 'scramble' creating randomness if not desired, we set scramble=False by default in Sobol,
# but scramble=True usually gives better results for integration-like tasks.
# We'll use scramble=True with the seed.
# Optimally, length should be power of 2 for Sobol balance properties.
# We allow any length but warn or just proceed.
samples = sampler.random(n=length) # Shape (length, 1)
samples = samples.flatten()
# Thresholding: The standard way to convert a U[0,1] sample 's' to a bit with prob 'p'
# is: bit = 1 if s < p else 0
bits: np.ndarray[Any, Any] = (samples < p).astype(np.uint8)
return bits
|
sc_neurocore.utils.bitstreams.bitstream_to_probability(bitstream)
Decode a unipolar bitstream back into a probability estimate.
p_hat = (# of ones) / length
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
104
105
106
107
108
109
110
111 | def bitstream_to_probability(bitstream: np.ndarray[Any, Any]) -> float:
"""
Decode a unipolar bitstream back into a probability estimate.
p_hat = (# of ones) / length
"""
if bitstream.size == 0:
raise SCEncodingError("Bitstream is empty.")
return float(bitstream.mean())
|
sc_neurocore.utils.bitstreams.value_to_unipolar_prob(x, x_min, x_max, clip=True)
Map a scalar x from [x_min, x_max] into a unipolar probability [0,1].
Linear mapping:
p = (x - x_min) / (x_max - x_min)
If clip=True, x is clipped into [x_min, x_max].
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170 | def value_to_unipolar_prob(
x: float,
x_min: float,
x_max: float,
clip: bool = True,
) -> float:
"""
Map a scalar x from [x_min, x_max] into a unipolar probability [0,1].
Linear mapping:
p = (x - x_min) / (x_max - x_min)
If clip=True, x is clipped into [x_min, x_max].
"""
if x_min >= x_max:
raise SCEncodingError("x_min must be < x_max.")
if clip:
x = max(min(x, x_max), x_min)
p = (x - x_min) / (x_max - x_min)
return float(p)
|
sc_neurocore.utils.bitstreams.unipolar_prob_to_value(p, x_min, x_max)
Map a unipolar probability p in [0,1] back to a scalar in [x_min, x_max].
Inverse of value_to_unipolar_prob.
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
173
174
175
176
177
178
179
180
181
182
183
184 | def unipolar_prob_to_value(
p: float,
x_min: float,
x_max: float,
) -> float:
"""
Map a unipolar probability p in [0,1] back to a scalar in [x_min, x_max].
Inverse of value_to_unipolar_prob.
"""
if not 0.0 <= p <= 1.0:
raise SCEncodingError(f"Probability p must be in [0,1], got {p}.")
return float(x_min + p * (x_max - x_min))
|
sc_neurocore.utils.bitstreams.BitstreamEncoder
dataclass
Helper for encoding continuous scalar values into SC bitstreams
using linear unipolar mapping.
Example
encoder = BitstreamEncoder(x_min=0.0, x_max=0.1, length=1024, seed=123)
bitstream = encoder.encode(0.06) # 60% ones
p_hat = bitstream_to_probability(bitstream)
x_rec = encoder.decode(bitstream)
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353 | @dataclass
class BitstreamEncoder:
"""
Helper for encoding continuous scalar values into SC bitstreams
using linear unipolar mapping.
Example
-------
encoder = BitstreamEncoder(x_min=0.0, x_max=0.1, length=1024, seed=123)
bitstream = encoder.encode(0.06) # 60% ones
p_hat = bitstream_to_probability(bitstream)
x_rec = encoder.decode(bitstream)
"""
# x_min/x_max default to the unipolar probability domain [0, 1] so callers
# that only set length/seed (e.g. the SCPN-CONTROL compiler) construct a valid
# encoder; explicit ranges remain supported positionally and by keyword.
x_min: float = 0.0
x_max: float = 1.0
length: int = 256
seed: Optional[int] = None
mode: str = "bernoulli" # "bernoulli", "sobol", "bipolar", or "chaotic"
def __post_init__(self) -> None:
if self.mode in ("bernoulli", "bipolar"):
self._rng = RNG(self.seed)
elif self.mode == "chaotic":
from sc_neurocore.chaos.rng import ChaoticRNG
x0 = (self.seed % 997) / 1000.0 + 0.001 if self.seed is not None else 0.5
self._chaotic_rng = ChaoticRNG(r=4.0, x=x0)
elif self.mode != "sobol":
raise SCEncodingError(f"Unknown mode: {self.mode}")
def encode(self, x: float) -> np.ndarray[Any, Any]:
if self.mode == "bipolar":
# Map x from [x_min, x_max] to [-1, 1], then bipolar encode
if self.x_min >= self.x_max:
raise SCEncodingError("x_min must be < x_max.")
x_clipped = max(min(x, self.x_max), self.x_min)
bipolar_val = 2.0 * (x_clipped - self.x_min) / (self.x_max - self.x_min) - 1.0
return generate_bipolar_bitstream(bipolar_val, self.length, rng=self._rng)
p = value_to_unipolar_prob(x, self.x_min, self.x_max, clip=True)
if self.mode == "sobol":
return generate_sobol_bitstream(p, self.length, seed=self.seed)
if self.mode == "chaotic":
return self._chaotic_rng.generate_bitstream(p, self.length)
return generate_bernoulli_bitstream(p, self.length, rng=self._rng)
def decode(self, bitstream: np.ndarray[Any, Any]) -> float:
if self.mode == "bipolar":
bipolar_val = bipolar_to_value(bitstream)
# Map [-1, 1] back to [x_min, x_max]
return float(self.x_min + (bipolar_val + 1.0) / 2.0 * (self.x_max - self.x_min))
p_hat = bitstream_to_probability(bitstream)
return unipolar_prob_to_value(p_hat, self.x_min, self.x_max)
|
BitstreamEncoder in chaotic mode
BitstreamEncoder(mode="chaotic") uses sc_neurocore.chaos.rng.ChaoticRNG
for encode().
- Inputs are first mapped into unipolar probability.
- Output is produced with the same bitstream interface as Bernoulli/Sobol.
- The encode/decode API and output semantics remain unchanged.
- Equal seeds initialise the same logistic-map state; different seeds initialise
different states.
For chaotic-rng test coverage and implementation details:
- src/sc_neurocore/utils/bitstreams.py
- src/sc_neurocore/chaos/rng.py
- tests/test_chaotic_encoder.py
- docs/api/chaos.md
sc_neurocore.utils.bitstreams.BitstreamAverager
dataclass
Sliding-window probability estimator for bitstreams.
Example
avg = BitstreamAverager(window=100)
for _ in range(100):
... avg.push(1)
avg.estimate()
1.0
avg.push(0)
avg.estimate() < 1.0
True
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414 | @dataclass
class BitstreamAverager:
"""
Sliding-window probability estimator for bitstreams.
Example
-------
>>> avg = BitstreamAverager(window=100)
>>> for _ in range(100):
... avg.push(1)
>>> avg.estimate()
1.0
>>> avg.push(0)
>>> avg.estimate() < 1.0
True
"""
window: int
_buffer: Optional[np.ndarray[Any, Any]] = None
_index: int = 0
_filled: bool = False
_running_sum: int = 0
def __post_init__(self) -> None:
self._buffer = np.zeros(self.window, dtype=np.uint8)
self._running_sum = 0
def push(self, bit: int) -> None:
if bit not in (0, 1):
raise SCEncodingError("Bit must be 0 or 1.")
assert self._buffer is not None
# Remove old bit from sum if buffer is wrapping around
old_bit = self._buffer[self._index]
self._buffer[self._index] = bit
if self._filled:
self._running_sum = self._running_sum - old_bit + bit
else:
self._running_sum += bit
self._index = (self._index + 1) % self.window
if self._index == 0:
self._filled = True
def estimate(self) -> float:
if not self._filled:
# Estimate over the filled portion only
count = self._index
if count == 0:
return 0.0
return float(self._running_sum) / count
return float(self._running_sum) / self.window
def reset(self) -> None:
self._buffer.fill(0) # type: ignore
self._index = 0
self._filled = False
self._running_sum = 0
|
Bipolar Encoding
sc_neurocore.utils.bitstreams.generate_bipolar_bitstream(x, length, rng=None)
Generate a bipolar SC bitstream encoding a value in [-1, +1].
Bipolar encoding: value x in [-1, 1] maps to probability p = (x + 1) / 2.
Bit=1 with probability p, bit=0 with probability 1-p.
Decoding: x = 2 * mean(bits) - 1.
Bipolar multiplication uses XNOR: P(A XNOR B) encodes A*B in bipolar.
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130 | def generate_bipolar_bitstream(
x: float,
length: int,
rng: Optional[RNG] = None,
) -> np.ndarray[Any, Any]:
"""Generate a bipolar SC bitstream encoding a value in [-1, +1].
Bipolar encoding: value x in [-1, 1] maps to probability p = (x + 1) / 2.
Bit=1 with probability p, bit=0 with probability 1-p.
Decoding: x = 2 * mean(bits) - 1.
Bipolar multiplication uses XNOR: P(A XNOR B) encodes A*B in bipolar.
"""
if not -1.0 <= x <= 1.0:
raise SCEncodingError(f"Bipolar value must be in [-1,1], got {x}.")
p = (x + 1.0) / 2.0
return generate_bernoulli_bitstream(p, length, rng)
|
sc_neurocore.utils.bitstreams.bipolar_to_value(bitstream)
Decode a bipolar bitstream to a value in [-1, +1].
x = 2 * mean(bits) - 1
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
133
134
135
136
137
138
139
140 | def bipolar_to_value(bitstream: np.ndarray[Any, Any]) -> float:
"""Decode a bipolar bitstream to a value in [-1, +1].
x = 2 * mean(bits) - 1
"""
if bitstream.size == 0:
raise SCEncodingError("Bitstream is empty.")
return float(2.0 * bitstream.mean() - 1.0)
|
SC Division (CORDIV, Li et al. 2014)
sc_neurocore.utils.bitstreams.sc_divide(numerator, denominator)
Stochastic computing division via CORDIV circuit.
Li, Qian, Riedel & Bazargan, IEEE Trans. Signal Process. 62(9), 2014.
at each bit position t,
- x[t]=1 → z[t] = 1
- x[t]=0, y[t]=1 → z[t] = 0
- x[t]=0, y[t]=0 → z[t] = z[t-1] (hold)
Converges to P(z=1) ≈ P(x=1) / P(y=1) when P(x) ≤ P(y).
Parameters
numerator : np.ndarray
Bitstream (uint8, {0,1}) of length L.
denominator : np.ndarray
Bitstream (uint8, {0,1}) of length L. Must have higher or equal density.
Returns
np.ndarray
Quotient bitstream of length L.
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296 | def sc_divide(
numerator: np.ndarray[Any, Any],
denominator: np.ndarray[Any, Any],
) -> np.ndarray[Any, Any]:
"""Stochastic computing division via CORDIV circuit.
Li, Qian, Riedel & Bazargan, IEEE Trans. Signal Process. 62(9), 2014.
Sequential circuit: at each bit position t,
- x[t]=1 → z[t] = 1
- x[t]=0, y[t]=1 → z[t] = 0
- x[t]=0, y[t]=0 → z[t] = z[t-1] (hold)
Converges to P(z=1) ≈ P(x=1) / P(y=1) when P(x) ≤ P(y).
Parameters
----------
numerator : np.ndarray
Bitstream (uint8, {0,1}) of length L.
denominator : np.ndarray
Bitstream (uint8, {0,1}) of length L. Must have higher or equal density.
Returns
-------
np.ndarray
Quotient bitstream of length L.
"""
numerator = np.asarray(numerator, dtype=np.uint8)
denominator = np.asarray(denominator, dtype=np.uint8)
if numerator.shape != denominator.shape:
raise ValueError("numerator and denominator must have the same shape")
out = np.zeros_like(numerator)
prev = 0
for t in range(len(numerator)):
if numerator[t] == 1:
out[t] = 1
elif denominator[t] == 1:
out[t] = 0
else:
out[t] = prev
prev = out[t]
return out
|
sc_divide contract
- Inputs must have the same shape (
numerator.shape == denominator.shape).
- Output is a binary stream (
uint8 values in {0, 1}) with the same shape.
- Per-bit rule:
x[t] = 1 -> z[t] = 1
x[t] = 0 and y[t] = 1 -> z[t] = 0
x[t] = 0 and y[t] = 0 -> z[t] = z[t-1] (hold)
See the hardware state-machine contract for the implementation-backed version:
docs/hardware/sc_cordiv.md.
CORDIV estimates P(numerator=1) / P(denominator=1) when the numerator
probability is not larger than the denominator probability. It is a sequential
stochastic circuit, so correlated streams and short streams can bias the
estimate; use this function as a bitstream contract, not as a floating-point
division replacement.
Adaptive Bitstream Length
Compute minimum bitstream length for target precision via Hoeffding,
Chebyshev, or variance bounds.
sc_neurocore.utils.bitstreams.adaptive_length(p, epsilon=0.01, confidence=0.95, method='hoeffding', min_length=64, max_length=65536)
Compute minimum bitstream length for target precision.
Given probability p and error tolerance epsilon, returns the smallest L
such that |p_hat - p| < epsilon with the given confidence.
Parameters
p : float
Encoded probability in [0, 1].
epsilon : float
Maximum acceptable absolute error.
confidence : float
Confidence level (e.g. 0.95 for 95%).
method : str
Bound type: "hoeffding" (tighter), "chebyshev", or "variance" (no confidence).
min_length : int
Minimum returned length.
max_length : int
Maximum returned length (hardware cap).
Returns
int
Minimum bitstream length (rounded up to nearest power of 2 for Sobol compatibility).
Source code in src/sc_neurocore/utils/bitstreams.py
| Python |
|---|
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 | def adaptive_length(
p: float,
epsilon: float = 0.01,
confidence: float = 0.95,
method: str = "hoeffding",
min_length: int = 64,
max_length: int = 65536,
) -> int:
"""Compute minimum bitstream length for target precision.
Given probability p and error tolerance epsilon, returns the smallest L
such that |p_hat - p| < epsilon with the given confidence.
Parameters
----------
p : float
Encoded probability in [0, 1].
epsilon : float
Maximum acceptable absolute error.
confidence : float
Confidence level (e.g. 0.95 for 95%).
method : str
Bound type: "hoeffding" (tighter), "chebyshev", or "variance" (no confidence).
min_length : int
Minimum returned length.
max_length : int
Maximum returned length (hardware cap).
Returns
-------
int
Minimum bitstream length (rounded up to nearest power of 2 for Sobol compatibility).
"""
if epsilon <= 0:
raise ValueError(f"epsilon must be positive, got {epsilon}")
if method == "variance":
# Var(p_hat) = p(1-p)/L < epsilon^2 → L > p(1-p)/epsilon^2
var_factor = p * (1.0 - p)
L = var_factor / (epsilon**2)
elif method == "chebyshev":
# P(|p_hat - p| >= epsilon) <= Var/epsilon^2 <= (1-confidence)
# L >= p(1-p) / (epsilon^2 * (1-confidence))
delta = 1.0 - confidence
if delta <= 0:
raise ValueError("confidence must be < 1.0")
L = p * (1.0 - p) / (epsilon**2 * delta)
elif method == "hoeffding":
# P(|p_hat - p| >= epsilon) <= 2*exp(-2*L*epsilon^2) <= (1-confidence)
# L >= -ln((1-confidence)/2) / (2*epsilon^2)
delta = 1.0 - confidence
if delta <= 0:
raise ValueError("confidence must be < 1.0")
import math
L = -math.log(delta / 2.0) / (2.0 * epsilon**2)
else:
raise ValueError(f"Unknown method: {method}. Use 'hoeffding', 'chebyshev', or 'variance'.")
L_int = max(min_length, int(np.ceil(L)))
# Round up to next power of 2 for Sobol compatibility
L_pow2 = 1
while L_pow2 < L_int:
L_pow2 *= 2
return min(L_pow2, max_length)
|
adaptive_length contract
- Supported methods:
hoeffding, chebyshev, variance.
- Output is at least
min_length and at most max_length.
- Returned length is rounded up to a power of two (Sobol compatibility).
- Invalid parameters (
epsilon <= 0, unknown methods, confidence >= 1) raise
ValueError.
The method controls the precision/speed tradeoff:
| Method |
Use when |
hoeffding |
Distribution-free confidence bound for Bernoulli streams. |
chebyshev |
Variance-aware confidence bound where p matters. |
variance |
Quick sizing from p(1-p)/epsilon^2 without confidence input. |
Validation:
- tests/test_adaptive_length.py
- tests/test_cordiv_division.py (empirical check of division with generated stream lengths)
LDS Decorrelation (Sobol/Halton)
Multi-dimensional low-discrepancy sequences for per-synapse decorrelation.
Generate decorrelated bitstreams for a probability matrix.
Each element of the probability matrix gets its own LDS dimension,
ensuring zero correlation between any pair of bitstreams.
probabilities : np.ndarray[Any, Any]
Probability matrix, any shape. Values in [0, 1].
length : int
Bitstream length per element.
method : str
"sobol" or "halton".
seed : int or None
Random seed for scrambling.
np.ndarray[Any, Any]
Shape (*probabilities.shape, length), dtype uint8.
Source code in src/sc_neurocore/utils/lds_decorrelation.py
| Python |
|---|
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 | def generate_decorrelated_bitstreams(
probabilities: np.ndarray[Any, Any],
length: int = 1024,
method: str = "sobol",
seed: int | None = None,
) -> np.ndarray[Any, Any]:
"""Generate decorrelated bitstreams for a probability matrix.
Each element of the probability matrix gets its own LDS dimension,
ensuring zero correlation between any pair of bitstreams.
Parameters
----------
probabilities : np.ndarray[Any, Any]
Probability matrix, any shape. Values in [0, 1].
length : int
Bitstream length per element.
method : str
"sobol" or "halton".
seed : int or None
Random seed for scrambling.
Returns
-------
np.ndarray[Any, Any]
Shape (*probabilities.shape, length), dtype uint8.
"""
probs = np.asarray(probabilities, dtype=np.float64)
flat_probs = probs.flatten()
n_dims = len(flat_probs)
if n_dims == 0:
return np.zeros((*probs.shape, length), dtype=np.uint8)
if method == "sobol":
sampler = qmc.Sobol(d=n_dims, seed=seed)
samples = sampler.random(n=length) # (length, n_dims)
elif method == "halton":
sampler = qmc.Halton(d=n_dims, seed=seed)
samples = sampler.random(n=length) # (length, n_dims)
else:
raise ValueError(f"Unknown method: {method}. Use 'sobol' or 'halton'.")
# Threshold each dimension against its probability
bits = np.zeros((n_dims, length), dtype=np.uint8)
for d in range(n_dims):
p = float(np.clip(flat_probs[d], 0.0, 1.0))
bits[d] = (samples[:, d] < p).astype(np.uint8)
return bits.reshape(*probs.shape, length)
|
sc_neurocore.utils.lds_decorrelation.star_discrepancy_estimate(samples, n_test=10000)
Estimate star discrepancy of a sample set (quality metric for LDS).
Lower discrepancy → more uniform coverage → better SC precision.
Parameters
samples : np.ndarray[Any, Any]
Shape (n_samples, d), values in [0, 1].
n_test : int
Number of random test points.
Returns
float
Estimated star discrepancy.
Source code in src/sc_neurocore/utils/lds_decorrelation.py
| Python |
|---|
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 | def star_discrepancy_estimate(
samples: np.ndarray[Any, Any],
n_test: int = 10000,
) -> float:
"""Estimate star discrepancy of a sample set (quality metric for LDS).
Lower discrepancy → more uniform coverage → better SC precision.
Parameters
----------
samples : np.ndarray[Any, Any]
Shape (n_samples, d), values in [0, 1].
n_test : int
Number of random test points.
Returns
-------
float
Estimated star discrepancy.
"""
n, d = samples.shape
rng = np.random.RandomState(42)
test_points = rng.uniform(0, 1, (n_test, d))
max_disc = 0.0
for pt in test_points:
# Fraction of samples in [0, pt] hypercube
inside = np.all(samples <= pt, axis=1)
empirical = np.mean(inside)
# Volume of [0, pt] hypercube
volume = np.prod(pt)
disc = abs(empirical - volume)
if disc > max_disc:
max_disc = disc
return float(max_disc)
|
Random Number Generation
sc_neurocore.utils.rng.RNG
Thin wrapper around NumPy RNG for reproducible per-neuron streams.
Example
rng = RNG(seed=42)
vals = rng.random(5)
vals.shape
(5,)
RNG(seed=42).random(5) == vals # deterministic
array([ True, True, True, True, True])
Source code in src/sc_neurocore/utils/rng.py
| Python |
|---|
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 | class RNG:
"""
Thin wrapper around NumPy RNG for reproducible per-neuron streams.
Example
-------
>>> rng = RNG(seed=42)
>>> vals = rng.random(5)
>>> vals.shape
(5,)
>>> RNG(seed=42).random(5) == vals # deterministic
array([ True, True, True, True, True])
"""
def __init__(self, seed: Optional[int] = None) -> None:
self._rng = np.random.default_rng(seed)
def normal(
self, mean: float = 0.0, std: float = 1.0, size: int | tuple[int, ...] | None = None
) -> Any:
return self._rng.normal(mean, std, size)
def uniform(
self, low: float = 0.0, high: float = 1.0, size: int | tuple[int, ...] | None = None
) -> Any:
return self._rng.uniform(low, high, size)
def bernoulli(self, p: float, size: int | tuple[int, ...] | None = None) -> Any:
return self._rng.random(size) < p
def random(self, size: int | tuple[int, ...] | None = None) -> Any:
return self._rng.random(size)
def shuffle(self, x: np.ndarray[Any, Any]) -> None:
self._rng.shuffle(x)
|
Adaptive Utilities
sc_neurocore.utils.adaptive
AdaptiveInference
dataclass
Manages Progressive Precision / Early Exit for SC.
Source code in src/sc_neurocore/utils/adaptive.py
| Python |
|---|
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 | @dataclass
class AdaptiveInference:
"""
Manages Progressive Precision / Early Exit for SC.
"""
check_interval: int = 64
tolerance: float = 0.05 # 5% stability
min_length: int = 128
max_length: int = 2048
def run_adaptive(self, step_func: Callable[[], float]) -> float:
"""
Runs the SC process step-by-step until convergence or max_length.
Args:
step_func: Function that executes one step and returns current estimate.
"""
history: List[float] = []
current_val = 0.0
for t in range(self.max_length):
current_val = step_func()
if t >= self.min_length and t % self.check_interval == 0:
# Check stability over last 3 checks
history.append(current_val)
if len(history) >= 3:
# If variance is low, exit
recent = history[-3:]
if (max(recent) - min(recent)) < self.tolerance:
return current_val
return current_val
|
run_adaptive(step_func)
Runs the SC process step-by-step until convergence or max_length.
Parameters:
| Name |
Type |
Description |
Default |
step_func
|
Callable[[], float]
|
Function that executes one step and returns current estimate.
|
required
|
Source code in src/sc_neurocore/utils/adaptive.py
| Python |
|---|
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 | def run_adaptive(self, step_func: Callable[[], float]) -> float:
"""
Runs the SC process step-by-step until convergence or max_length.
Args:
step_func: Function that executes one step and returns current estimate.
"""
history: List[float] = []
current_val = 0.0
for t in range(self.max_length):
current_val = step_func()
if t >= self.min_length and t % self.check_interval == 0:
# Check stability over last 3 checks
history.append(current_val)
if len(history) >= 3:
# If variance is low, exit
recent = history[-3:]
if (max(recent) - min(recent)) < self.tolerance:
return current_val
return current_val
|
Connectome Generation
sc_neurocore.utils.connectomes
ConnectomeGenerator
Generates biologically plausible connectivity matrices.
Source code in src/sc_neurocore/utils/connectomes.py
| Python |
|---|
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 | class ConnectomeGenerator:
"""
Generates biologically plausible connectivity matrices.
"""
@staticmethod
def generate_watts_strogatz(
n_neurons: int, k_neighbors: int, p_rewire: float
) -> np.ndarray[Any, Any]:
"""
Watts-Strogatz Small-World Model.
1. Start with a regular ring lattice (connect to k neighbors).
2. Randomly rewire edges with probability p.
Returns:
Adjacency Matrix (Binary)
"""
if k_neighbors >= n_neurons:
return np.ones((n_neurons, n_neurons)) - np.eye(n_neurons)
adj = np.zeros((n_neurons, n_neurons), dtype=int)
# 1. Create Ring Lattice
for i in range(n_neurons):
for j in range(1, k_neighbors // 2 + 1):
# Connect forward
target = (i + j) % n_neurons
adj[i, target] = 1
adj[target, i] = 1 # Undirected for now, or directed?
# Synapses are usually directed. Let's make it directed ring.
# 2. Rewire
for i in range(n_neurons):
for j in range(1, k_neighbors // 2 + 1):
target = (i + j) % n_neurons
if np.random.random() < p_rewire:
# Delete old edge
adj[i, target] = 0
# Find new target (avoid self and existing)
new_target = i
while new_target == i or adj[i, new_target] == 1:
new_target = np.random.randint(0, n_neurons)
adj[i, new_target] = 1
return adj
@staticmethod
def generate_scale_free(n_neurons: int) -> np.ndarray[Any, Any]:
"""
Barabasi-Albert Scale-Free Model (Preferential Attachment).
"""
# Start with 2 connected nodes
adj = np.zeros((n_neurons, n_neurons), dtype=int)
adj[0, 1] = 1
adj[1, 0] = 1
degrees = np.zeros(n_neurons)
degrees[0] = 1
degrees[1] = 1
active_nodes = 2
for i in range(2, n_neurons):
# Connect to m=1 or m=2 existing nodes based on degree
# Prob(connect to j) = deg(j) / sum(deg)
probs = degrees[:active_nodes] / np.sum(degrees[:active_nodes])
# Select target
target = np.random.choice(np.arange(active_nodes), p=probs)
adj[i, target] = 1
# Directed: i -> target
degrees[i] += 1
degrees[target] += 1
active_nodes += 1
return adj
|
generate_watts_strogatz(n_neurons, k_neighbors, p_rewire)
staticmethod
Watts-Strogatz Small-World Model.
- Start with a regular ring lattice (connect to k neighbors).
- Randomly rewire edges with probability p.
Returns:
| Type |
Description |
ndarray[Any, Any]
|
Adjacency Matrix (Binary)
|
Source code in src/sc_neurocore/utils/connectomes.py
| Python |
|---|
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 | @staticmethod
def generate_watts_strogatz(
n_neurons: int, k_neighbors: int, p_rewire: float
) -> np.ndarray[Any, Any]:
"""
Watts-Strogatz Small-World Model.
1. Start with a regular ring lattice (connect to k neighbors).
2. Randomly rewire edges with probability p.
Returns:
Adjacency Matrix (Binary)
"""
if k_neighbors >= n_neurons:
return np.ones((n_neurons, n_neurons)) - np.eye(n_neurons)
adj = np.zeros((n_neurons, n_neurons), dtype=int)
# 1. Create Ring Lattice
for i in range(n_neurons):
for j in range(1, k_neighbors // 2 + 1):
# Connect forward
target = (i + j) % n_neurons
adj[i, target] = 1
adj[target, i] = 1 # Undirected for now, or directed?
# Synapses are usually directed. Let's make it directed ring.
# 2. Rewire
for i in range(n_neurons):
for j in range(1, k_neighbors // 2 + 1):
target = (i + j) % n_neurons
if np.random.random() < p_rewire:
# Delete old edge
adj[i, target] = 0
# Find new target (avoid self and existing)
new_target = i
while new_target == i or adj[i, new_target] == 1:
new_target = np.random.randint(0, n_neurons)
adj[i, new_target] = 1
return adj
|
generate_scale_free(n_neurons)
staticmethod
Barabasi-Albert Scale-Free Model (Preferential Attachment).
Source code in src/sc_neurocore/utils/connectomes.py
| Python |
|---|
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 | @staticmethod
def generate_scale_free(n_neurons: int) -> np.ndarray[Any, Any]:
"""
Barabasi-Albert Scale-Free Model (Preferential Attachment).
"""
# Start with 2 connected nodes
adj = np.zeros((n_neurons, n_neurons), dtype=int)
adj[0, 1] = 1
adj[1, 0] = 1
degrees = np.zeros(n_neurons)
degrees[0] = 1
degrees[1] = 1
active_nodes = 2
for i in range(2, n_neurons):
# Connect to m=1 or m=2 existing nodes based on degree
# Prob(connect to j) = deg(j) / sum(deg)
probs = degrees[:active_nodes] / np.sum(degrees[:active_nodes])
# Select target
target = np.random.choice(np.arange(active_nodes), p=probs)
adj[i, target] = 1
# Directed: i -> target
degrees[i] += 1
degrees[target] += 1
active_nodes += 1
return adj
|
Decorrelators
sc_neurocore.utils.decorrelators
Decorrelator
dataclass
Bases: ABC
Base class for bitstream decorrelators.
Source code in src/sc_neurocore/utils/decorrelators.py
| Python |
|---|
| @dataclass
class Decorrelator(ABC):
"""
Base class for bitstream decorrelators.
"""
@abstractmethod
def process(self, bitstream: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]: ...
|
ShufflingDecorrelator
dataclass
Bases: Decorrelator
Decorrelates a bitstream by randomly shuffling bits within a window.
This preserves the exact bit count (probability) but destroys temporal correlations.
Source code in src/sc_neurocore/utils/decorrelators.py
| Python |
|---|
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 | @dataclass
class ShufflingDecorrelator(Decorrelator):
"""
Decorrelates a bitstream by randomly shuffling bits within a window.
This preserves the exact bit count (probability) but destroys temporal correlations.
"""
window_size: int = 16
seed: Optional[int] = None
def __post_init__(self) -> None:
self._rng = RNG(self.seed)
def process(self, bitstream: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
# Reshape into windows
length = len(bitstream)
pad = (self.window_size - (length % self.window_size)) % self.window_size
if pad > 0:
padded = np.append(bitstream, np.zeros(pad, dtype=np.uint8))
else:
padded = bitstream.copy()
num_windows = len(padded) // self.window_size
reshaped = padded.reshape((num_windows, self.window_size))
# Shuffle each row
# Note: Ideally we want independent shuffles per row.
# fast way:
for i in range(num_windows):
self._rng.shuffle(reshaped[i])
return reshaped.flatten()[:length]
|
LFSRRegenDecorrelator
dataclass
Bases: Decorrelator
Regenerates a new bitstream with the same probability estimate
but using a different random source (LFSR-like or just new RNG).
Source code in src/sc_neurocore/utils/decorrelators.py
| Python |
|---|
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 | @dataclass
class LFSRRegenDecorrelator(Decorrelator):
"""
Regenerates a new bitstream with the same probability estimate
but using a different random source (LFSR-like or just new RNG).
"""
seed: Optional[int] = None
def __post_init__(self) -> None:
self._rng = RNG(self.seed)
def process(self, bitstream: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
p_est = bitstream.mean()
# Regenerate
regenerated: np.ndarray[Any, Any] = self._rng.bernoulli(p_est, size=len(bitstream)).astype(
np.uint8
)
return regenerated
|
Fault Injection
sc_neurocore.utils.fault_injection
FaultInjector
Simulates hardware faults in Stochastic Computing bitstreams.
Source code in src/sc_neurocore/utils/fault_injection.py
| Python |
|---|
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 | class FaultInjector:
"""
Simulates hardware faults in Stochastic Computing bitstreams.
"""
@staticmethod
def inject_bit_flips(
bitstream: np.ndarray[Any, Any], error_rate: float
) -> np.ndarray[Any, Any]:
"""
Randomly flips bits with probability 'error_rate'.
"""
if error_rate <= 0:
return bitstream
# Generate error mask (1 where error occurs)
# Using numpy for speed
mask = np.random.random(bitstream.shape) < error_rate
# XOR with mask flips the bits where mask is 1
# bitstream is uint8 {0,1}
# We need to ensure we don't go out of bounds (0/1)
# 0 ^ 1 = 1
# 1 ^ 1 = 0
# 0 ^ 0 = 0
# 1 ^ 0 = 1
corrupted = np.bitwise_xor(bitstream.astype(bool), mask)
flipped: np.ndarray[Any, Any] = corrupted.astype(np.uint8)
return flipped
@staticmethod
def inject_stuck_at(
bitstream: np.ndarray[Any, Any], fault_rate: float, value: int
) -> np.ndarray[Any, Any]:
"""
Simulates Stuck-At-0 or Stuck-At-1 faults.
"""
mask = np.random.random(bitstream.shape) < fault_rate
corrupted = bitstream.copy()
corrupted[mask] = value
return corrupted
|
inject_bit_flips(bitstream, error_rate)
staticmethod
Randomly flips bits with probability 'error_rate'.
Source code in src/sc_neurocore/utils/fault_injection.py
| Python |
|---|
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 | @staticmethod
def inject_bit_flips(
bitstream: np.ndarray[Any, Any], error_rate: float
) -> np.ndarray[Any, Any]:
"""
Randomly flips bits with probability 'error_rate'.
"""
if error_rate <= 0:
return bitstream
# Generate error mask (1 where error occurs)
# Using numpy for speed
mask = np.random.random(bitstream.shape) < error_rate
# XOR with mask flips the bits where mask is 1
# bitstream is uint8 {0,1}
# We need to ensure we don't go out of bounds (0/1)
# 0 ^ 1 = 1
# 1 ^ 1 = 0
# 0 ^ 0 = 0
# 1 ^ 0 = 1
corrupted = np.bitwise_xor(bitstream.astype(bool), mask)
flipped: np.ndarray[Any, Any] = corrupted.astype(np.uint8)
return flipped
|
inject_stuck_at(bitstream, fault_rate, value)
staticmethod
Simulates Stuck-At-0 or Stuck-At-1 faults.
Source code in src/sc_neurocore/utils/fault_injection.py
| Python |
|---|
47
48
49
50
51
52
53
54
55
56
57 | @staticmethod
def inject_stuck_at(
bitstream: np.ndarray[Any, Any], fault_rate: float, value: int
) -> np.ndarray[Any, Any]:
"""
Simulates Stuck-At-0 or Stuck-At-1 faults.
"""
mask = np.random.random(bitstream.shape) < fault_rate
corrupted = bitstream.copy()
corrupted[mask] = value
return corrupted
|
FSM Activations
sc_neurocore.utils.fsm_activations
FSMActivation
dataclass
Bases: ABC
Base class for FSM-based stochastic activation functions.
The FSM takes a bitstream input and transitions between states.
The output bit is determined by the current state (e.g., if state > N/2, out=1).
This implements saturating non-linearities like Tanh or Sigmoid efficiently.
Source code in src/sc_neurocore/utils/fsm_activations.py
| Python |
|---|
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 | @dataclass
class FSMActivation(ABC):
"""
Base class for FSM-based stochastic activation functions.
The FSM takes a bitstream input and transitions between states.
The output bit is determined by the current state (e.g., if state > N/2, out=1).
This implements saturating non-linearities like Tanh or Sigmoid efficiently.
"""
num_states: int
initial_state: int
def __post_init__(self) -> None:
self.state = self.initial_state
@abstractmethod
def step(self, bit: int) -> int: ...
def process(self, bitstream: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
output = np.zeros_like(bitstream)
for i, bit in enumerate(bitstream):
output[i] = self.step(bit)
return output
|
TanhFSM
dataclass
Bases: FSMActivation
Implements a Tanh-like function using a linear FSM.
States: 0 to N-1
Input 0: state -> max(0, state - 1)
Input 1: state -> min(N-1, state + 1)
Output: 1 if state >= N/2 else 0
Source code in src/sc_neurocore/utils/fsm_activations.py
| Python |
|---|
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 | @dataclass
class TanhFSM(FSMActivation):
"""
Implements a Tanh-like function using a linear FSM.
States: 0 to N-1
Input 0: state -> max(0, state - 1)
Input 1: state -> min(N-1, state + 1)
Output: 1 if state >= N/2 else 0
"""
def __init__(self, states: int = 16) -> None:
self.num_states = states
self.initial_state = states // 2
super().__post_init__()
def step(self, bit: int) -> int:
if bit == 1:
if self.state < self.num_states - 1:
self.state += 1
else:
if self.state > 0:
self.state -= 1
return 1 if self.state >= (self.num_states // 2) else 0
|
ReLKFSM
dataclass
Bases: FSMActivation
Implements a Rectified Linear (ReLU-like) behavior.
Can be complex in SC, often approximated or used with bipolar coding.
Here we implement a simple saturating counter.
Source code in src/sc_neurocore/utils/fsm_activations.py
| Python |
|---|
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 | @dataclass
class ReLKFSM(FSMActivation):
"""
Implements a Rectified Linear (ReLU-like) behavior.
Can be complex in SC, often approximated or used with bipolar coding.
Here we implement a simple saturating counter.
"""
def __init__(self, states: int = 16) -> None:
self.num_states = states
self.initial_state = 0 # Start at 0
super().__post_init__()
def step(self, bit: int) -> int:
if bit == 1:
if self.state < self.num_states - 1:
self.state += 1
else:
if self.state > 0:
self.state -= 1
# Probabilistic output based on state?
# Or threshold? ReLK usually implies simple pass-through if > 0.
# This implementation is a "Stochastic Integrator"
return 1 if self.state > 0 else 0
|
Model Bridge
sc_neurocore.utils.model_bridge
SCBridge
Bridge between standard DL frameworks (like PyTorch) and SC-NeuroCore.
Source code in src/sc_neurocore/utils/model_bridge.py
| Python |
|---|
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99 | class SCBridge:
"""
Bridge between standard DL frameworks (like PyTorch) and SC-NeuroCore.
"""
@staticmethod
def load_from_state_dict(state_dict: Dict[str, Any], layer_mapping: Dict[str, Any]) -> None:
"""
Load weights from a state_dict (numpy or torch tensors) into SC layers.
Args:
state_dict: Dictionary mapping "layer_name.weight" to arrays.
layer_mapping: Dictionary mapping "layer_name" to SCLayer instances.
"""
logger.info("SCBridge: Loading model weights...")
for name, layer in layer_mapping.items():
# Look for weight key
weight_key = f"{name}.weight"
if weight_key in state_dict:
w = np.array(state_dict[weight_key])
logger.info(" Found weights for %s: shape %s", name, w.shape)
# Normalize for SC
w_norm = normalize_weights(w)
# Check dimensions
if hasattr(layer, "weights"):
if layer.weights.shape == w_norm.shape:
layer.weights = w_norm
# If vectorized, refresh
if hasattr(layer, "_refresh_packed_weights"):
layer._refresh_packed_weights()
# If learning layer, update synapse objects
if hasattr(layer, "synapses"):
# Update individual synapses
for i in range(w_norm.shape[0]):
for j in range(w_norm.shape[1]):
layer.synapses[i][j].update_weight(w_norm[i, j])
else:
logger.warning(
" Shape mismatch for %s. SC: %s, Dict: %s",
name,
layer.weights.shape,
w.shape,
)
else:
logger.warning(" Layer %s does not have 'weights' attribute.", name)
else:
logger.debug(" No weights found for %s", name)
@staticmethod
def export_to_numpy(layers: Dict[str, Any]) -> Dict[str, np.ndarray[Any, Any]]:
"""
Export SC weights back to numpy dictionary.
"""
state = {}
for name, layer in layers.items():
if hasattr(layer, "get_weights"):
state[f"{name}.weight"] = layer.get_weights()
elif hasattr(layer, "weights"):
state[f"{name}.weight"] = layer.weights
return state
|
load_from_state_dict(state_dict, layer_mapping)
staticmethod
Load weights from a state_dict (numpy or torch tensors) into SC layers.
Parameters:
| Name |
Type |
Description |
Default |
state_dict
|
Dict[str, Any]
|
Dictionary mapping "layer_name.weight" to arrays.
|
required
|
layer_mapping
|
Dict[str, Any]
|
Dictionary mapping "layer_name" to SCLayer instances.
|
required
|
Source code in src/sc_neurocore/utils/model_bridge.py
| Python |
|---|
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 | @staticmethod
def load_from_state_dict(state_dict: Dict[str, Any], layer_mapping: Dict[str, Any]) -> None:
"""
Load weights from a state_dict (numpy or torch tensors) into SC layers.
Args:
state_dict: Dictionary mapping "layer_name.weight" to arrays.
layer_mapping: Dictionary mapping "layer_name" to SCLayer instances.
"""
logger.info("SCBridge: Loading model weights...")
for name, layer in layer_mapping.items():
# Look for weight key
weight_key = f"{name}.weight"
if weight_key in state_dict:
w = np.array(state_dict[weight_key])
logger.info(" Found weights for %s: shape %s", name, w.shape)
# Normalize for SC
w_norm = normalize_weights(w)
# Check dimensions
if hasattr(layer, "weights"):
if layer.weights.shape == w_norm.shape:
layer.weights = w_norm
# If vectorized, refresh
if hasattr(layer, "_refresh_packed_weights"):
layer._refresh_packed_weights()
# If learning layer, update synapse objects
if hasattr(layer, "synapses"):
# Update individual synapses
for i in range(w_norm.shape[0]):
for j in range(w_norm.shape[1]):
layer.synapses[i][j].update_weight(w_norm[i, j])
else:
logger.warning(
" Shape mismatch for %s. SC: %s, Dict: %s",
name,
layer.weights.shape,
w.shape,
)
else:
logger.warning(" Layer %s does not have 'weights' attribute.", name)
else:
logger.debug(" No weights found for %s", name)
|
export_to_numpy(layers)
staticmethod
Export SC weights back to numpy dictionary.
Source code in src/sc_neurocore/utils/model_bridge.py
| Python |
|---|
88
89
90
91
92
93
94
95
96
97
98
99 | @staticmethod
def export_to_numpy(layers: Dict[str, Any]) -> Dict[str, np.ndarray[Any, Any]]:
"""
Export SC weights back to numpy dictionary.
"""
state = {}
for name, layer in layers.items():
if hasattr(layer, "get_weights"):
state[f"{name}.weight"] = layer.get_weights()
elif hasattr(layer, "weights"):
state[f"{name}.weight"] = layer.weights
return state
|
normalize_weights(weights)
Normalizes weights to [0, 1] range for unipolar SC.
Source code in src/sc_neurocore/utils/model_bridge.py
| Python |
|---|
23
24
25
26
27
28
29
30
31
32
33 | def normalize_weights(weights: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
"""
Normalizes weights to [0, 1] range for unipolar SC.
"""
w_min = weights.min()
w_max = weights.max()
if w_max == w_min:
uniform: np.ndarray[Any, Any] = np.ones_like(weights) * 0.5
return uniform
normalised: np.ndarray[Any, Any] = (weights - w_min) / (w_max - w_min)
return normalised
|