Optimiser API

The sc_neurocore.optimizer package exposes two optimisation surfaces:

  • fit_to_target(...) for deterministic resource fitting against FPGA target budgets.
  • SurrogateSCOptimizer for ML-guided stochastic-computing compiler choices, trained on generated analytical design points plus optional measured observations.

It also exposes strict evidence loaders so benchmark and synthesis reports can feed the surrogate without fabricating missing LUT, power, latency, or accuracy values.

Deterministic Resource Fitting

Use fit_to_target when you have layer dimensions and weights and want the legacy prune / quantise / bitstream-length loop to reduce estimated FPGA resource use:

Python
import numpy as np

from sc_neurocore.optimizer import fit_to_target

weights = [np.ones((32, 16), dtype=np.float32)]
result = fit_to_target(
    layer_sizes=[(32, 16)],
    weights=weights,
    target="ice40",
    initial_bitstream_length=256,
)

print(result.summary())

The result reports whether the design fits, final estimated LUT use, selected bitstream length, sparsity, and each optimisation step.

Surrogate-Guided SC Optimisation

Use SurrogateSCOptimizer when the compiler must choose per-layer stochastic computing settings under LUT, power, and latency pressure:

Python
from sc_neurocore.optimizer import SurrogateSCOptimizer, TargetHardwareProfile
from sc_neurocore.optimizer.sc_optimizer import HardwareBudget, LayerProfile

target = TargetHardwareProfile(
    name="ice40-low-power",
    budget=HardwareBudget(max_luts=7680, max_power_mw=250.0, max_latency_cycles=4096),
)
network = [
    LayerProfile(id="encoder", mac_count=256, is_critical_path=True),
    LayerProfile(id="decoder", mac_count=128),
]

report = SurrogateSCOptimizer(target).optimise(network)

The report contains selected bitstream length, decorrelator, precision, LFSR polynomial, estimated LUTs, power, latency, utility score, and any rejected layers.
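The utility score trades predicted accuracy against fractional budget use, weighted by the TargetHardwareProfile preference weights (defaults: accuracy 1.0, LUT 0.35, power 0.35, latency 0.20). A minimal sketch of that trade-off with hypothetical predicted values:

```python
# Sketch of the utility trade-off using the default profile weights.
# The budget fractions and accuracy below are hypothetical.
ACCURACY_WEIGHT = 1.0
LUT_WEIGHT = 0.35
POWER_WEIGHT = 0.35
LATENCY_WEIGHT = 0.20

def utility(accuracy: float, lut_frac: float,
            power_frac: float, latency_frac: float) -> float:
    """Higher is better: reward accuracy, penalise budget pressure."""
    return (
        ACCURACY_WEIGHT * accuracy
        - LUT_WEIGHT * lut_frac
        - POWER_WEIGHT * power_frac
        - LATENCY_WEIGHT * latency_frac
    )

# A candidate predicted at 95% accuracy while using half of each budget:
score = utility(accuracy=0.95, lut_frac=0.5, power_frac=0.5, latency_frac=0.5)
print(f"{score:.2f}")  # 0.50
```

A candidate that spends more of a budget must buy enough extra accuracy to cover the weighted penalty, which is why low-power targets can prefer shorter bitstreams.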

Measured Evidence

Measured observations are optional but preferred. The loader accepts JSON payloads with observations, benchmark_observations, layers, runs, or results records and raises ObservationLoadError when required fields are missing. Numeric metrics must be finite, non-negative numbers; integer fields such as mac_count, bitstream_length, precision_bits, luts_used, and latency_cycles reject booleans and fractional values.
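As a sketch of the expected record shape, a minimal in-memory payload might look like the following. Field names are taken from the BenchmarkObservation dataclass documented below; the values and the "Stochastic" mode string are hypothetical:

```python
# Hypothetical evidence payload; in practice every metric must come
# from a real benchmark or synthesis run.
payload = {
    "observations": [
        {
            "mac_count": 256,
            "bitstream_length": 512,
            "decorrelator": "LFSR",
            "mode": "Stochastic",  # hypothetical mode name
            "precision_bits": 8,
            "lfsr_polynomial": "x16+x14+x13+x11+1",
            "luts_used": 1450,
            "power_mw": 118.0,
            "latency_cycles": 2048,
            "accuracy_score": 0.982,
            "is_critical_path": True,
        }
    ],
}

# Integer fields must be true ints (no booleans, no fractions) and metrics
# must be finite and non-negative, or ObservationLoadError is raised:
# from sc_neurocore.optimizer import observations_from_payload
# observations = observations_from_payload(payload)
```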

Python
from sc_neurocore.optimizer import load_observations

observations = load_observations("benchmarks/results/fpga_power_observations.json")

For raw Vivado or Quartus reports, use the package helper:

Python
from sc_neurocore.optimizer import build_payload_from_reports, write_payload

payload = build_payload_from_reports(
    design_path="build/network_design.json",
    utilisation_path="build/vivado_utilisation.rpt",
    power_path="build/vivado_power.rpt",
    timing_path="build/vivado_timing.rpt",
    accuracy_score=0.991,
    clock_mhz=100.0,
    inferences_per_run=1,
)
write_payload(payload, "build/synthesis_observations.json")

The same flow is available from the CLI:

Bash
sc-neurocore collect-synthesis \
  --design build/network_design.json \
  --utilisation build/vivado_utilisation.rpt \
  --power build/vivado_power.rpt \
  --timing build/vivado_timing.rpt \
  --accuracy-score 0.991 \
  --clock-mhz 100 \
  --inferences-per-run 1 \
  --out build/synthesis_observations.json

Energy fields are computed only when both clock_mhz and inferences_per_run are provided. Vendor reports remain the evidence source; the helper does not run synthesis or invent measurements.
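The underlying arithmetic for those energy fields is simply power multiplied by time: latency in cycles and a clock frequency give a runtime, and runtime times measured power gives energy. A hedged sketch of that relation (the helper's exact field names and formula may differ):

```python
def energy_uj_per_inference(power_mw: float, latency_cycles: int,
                            clock_mhz: float, inferences_per_run: int) -> float:
    """Energy per inference in microjoules: power x time / inferences.

    Sketch only; collect-synthesis derives its energy fields from the
    same inputs, but this is not the package's implementation.
    """
    runtime_s = latency_cycles / (clock_mhz * 1e6)   # seconds per run
    energy_j = (power_mw * 1e-3) * runtime_s         # joules per run
    return energy_j * 1e6 / inferences_per_run       # microjoules per inference

# 250 mW for 4096 cycles at 100 MHz, one inference per run:
print(round(energy_uj_per_inference(250.0, 4096, 100.0, 1), 2))  # 10.24
```

Without clock_mhz there is no runtime, and without inferences_per_run there is no per-inference denominator, which is why both inputs are required before any energy field is emitted.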

sc_neurocore.optimizer

Automatically compress and tune SNNs for target hardware.

ObservationLoadError

Bases: ValueError

Raised when a benchmark/synthesis observation cannot be trusted.

Source code in src/sc_neurocore/optimizer/observation_loader.py
Python
class ObservationLoadError(ValueError):
    """Raised when a benchmark/synthesis observation cannot be trusted."""

SynthesisFeedbackResult dataclass

Result of one measured-evidence optimiser feedback pass.

Source code in src/sc_neurocore/optimizer/feedback_loop.py
Python
@dataclass(frozen=True)
class SynthesisFeedbackResult:
    """Result of one measured-evidence optimiser feedback pass."""

    evidence_payload: dict[str, Any]
    observations: tuple[BenchmarkObservation, ...]
    report: SurrogateOptimizerReport

OptimizationResult dataclass

Result of the resource optimization process.

Source code in src/sc_neurocore/optimizer/resource_optimizer.py
Python
@dataclass
class OptimizationResult:
    """Result of the resource optimization process."""

    fits: bool
    target: str
    final_luts: int
    target_luts: int
    utilization_pct: float
    final_bitstream_length: int
    final_sparsity: float
    steps: list[OptimizationStep] = field(default_factory=list)
    optimized_weights: list[np.ndarray] = field(default_factory=list, repr=False)

    def summary(self) -> str:
        lines = [
            f"Resource Optimization: {self.target}",
            f"  Fits: {'YES' if self.fits else 'NO'}",
            f"  LUTs: {self.final_luts:,} / {self.target_luts:,} ({self.utilization_pct:.1f}%)",
            f"  Bitstream length: {self.final_bitstream_length}",
            f"  Sparsity: {self.final_sparsity:.1%}",
            f"  Steps taken: {len(self.steps)}",
        ]
        for s in self.steps:
            lines.append(f"    {s.action}: {s.luts_before:,} -> {s.luts_after:,} LUTs")
        return "\n".join(lines)

BenchmarkObservation dataclass

Measured or externally supplied design-point observation.

The optimiser treats these as higher-priority training points than its analytical generated points. Callers should only pass observations that come from real benchmark or synthesis outputs.

Source code in src/sc_neurocore/optimizer/surrogate_sc_optimizer.py
Python
@dataclass(frozen=True)
class BenchmarkObservation:
    """Measured or externally supplied design-point observation.

    The optimiser treats these as higher-priority training points than its
    analytical generated points.  Callers should only pass observations that
    come from real benchmark or synthesis outputs.
    """

    mac_count: int
    bitstream_length: int
    decorrelator: str
    mode: str
    precision_bits: int
    lfsr_polynomial: str
    luts_used: int
    power_mw: float
    latency_cycles: int
    accuracy_score: float
    is_critical_path: bool = False

SurrogateOptimizerReport dataclass

Budgeted per-layer compiler configuration.

Source code in src/sc_neurocore/optimizer/surrogate_sc_optimizer.py
Python
@dataclass(frozen=True)
class SurrogateOptimizerReport:
    """Budgeted per-layer compiler configuration."""

    config: dict[str, SurrogateLayerConfig]
    total_luts: int
    total_power_mw: float
    total_latency_cycles: int
    mean_accuracy: float
    training_points: int
    target_name: str
    rejected_layers: list[str] = field(default_factory=list)

    @property
    def feasible(self) -> bool:
        """Whether every layer received a configuration."""
        return not self.rejected_layers

feasible property

Whether every layer received a configuration.

SurrogateSCOptimizer

Compiler optimiser using a learned surrogate over SC design points.

Source code in src/sc_neurocore/optimizer/surrogate_sc_optimizer.py
Python
class SurrogateSCOptimizer:
    """Compiler optimiser using a learned surrogate over SC design points."""

    def __init__(
        self,
        target: TargetHardwareProfile,
        *,
        bitstream_options: Iterable[int] = (64, 128, 256, 512, 1024, 2048),
        precision_options: Iterable[int] = (4, 6, 8, 12, 16),
        lfsr_polynomials: Iterable[str] = _LFSR_POLYNOMIALS,
        observations: Iterable[BenchmarkObservation] = (),
    ) -> None:
        self.target = target
        self.bitstream_options = tuple(sorted({int(v) for v in bitstream_options}))
        self.precision_options = tuple(sorted({int(v) for v in precision_options}))
        self.lfsr_polynomials = tuple(lfsr_polynomials)
        self.observations = tuple(observations)
        self._base = SCOptimizer(target.budget)
        self._surrogate = _RidgeSurrogate()
        self._training_points = 0

    def optimise(self, network: list[LayerProfile]) -> SurrogateOptimizerReport | None:
        """Select budgeted layer settings for ``network``."""
        if not network:
            return SurrogateOptimizerReport(
                config={},
                total_luts=0,
                total_power_mw=0.0,
                total_latency_cycles=0,
                mean_accuracy=0.0,
                training_points=0,
                target_name=self.target.name,
            )

        self._fit_surrogate(network)
        selected: dict[str, SurrogateLayerConfig] = {}
        rejected: list[str] = []

        for layer in sorted(network, key=lambda item: item.is_critical_path, reverse=True):
            remaining_luts = self.target.budget.max_luts - sum(
                c.luts_used for c in selected.values()
            )
            remaining_power = self.target.budget.max_power_mw - sum(
                c.power_used for c in selected.values()
            )
            candidates = self._rank_layer_candidates(layer, remaining_luts, remaining_power)
            if not candidates:
                rejected.append(layer.id)
                continue
            selected[layer.id] = candidates[0]

        if rejected:
            return SurrogateOptimizerReport(
                config=selected,
                total_luts=sum(c.luts_used for c in selected.values()),
                total_power_mw=sum(c.power_used for c in selected.values()),
                total_latency_cycles=max((c.latency_cycles for c in selected.values()), default=0),
                mean_accuracy=self._weighted_accuracy(selected, network),
                training_points=self._training_points,
                target_name=self.target.name,
                rejected_layers=rejected,
            )

        return self._rebalance(selected, network)

    def _fit_surrogate(self, network: list[LayerProfile]) -> None:
        rows: list[np.ndarray] = []
        labels: list[np.ndarray] = []

        for layer in network:
            for cand in self._candidate_grid(layer):
                label = self._analytical_label(cand)
                rows.append(self._features(cand))
                labels.append(self._normalise_label(label))

        for obs in self.observations:
            cand = _Candidate(
                mac_count=obs.mac_count,
                is_critical_path=obs.is_critical_path,
                bitstream_length=obs.bitstream_length,
                decorrelator=obs.decorrelator,
                mode=obs.mode,
                precision_bits=obs.precision_bits,
                lfsr_polynomial=obs.lfsr_polynomial,
            )
            label = _Label(
                luts=float(obs.luts_used),
                power_mw=obs.power_mw,
                latency_cycles=float(obs.latency_cycles),
                accuracy=obs.accuracy_score,
            )
            rows.extend([self._features(cand)] * 4)
            labels.extend([self._normalise_label(label)] * 4)

        self._training_points = len(rows)
        self._surrogate.fit(np.vstack(rows), np.vstack(labels))

    def _rank_layer_candidates(
        self, layer: LayerProfile, remaining_luts: int, remaining_power: float
    ) -> list[SurrogateLayerConfig]:
        ranked: list[SurrogateLayerConfig] = []
        for cand in self._candidate_grid(layer):
            pred = self._denormalise_prediction(
                self._surrogate.predict(self._features(cand)[None, :])[0]
            )
            pred = self._observation_label(cand) or pred
            if pred.luts > remaining_luts or pred.power_mw > remaining_power:
                continue
            if self.target.budget.max_latency_cycles and (
                pred.latency_cycles > self.target.budget.max_latency_cycles
            ):
                continue
            ranked.append(self._to_config(cand, pred))
        ranked.sort(key=lambda cfg: cfg.utility_score, reverse=True)
        return ranked

    def _candidate_grid(self, layer: LayerProfile) -> Iterable[_Candidate]:
        for mode in _MODES:
            if mode == "Deterministic":
                yield _Candidate(
                    mac_count=layer.mac_count,
                    is_critical_path=layer.is_critical_path,
                    bitstream_length=1,
                    decorrelator="None",
                    mode=mode,
                    precision_bits=16,
                    lfsr_polynomial="none",
                )
                continue
            for length in self.bitstream_options:
                for precision in self.precision_options:
                    for decorrelator in _DECORRELATORS:
                        polys = self.lfsr_polynomials if decorrelator == "LFSR" else ("none",)
                        for polynomial in polys:
                            yield _Candidate(
                                mac_count=layer.mac_count,
                                is_critical_path=layer.is_critical_path,
                                bitstream_length=length,
                                decorrelator=decorrelator,
                                mode=mode,
                                precision_bits=precision,
                                lfsr_polynomial=polynomial,
                            )

    def _analytical_label(self, cand: _Candidate) -> _Label:
        luts, power, accuracy, latency = self._base._estimate_resources(
            cand.mac_count,
            cand.bitstream_length,
            cand.decorrelator,
            cand.mode,
        )
        precision_scale = cand.precision_bits / 16.0
        if cand.mode != "Deterministic":
            luts = int(luts * (0.70 + 0.30 * precision_scale))
            power = power * (0.60 + 0.40 * precision_scale)
            accuracy -= max(0.0, (8 - cand.precision_bits) * 0.012)
            accuracy += self._polynomial_quality(cand.lfsr_polynomial)
        return _Label(
            luts=max(1.0, float(luts)),
            power_mw=max(1e-9, float(power)),
            latency_cycles=max(1.0, float(latency)),
            accuracy=max(0.1, min(1.0, float(accuracy))),
        )

    def _observation_label(self, cand: _Candidate) -> _Label | None:
        for obs in self.observations:
            if (
                obs.mac_count == cand.mac_count
                and obs.is_critical_path == cand.is_critical_path
                and obs.bitstream_length == cand.bitstream_length
                and obs.decorrelator == cand.decorrelator
                and obs.mode == cand.mode
                and obs.precision_bits == cand.precision_bits
                and obs.lfsr_polynomial == cand.lfsr_polynomial
            ):
                return _Label(
                    luts=float(obs.luts_used),
                    power_mw=obs.power_mw,
                    latency_cycles=float(obs.latency_cycles),
                    accuracy=obs.accuracy_score,
                )
        return None

    def _to_config(self, cand: _Candidate, pred: _Label) -> SurrogateLayerConfig:
        return SurrogateLayerConfig(
            bitstream_length=cand.bitstream_length,
            decorrelator=cand.decorrelator,
            mode=cand.mode,
            precision_bits=cand.precision_bits,
            lfsr_polynomial=cand.lfsr_polynomial,
            luts_used=max(1, int(round(pred.luts))),
            power_used=max(0.0, pred.power_mw),
            latency_cycles=max(1, int(round(pred.latency_cycles))),
            accuracy_score=max(0.0, min(1.0, pred.accuracy)),
            utility_score=self._utility(pred),
        )

    def _rebalance(
        self, selected: dict[str, SurrogateLayerConfig], network: list[LayerProfile]
    ) -> SurrogateOptimizerReport:
        # Greedy second pass: upgrade the most useful affordable candidate,
        # especially on critical layers, until no candidate improves utility.
        improved = True
        while improved:
            improved = False
            best_layer = ""
            best_cfg: SurrogateLayerConfig | None = None
            best_gain = 0.0
            current_luts = sum(c.luts_used for c in selected.values())
            current_power = sum(c.power_used for c in selected.values())

            for layer in network:
                current = selected[layer.id]
                for cand in self._rank_layer_candidates(
                    layer,
                    self.target.budget.max_luts - current_luts + current.luts_used,
                    self.target.budget.max_power_mw - current_power + current.power_used,
                ):
                    if cand == current:
                        continue
                    gain = cand.utility_score - current.utility_score
                    if layer.is_critical_path:
                        gain *= 1.5
                    if gain > best_gain:
                        best_gain = gain
                        best_cfg = cand
                        best_layer = layer.id

            if best_cfg is not None and best_layer:
                selected[best_layer] = best_cfg
                improved = True

        return SurrogateOptimizerReport(
            config=selected,
            total_luts=sum(c.luts_used for c in selected.values()),
            total_power_mw=sum(c.power_used for c in selected.values()),
            total_latency_cycles=max((c.latency_cycles for c in selected.values()), default=0),
            mean_accuracy=self._weighted_accuracy(selected, network),
            training_points=self._training_points,
            target_name=self.target.name,
        )

    def _features(self, cand: _Candidate) -> np.ndarray:
        decor = [1.0 if cand.decorrelator == name else 0.0 for name in _DECORRELATORS]
        mode = [1.0 if cand.mode == name else 0.0 for name in _MODES]
        poly = (
            0.0
            if cand.lfsr_polynomial == "none"
            else self._polynomial_quality(cand.lfsr_polynomial)
        )
        return np.array(
            [
                1.0,
                math.log2(max(1, cand.mac_count)),
                math.log2(max(1, cand.bitstream_length)),
                cand.precision_bits / 16.0,
                1.0 if cand.is_critical_path else 0.0,
                poly,
                *decor,
                *mode,
            ],
            dtype=np.float64,
        )

    def _normalise_label(self, label: _Label) -> np.ndarray:
        return np.array(
            [
                label.luts / max(1, self.target.budget.max_luts),
                label.power_mw / max(1e-9, self.target.budget.max_power_mw),
                label.latency_cycles / max(1, self.target.budget.max_latency_cycles or 2048),
                label.accuracy,
            ],
            dtype=np.float64,
        )

    def _denormalise_prediction(self, values: np.ndarray) -> _Label:
        return _Label(
            luts=float(values[0] * max(1, self.target.budget.max_luts)),
            power_mw=float(values[1] * max(1e-9, self.target.budget.max_power_mw)),
            latency_cycles=float(values[2] * max(1, self.target.budget.max_latency_cycles or 2048)),
            accuracy=float(values[3]),
        )

    def _utility(self, label: _Label) -> float:
        lut_frac = label.luts / max(1, self.target.budget.max_luts)
        power_frac = label.power_mw / max(1e-9, self.target.budget.max_power_mw)
        latency_frac = label.latency_cycles / max(1, self.target.budget.max_latency_cycles or 2048)
        return (
            self.target.accuracy_weight * label.accuracy
            - self.target.lut_weight * lut_frac
            - self.target.power_weight * power_frac
            - self.target.latency_weight * latency_frac
        )

    @staticmethod
    def _polynomial_quality(polynomial: str) -> float:
        if polynomial == "x16+x14+x13+x11+1":
            return 0.006
        if polynomial == "x16+x15+x13+x4+1":
            return 0.003
        if polynomial == "x16+x12+x3+x1+1":
            return 0.001
        return 0.0

    @staticmethod
    def _weighted_accuracy(
        selected: dict[str, SurrogateLayerConfig], network: list[LayerProfile]
    ) -> float:
        total = 0.0
        weight = 0.0
        for layer in network:
            cfg = selected.get(layer.id)
            if cfg is None:
                continue
            w = 2.0 if layer.is_critical_path else 1.0
            total += cfg.accuracy_score * w
            weight += w
        return total / weight if weight else 0.0

optimise(network)

Select budgeted layer settings for network.

Source code in src/sc_neurocore/optimizer/surrogate_sc_optimizer.py
Python
def optimise(self, network: list[LayerProfile]) -> SurrogateOptimizerReport | None:
    """Select budgeted layer settings for ``network``."""
    if not network:
        return SurrogateOptimizerReport(
            config={},
            total_luts=0,
            total_power_mw=0.0,
            total_latency_cycles=0,
            mean_accuracy=0.0,
            training_points=0,
            target_name=self.target.name,
        )

    self._fit_surrogate(network)
    selected: dict[str, SurrogateLayerConfig] = {}
    rejected: list[str] = []

    for layer in sorted(network, key=lambda item: item.is_critical_path, reverse=True):
        remaining_luts = self.target.budget.max_luts - sum(
            c.luts_used for c in selected.values()
        )
        remaining_power = self.target.budget.max_power_mw - sum(
            c.power_used for c in selected.values()
        )
        candidates = self._rank_layer_candidates(layer, remaining_luts, remaining_power)
        if not candidates:
            rejected.append(layer.id)
            continue
        selected[layer.id] = candidates[0]

    if rejected:
        return SurrogateOptimizerReport(
            config=selected,
            total_luts=sum(c.luts_used for c in selected.values()),
            total_power_mw=sum(c.power_used for c in selected.values()),
            total_latency_cycles=max((c.latency_cycles for c in selected.values()), default=0),
            mean_accuracy=self._weighted_accuracy(selected, network),
            training_points=self._training_points,
            target_name=self.target.name,
            rejected_layers=rejected,
        )

    return self._rebalance(selected, network)

TargetHardwareProfile dataclass

Target device budget and compiler preference weights.

Source code in src/sc_neurocore/optimizer/surrogate_sc_optimizer.py
Python
@dataclass(frozen=True)
class TargetHardwareProfile:
    """Target device budget and compiler preference weights."""

    name: str
    budget: HardwareBudget
    lut_weight: float = 0.35
    power_weight: float = 0.35
    latency_weight: float = 0.20
    accuracy_weight: float = 1.0

load_observations(path)

Load benchmark observations from a JSON evidence file.

Source code in src/sc_neurocore/optimizer/observation_loader.py
Python
def load_observations(path: str | Path) -> list[BenchmarkObservation]:
    """Load benchmark observations from a JSON evidence file."""
    source = Path(path)
    try:
        payload = json.loads(source.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        raise ObservationLoadError(f"{source} is not valid JSON: {exc}") from exc
    return observations_from_payload(payload, source=str(source))

load_synthesis_observation(report_paths, *, design, accuracy_score, latency_cycles=None)

Load one observation from Vivado/Quartus report files plus design metadata.

Raw vendor reports do not describe the compiler decision that produced the hardware, and many do not carry model accuracy. The caller must therefore provide the design fields and measured accuracy explicitly.

Source code in src/sc_neurocore/optimizer/observation_loader.py
Python
def load_synthesis_observation(
    report_paths: Mapping[str, str | Path],
    *,
    design: Mapping[str, Any],
    accuracy_score: float,
    latency_cycles: int | None = None,
) -> BenchmarkObservation:
    """Load one observation from Vivado/Quartus report files plus design metadata.

    Raw vendor reports do not describe the compiler decision that produced the
    hardware, and many do not carry model accuracy.  The caller must therefore
    provide the design fields and measured accuracy explicitly.
    """
    reports: dict[str, str] = {}
    for name, path in report_paths.items():
        source = Path(path)
        reports[name] = source.read_text(encoding="utf-8", errors="ignore")
    return observation_from_synthesis_reports(
        reports,
        design=design,
        accuracy_score=accuracy_score,
        latency_cycles=latency_cycles,
        source=", ".join(str(path) for path in report_paths.values()),
    )

observation_from_synthesis_reports(reports, *, design, accuracy_score, latency_cycles=None, source='<synthesis-reports>')

Build one observation from raw Vivado/Quartus text reports.

Source code in src/sc_neurocore/optimizer/observation_loader.py
Python
def observation_from_synthesis_reports(
    reports: Mapping[str, str],
    *,
    design: Mapping[str, Any],
    accuracy_score: float,
    latency_cycles: int | None = None,
    source: str = "<synthesis-reports>",
) -> BenchmarkObservation:
    """Build one observation from raw Vivado/Quartus text reports."""
    metrics = _metrics_from_synthesis_reports(reports, source=source)
    if latency_cycles is not None:
        metrics["latency_cycles"] = latency_cycles
    record = dict(design)
    record.update(metrics)
    record["accuracy_score"] = accuracy_score
    return _observation_from_record(record, source=source, index=0)

observations_from_payload(payload, *, source='<memory>')

Convert an in-memory benchmark/synthesis payload into observations.

Source code in src/sc_neurocore/optimizer/observation_loader.py
Python
def observations_from_payload(
    payload: Any, *, source: str = "<memory>"
) -> list[BenchmarkObservation]:
    """Convert an in-memory benchmark/synthesis payload into observations."""
    defaults: dict[str, Any] = {}
    if isinstance(payload, dict):
        defaults = _mapping(payload.get("design_defaults") or payload.get("design") or {})

    records = _extract_records(payload)
    if not records:
        raise ObservationLoadError(f"{source} contains no observation records")

    observations = []
    for index, record in enumerate(records):
        merged = dict(defaults)
        merged.update(_mapping(record))
        observations.append(_observation_from_record(merged, source=source, index=index))
    return observations

optimise_from_evidence_payload(*, network, target, payload)

Rerun the SC optimiser from an in-memory evidence payload.

Source code in src/sc_neurocore/optimizer/feedback_loop.py
Python
def optimise_from_evidence_payload(
    *,
    network: list[LayerProfile],
    target: TargetHardwareProfile,
    payload: dict[str, Any],
) -> SynthesisFeedbackResult:
    """Rerun the SC optimiser from an in-memory evidence payload."""
    observations = tuple(observations_from_payload(payload))
    report = SurrogateSCOptimizer(target, observations=observations).optimise(network)
    if report is None:
        raise RuntimeError("surrogate optimiser returned no report")
    return SynthesisFeedbackResult(
        evidence_payload=payload,
        observations=observations,
        report=report,
    )

optimise_from_synthesis_reports(*, network, target, design_path, utilisation_path, power_path, accuracy_score, timing_path=None, latency_cycles=None, clock_mhz=None, inferences_per_run=None)

Parse synthesis reports and immediately rerun the SC optimiser.

This helper is the local closed loop for the first production path: report files are parsed into strict evidence, evidence becomes measured observations, and those observations bias the surrogate optimiser for the supplied layer network. It never invokes vendor tools or fabricates missing metrics; callers must provide reports and measured accuracy.

Source code in src/sc_neurocore/optimizer/feedback_loop.py
Python
def optimise_from_synthesis_reports(
    *,
    network: list[LayerProfile],
    target: TargetHardwareProfile,
    design_path: str | Path,
    utilisation_path: str | Path,
    power_path: str | Path,
    accuracy_score: float,
    timing_path: str | Path | None = None,
    latency_cycles: int | None = None,
    clock_mhz: float | None = None,
    inferences_per_run: int | None = None,
) -> SynthesisFeedbackResult:
    """Parse synthesis reports and immediately rerun the SC optimiser.

    This helper is the local closed loop for the first production path:
    report files are parsed into strict evidence, evidence becomes measured
    observations, and those observations bias the surrogate optimiser for the
    supplied layer network.  It never invokes vendor tools or fabricates
    missing metrics; callers must provide reports and measured accuracy.
    """
    payload = build_payload_from_reports(
        design_path=design_path,
        utilisation_path=utilisation_path,
        power_path=power_path,
        timing_path=timing_path,
        accuracy_score=accuracy_score,
        latency_cycles=latency_cycles,
        clock_mhz=clock_mhz,
        inferences_per_run=inferences_per_run,
    )
    return optimise_from_evidence_payload(network=network, target=target, payload=payload)

fit_to_target(layer_sizes, weights, target='ice40', max_iterations=10, min_bitstream_length=32, initial_bitstream_length=256)

Automatically compress an SNN to fit a target FPGA.

Iteratively applies:

1. Bitstream length reduction (halving L)
2. Weight pruning (increasing threshold)
3. Weight quantization (reducing bit width)

Stops when the energy estimator says the network fits on the target.

Parameters

layer_sizes : list of (n_inputs, n_neurons)
weights : list of ndarray
target : str
    FPGA target ('ice40', 'ecp5', 'artix7', 'zynq').
max_iterations : int
    Maximum optimization steps.
min_bitstream_length : int
    Minimum allowed L.
initial_bitstream_length : int
    Starting bitstream length.

Returns

OptimizationResult

Source code in src/sc_neurocore/optimizer/resource_optimizer.py
Python
def fit_to_target(
    layer_sizes: list[tuple[int, int]],
    weights: list[np.ndarray],
    target: str = "ice40",
    max_iterations: int = 10,
    min_bitstream_length: int = 32,
    initial_bitstream_length: int = 256,
) -> OptimizationResult:
    """Automatically compress an SNN to fit a target FPGA.

    Iteratively applies:
    1. Bitstream length reduction (halving L)
    2. Weight pruning (increasing threshold)
    3. Weight quantization (reducing bit width)

    Stops when the energy estimator says the network fits on the target.

    Parameters
    ----------
    layer_sizes : list of (n_inputs, n_neurons)
    weights : list of ndarray
    target : str
        FPGA target ('ice40', 'ecp5', 'artix7', 'zynq').
    max_iterations : int
        Maximum optimization steps.
    min_bitstream_length : int
        Minimum allowed L.
    initial_bitstream_length : int
        Starting bitstream length.

    Returns
    -------
    OptimizationResult
    """
    from sc_neurocore.energy.fpga_models import TARGETS

    target_info = TARGETS.get(target)
    if target_info is None:
        raise ValueError(f"Unknown target '{target}'")

    current_weights = [w.copy() for w in weights]
    current_L = initial_bitstream_length
    steps = []
    prune_threshold = 0.001
    quant_bits = 16

    for iteration in range(max_iterations):
        report = estimate(layer_sizes, target=target, bitstream_length=current_L)

        if report.fits_on_target:
            break

        luts_before = report.total_luts

        # Strategy 1: Halve bitstream length
        if current_L > min_bitstream_length:
            current_L = max(current_L // 2, min_bitstream_length)
            report_after = estimate(layer_sizes, target=target, bitstream_length=current_L)
            steps.append(
                OptimizationStep(
                    action=f"Reduce L to {current_L}",
                    luts_before=luts_before,
                    luts_after=report_after.total_luts,
                    sparsity=0.0,
                    bitstream_length=current_L,
                )
            )
            if report_after.fits_on_target:  # pragma: no cover
                break
            continue

        # Strategy 2: Prune weights
        prune_threshold *= 3
        current_weights, prune_report = prune_weights(current_weights, threshold=prune_threshold)
        report_after = estimate(layer_sizes, target=target, bitstream_length=current_L)
        steps.append(
            OptimizationStep(
                action=f"Prune threshold={prune_threshold:.3f}",
                luts_before=luts_before,
                luts_after=report_after.total_luts,
                sparsity=prune_report.sparsity,
                bitstream_length=current_L,
            )
        )

        # Strategy 3: Reduce quantization
        if quant_bits > 4:
            quant_bits = max(quant_bits - 2, 4)
            current_weights = quantize_weights(current_weights, bits=quant_bits)
            steps.append(
                OptimizationStep(
                    action=f"Quantize to {quant_bits}-bit",
                    luts_before=report_after.total_luts,
                    luts_after=report_after.total_luts,
                    sparsity=prune_report.sparsity,
                    bitstream_length=current_L,
                )
            )

    # Final estimate
    final_report = estimate(layer_sizes, target=target, bitstream_length=current_L)

    total_params = sum(w.size for w in current_weights)
    nonzero = sum(np.count_nonzero(w) for w in current_weights)
    sparsity = 1.0 - nonzero / max(total_params, 1)

    return OptimizationResult(
        fits=final_report.fits_on_target,
        target=target,
        final_luts=final_report.total_luts,
        target_luts=target_info.total_luts,
        utilization_pct=final_report.utilization_pct,
        final_bitstream_length=current_L,
        final_sparsity=sparsity,
        steps=steps,
        optimized_weights=current_weights,
    )
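
The final sparsity returned above can be checked by hand; a minimal sketch of the same formula, with illustrative weights (not taken from the library's tests):

```python
import numpy as np

# Illustrative weights: one half-pruned layer and one fully pruned layer.
weights = [
    np.array([[0.5, 0.0], [0.0, 0.25]]),  # 2 of 4 entries nonzero
    np.zeros((2, 2)),                      # 0 of 4 entries nonzero
]

total_params = sum(w.size for w in weights)          # 8
nonzero = sum(np.count_nonzero(w) for w in weights)  # 2
sparsity = 1.0 - nonzero / max(total_params, 1)      # 0.75
print(f"sparsity = {sparsity:.2f}")
```

The `max(total_params, 1)` guard mirrors the source and avoids division by zero when every layer is empty.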

build_payload_from_reports(*, design_path, utilisation_path, power_path, timing_path=None, accuracy_score, latency_cycles=None, clock_mhz=None, inferences_per_run=None)

Build an evidence payload from report files and explicit metadata.

Source code in src/sc_neurocore/optimizer/synthesis_evidence.py
Python
def build_payload_from_reports(
    *,
    design_path: str | Path,
    utilisation_path: str | Path,
    power_path: str | Path,
    timing_path: str | Path | None = None,
    accuracy_score: float,
    latency_cycles: int | None = None,
    clock_mhz: float | None = None,
    inferences_per_run: int | None = None,
) -> dict[str, Any]:
    """Build an evidence payload from report files and explicit metadata."""
    reports: dict[str, Path] = {
        "utilisation": Path(utilisation_path),
        "power": Path(power_path),
    }
    if timing_path is not None:
        reports["timing"] = Path(timing_path)

    observation = load_synthesis_observation(
        reports,
        design=load_design(design_path),
        accuracy_score=accuracy_score,
        latency_cycles=latency_cycles,
    )
    payload: dict[str, Any] = {
        "source_reports": {name: str(path) for name, path in reports.items()},
        "observations": [observation_to_record(observation)],
    }
    energy = energy_payload(
        observation,
        clock_mhz=clock_mhz,
        inferences_per_run=inferences_per_run,
    )
    if energy is not None:
        payload["energy"] = energy
    return payload
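
The returned payload pairs the parsed reports with a single measured observation (plus an optional `energy` block). Its shape is roughly the following; the `source_reports`, `observations`, and `energy` keys come from the source above, but every path, observation field, and value here is illustrative:

```json
{
  "source_reports": {
    "utilisation": "reports/util.rpt",
    "power": "reports/power.rpt"
  },
  "observations": [
    {
      "luts_used": 5120,
      "power_mw": 185.0,
      "latency_cycles": 2048,
      "accuracy_score": 0.94
    }
  ],
  "energy": {
    "clock_mhz": 48.0,
    "inferences_per_run": 1
  }
}
```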

write_payload(payload, output)

Write evidence JSON to a file or stdout.

Source code in src/sc_neurocore/optimizer/synthesis_evidence.py
Python
def write_payload(payload: dict[str, Any], output: str | Path | None) -> None:
    """Write evidence JSON to a file or stdout."""
    text = json.dumps(payload, indent=2, sort_keys=True) + "\n"
    if output is None:
        print(text, end="")
        return
    destination = Path(output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(text, encoding="utf-8")
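
Since `write_payload` only touches `json` and `pathlib`, its behaviour is easy to exercise standalone. A minimal sketch that re-creates the function above and round-trips a payload through a temporary directory (the payload contents are illustrative):

```python
import json
import tempfile
from pathlib import Path


def write_payload(payload, output):
    """Write evidence JSON to a file or stdout (re-created from the source above)."""
    text = json.dumps(payload, indent=2, sort_keys=True) + "\n"
    if output is None:
        print(text, end="")
        return
    destination = Path(output)
    destination.parent.mkdir(parents=True, exist_ok=True)  # create missing dirs
    destination.write_text(text, encoding="utf-8")


payload = {"observations": [], "source_reports": {}}
# The parent directory does not exist yet; write_payload creates it.
out = Path(tempfile.mkdtemp()) / "evidence" / "payload.json"
write_payload(payload, out)
round_tripped = json.loads(out.read_text(encoding="utf-8"))
print(round_tripped == payload)  # keys are sorted on disk, equal after parsing
```

Passing `output=None` instead prints the same JSON text to stdout, which is how the CLI side can pipe evidence without touching the filesystem.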