Compiler API Reference
Network-to-hardware compilation pipeline. Arbitrary ODE strings to
synthesizable Verilog RTL in one function call.
CLI
sc-neurocore compile "dv/dt = -(v-E_L)/tau_m + I/C" \
--threshold "v > -50" --reset "v = -65" \
--params "E_L=-65,tau_m=10,C=1" --init "v=-65" \
--target ice40 --testbench --synthesize -o build/
| Flag |
Default |
Description |
--threshold |
None |
Spike condition (e.g. "v > -50") |
--reset |
None |
Reset expression (e.g. "v = -65; w = 0") |
--params |
None |
Comma-separated key=val pairs |
--init |
None |
Initial state key=val pairs |
--target |
ice40 |
FPGA target (ice40, ecp5, artix7, zynq) |
--module-name |
sc_equation_neuron |
Generated Verilog module name |
--testbench |
off |
Generate simulation testbench |
--synthesize |
off |
Run Yosys synthesis (requires Yosys in PATH) |
-o / --output |
build |
Output directory |
Equation → Verilog Compiler
Compile arbitrary ODE neuron equations to synthesizable Verilog RTL.
Supported functions
| Category |
Functions |
| Transcendental |
exp, log, sqrt, tanh, sigmoid, sin, cos |
| Arithmetic |
abs, clip(x, lo, hi), max(a, b), min(a, b) |
| Polynomial |
x**2 through x**8 |
| Operators |
+, -, *, / (by constant), unary - |
| Comparison |
>, >=, <, <= |
Transcendental functions use 16-entry piecewise Q8.8 lookup tables
covering [-8, +8). Accuracy: ~1-2% over the useful range for neuron
dynamics. All arithmetic includes saturating overflow protection.
flowchart TB
subgraph Input
A["ODE string<br/>'dv/dt = -(v-E_L)/tau + I/C'"]
end
subgraph Parse
B["Python AST parser"]
C["_VerilogExprEmitter"]
end
subgraph Emit
D["Q8.8 parameters"]
E["Multiply pipelines"]
F["LUT for exp/log/tanh"]
G["Saturating next-state"]
H["Threshold + reset logic"]
end
subgraph Output
I["Synthesizable Verilog"]
J["Testbench"]
end
A --> B --> C
C --> D & E & F & G & H
D & E & F & G & H --> I
I --> J
style Input fill:#e1f5fe
style Output fill:#e8f5e9
sc_neurocore.compiler.equation_compiler
Compile arbitrary ODE neuron equations to synthesizable Verilog.
Compile string equations directly to FPGA hardware:
from sc_neurocore.neurons.equation_builder import from_equations
from sc_neurocore.compiler.equation_compiler import compile_to_verilog
neuron = from_equations("dv/dt = -(v - E_L)/tau_m + I/C",
threshold="v > -50", reset="v = -65",
params=dict(E_L=-65, tau_m=10, C=1),
init=dict(v=-65))
verilog = compile_to_verilog(neuron, module_name="my_lif")
All arithmetic uses Q8.8 signed fixed-point. Each ODE term becomes
a multiply-shift pipeline stage. Threshold and reset map to
combinational comparators and mux logic.
Q88
dataclass
Q8.8 fixed-point conversion: 8 integer bits, 8 fractional bits, signed.
Source code in src/sc_neurocore/compiler/equation_compiler.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 | @dataclass
class Q88:
"""Q8.8 fixed-point conversion: 8 integer bits, 8 fractional bits, signed."""
data_width: int = 16
fraction: int = 8
def encode(self, value: float) -> int:
raw = int(round(value * (1 << self.fraction)))
mask = (1 << self.data_width) - 1
return raw & mask
def encode_signed_literal(self, value: float) -> str:
raw = int(round(value * (1 << self.fraction)))
if raw < 0:
raw = raw & ((1 << self.data_width) - 1)
return f"{self.data_width}'sd{raw}"
|
compile_to_verilog(neuron, module_name='sc_equation_neuron', data_width=16, fraction=8)
Compile an EquationNeuron to synthesizable Verilog RTL.
Parameters
neuron : EquationNeuron
The neuron defined by arbitrary ODE strings.
module_name : str
Name of the generated Verilog module.
data_width : int
Bit width for fixed-point arithmetic (default 16 = Q8.8).
fraction : int
Number of fractional bits (default 8).
Returns
str
Synthesizable Verilog source code.
Source code in src/sc_neurocore/compiler/equation_compiler.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494 | def compile_to_verilog(
neuron: EquationNeuron,
module_name: str = "sc_equation_neuron",
data_width: int = 16,
fraction: int = 8,
) -> str:
"""Compile an EquationNeuron to synthesizable Verilog RTL.
Parameters
----------
neuron : EquationNeuron
The neuron defined by arbitrary ODE strings.
module_name : str
Name of the generated Verilog module.
data_width : int
Bit width for fixed-point arithmetic (default 16 = Q8.8).
fraction : int
Number of fractional bits (default 8).
Returns
-------
str
Synthesizable Verilog source code.
"""
q = Q88(data_width=data_width, fraction=fraction)
state_vars = set(neuron.equations.keys())
# Build parameter map: Python name → Verilog parameter name
param_map: dict[str, str] = {}
param_decls: list[str] = []
for pname, pval in {**neuron.parameters, **neuron.constants}.items():
vname = f"P_{pname.upper()}"
param_map[pname] = vname
q_val = q.encode(pval)
param_decls.append(
f" parameter signed [{data_width - 1}:0] {vname} = {data_width}'sd{q_val}"
)
# Generate derivative expressions
deriv_wires: list[str] = []
deriv_assigns: list[str] = []
all_intermediates: list[str] = []
for var, expr_str in neuron.equations.items():
vexpr, intermediates = _emit_expr(expr_str, state_vars, param_map, q)
all_intermediates.extend(intermediates)
# dv = expr * dt (multiply by dt in fixed-point)
dt_literal = q.encode_signed_literal(neuron.dt)
dt_tmp = f"_dt_mul_{var}"
all_intermediates.append(
f"wire signed [{2 * data_width - 1}:0] {dt_tmp} = ({vexpr}) * {dt_literal};"
)
deriv_name = f"d{var}"
deriv_wires.append(
f"wire signed [{data_width - 1}:0] {deriv_name} = ({dt_tmp} >>> {fraction})[{data_width - 1}:0];"
)
# Next-state computation with saturation
max_val = (1 << (data_width - 1)) - 1 # e.g. 32767 for 16-bit
min_val = -(1 << (data_width - 1)) # e.g. -32768 for 16-bit
next_wires: list[str] = []
for var in neuron.equations:
raw = f"{var}_raw"
next_wires.append(f"wire signed [{data_width}:0] {raw} = {var}_reg + d{var};")
next_wires.append(
f"wire signed [{data_width - 1}:0] {var}_next = "
f"({raw} > {data_width + 1}'sd{max_val}) ? {data_width}'sd{max_val} : "
f"({raw} < {data_width + 1}'sd{min_val}) ? {data_width}'sd{min_val} : "
f"{raw}[{data_width - 1}:0];"
)
# Threshold expression
threshold_verilog = ""
if neuron.threshold_expr:
threshold_verilog, thr_intermediates = _emit_expr(
neuron.threshold_expr, state_vars, param_map, q
)
all_intermediates.extend(thr_intermediates)
# Reset assignments
reset_assignments: list[str] = []
for var, expr_str in neuron.reset_rules.items():
rexpr, r_intermediates = _emit_expr(expr_str, state_vars, param_map, q)
all_intermediates.extend(r_intermediates)
reset_assignments.append(f" {var}_reg <= {rexpr};")
# Build the Verilog module
lines = [
"// Auto-generated by SC-NeuroCore equation compiler",
f"// Source: {neuron!r}",
f"// Fixed-point: Q{data_width - fraction}.{fraction} ({data_width}-bit signed)",
"`timescale 1ns / 1ps",
"",
f"module {module_name} #(",
]
lines.append(",\n".join(param_decls))
lines.append(")(")
lines.append(" input wire clk,")
lines.append(" input wire rst_n,")
lines.append(f" input wire signed [{data_width - 1}:0] I_t,")
lines.append(" output reg spike_out,")
# Output ports for each state variable
for var in neuron.equations:
lines.append(f" output reg signed [{data_width - 1}:0] {var}_out,")
# Remove trailing comma from last port
lines[-1] = lines[-1].rstrip(",")
lines.append(");")
lines.append("")
# State registers
for var in neuron.equations:
init_val = q.encode_signed_literal(neuron.initial_state.get(var, 0.0))
lines.append(f"reg signed [{data_width - 1}:0] {var}_reg;")
lines.append("")
# Intermediate wires (multiply pipelines)
for wire in all_intermediates:
lines.append(wire)
lines.append("")
# Derivative wires
for wire in deriv_wires:
lines.append(wire)
lines.append("")
# Next-state wires
for wire in next_wires:
lines.append(wire)
lines.append("")
# Sequential logic
lines.append("always @(posedge clk or negedge rst_n) begin")
lines.append(" if (!rst_n) begin")
for var in neuron.equations:
init_val = q.encode_signed_literal(neuron.initial_state.get(var, 0.0))
lines.append(f" {var}_reg <= {init_val};")
lines.append(f" {var}_out <= {init_val};")
lines.append(" spike_out <= 1'b0;")
lines.append(" end else begin")
if threshold_verilog:
lines.append(f" if ({threshold_verilog}) begin")
lines.append(" spike_out <= 1'b1;")
for assign in reset_assignments:
lines.append(assign)
# State vars not in reset keep their next value
for var in neuron.equations:
if var not in neuron.reset_rules:
lines.append(f" {var}_reg <= {var}_next;")
for var in neuron.equations:
reset_val = (
f"{param_map.get(var + '_reset_val', var + '_next')}"
if var in neuron.reset_rules
else f"{var}_next"
)
lines.append(f" {var}_out <= {var}_reg;")
lines.append(" end else begin")
lines.append(" spike_out <= 1'b0;")
for var in neuron.equations:
lines.append(f" {var}_reg <= {var}_next;")
lines.append(f" {var}_out <= {var}_next;")
lines.append(" end")
else:
lines.append(" spike_out <= 1'b0;")
for var in neuron.equations:
lines.append(f" {var}_reg <= {var}_next;")
lines.append(f" {var}_out <= {var}_next;")
lines.append(" end")
lines.append("end")
lines.append("")
lines.append("endmodule")
return "\n".join(lines)
|
equation_to_fpga(*equation_strings, threshold=None, reset=None, params=None, init=None, dt=0.1, module_name='sc_equation_neuron')
One-liner: ODE string → (Python neuron, Verilog RTL).
neuron, verilog = equation_to_fpga(
... "dv/dt = -(v - E_L)/tau_m + I/C",
... threshold="v > -50", reset="v = -65",
... params=dict(E_L=-65, tau_m=10, C=1),
... init=dict(v=-65),
... )
Source code in src/sc_neurocore/compiler/equation_compiler.py
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531 | def equation_to_fpga(
*equation_strings: str,
threshold: str | None = None,
reset: str | None = None,
params: dict[str, float] | None = None,
init: dict[str, float] | None = None,
dt: float = 0.1,
module_name: str = "sc_equation_neuron",
) -> tuple[EquationNeuron, str]:
"""One-liner: ODE string → (Python neuron, Verilog RTL).
>>> neuron, verilog = equation_to_fpga(
... "dv/dt = -(v - E_L)/tau_m + I/C",
... threshold="v > -50", reset="v = -65",
... params=dict(E_L=-65, tau_m=10, C=1),
... init=dict(v=-65),
... )
"""
from ..neurons.equation_builder import from_equations
# Split semicolons within single strings for convenience
expanded = []
for s in equation_strings:
expanded.extend(part.strip() for part in s.split(";") if part.strip())
neuron = from_equations(
*expanded,
threshold=threshold,
reset=reset,
params=params,
init=init,
dt=dt,
)
verilog = compile_to_verilog(neuron, module_name=module_name)
return neuron, verilog
|
generate_testbench(neuron, module_name='sc_equation_neuron', n_steps=200, input_current=1.0, data_width=16, fraction=8)
Generate a Verilog testbench for a compiled equation neuron.
Drives the module with constant current for n_steps clock cycles,
monitors spike_out and state outputs, and produces a VCD waveform.
Parameters
neuron : EquationNeuron
The neuron (same one passed to compile_to_verilog).
module_name : str
Must match the module name used in compile_to_verilog.
n_steps : int
Number of simulation clock cycles.
input_current : float
Constant input current (Q-encoded internally).
data_width : int
Bit width matching the compiled module.
fraction : int
Fractional bits matching the compiled module.
Returns
str
Verilog testbench source code.
Source code in src/sc_neurocore/compiler/equation_compiler.py
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634 | def generate_testbench(
neuron: EquationNeuron,
module_name: str = "sc_equation_neuron",
n_steps: int = 200,
input_current: float = 1.0,
data_width: int = 16,
fraction: int = 8,
) -> str:
"""Generate a Verilog testbench for a compiled equation neuron.
Drives the module with constant current for n_steps clock cycles,
monitors spike_out and state outputs, and produces a VCD waveform.
Parameters
----------
neuron : EquationNeuron
The neuron (same one passed to compile_to_verilog).
module_name : str
Must match the module name used in compile_to_verilog.
n_steps : int
Number of simulation clock cycles.
input_current : float
Constant input current (Q-encoded internally).
data_width : int
Bit width matching the compiled module.
fraction : int
Fractional bits matching the compiled module.
Returns
-------
str
Verilog testbench source code.
"""
q = Q88(data_width=data_width, fraction=fraction)
i_val = q.encode_signed_literal(input_current)
state_vars = list(neuron.equations.keys())
port_connections = [
" .clk(clk),",
" .rst_n(rst_n),",
f" .I_t({i_val}),",
" .spike_out(spike_out),",
]
wire_decls = []
for var in state_vars:
port_connections.append(f" .{var}_out({var}_out),")
wire_decls.append(f"wire signed [{data_width - 1}:0] {var}_out;")
port_connections[-1] = port_connections[-1].rstrip(",")
lines = [
f"// Auto-generated testbench for {module_name}",
"// SC-NeuroCore equation compiler",
"`timescale 1ns / 1ps",
"",
f"module tb_{module_name};",
"",
"reg clk;",
"reg rst_n;",
"wire spike_out;",
]
lines.extend(wire_decls)
lines.append("")
lines.append(f"{module_name} uut (")
lines.extend(port_connections)
lines.append(");")
lines.append("")
lines.append("// Clock: 10ns period (100 MHz)")
lines.append("initial clk = 0;")
lines.append("always #5 clk = ~clk;")
lines.append("")
lines.append("integer spike_count;")
lines.append("")
lines.append("initial begin")
lines.append(f' $dumpfile("tb_{module_name}.vcd");')
lines.append(f" $dumpvars(0, tb_{module_name});")
lines.append(" spike_count = 0;")
lines.append("")
lines.append(" // Reset")
lines.append(" rst_n = 0;")
lines.append(" #20;")
lines.append(" rst_n = 1;")
lines.append("")
lines.append(f" // Run {n_steps} cycles")
lines.append(f" repeat ({n_steps}) begin")
lines.append(" @(posedge clk);")
lines.append(" if (spike_out) spike_count = spike_count + 1;")
lines.append(" end")
lines.append("")
lines.append(
f' $display("Simulation complete: %0d spikes in {n_steps} cycles", spike_count);'
)
for var in state_vars:
lines.append(
f' $display("Final {var} = %0d (Q{data_width - fraction}.{fraction})", {var}_out);'
)
lines.append(" $finish;")
lines.append("end")
lines.append("")
lines.append("endmodule")
return "\n".join(lines)
|
Pipeline
Orchestration pipeline: MLIR → firtool → Verilog → Yosys → nextpnr → bitstream.
sc_neurocore.compiler.pipeline
Orchestration Pipeline for sc-neurocore's Hardware Compiler.
This module provides the automated workflow to take a stochastic graph from
the MLIREmitter and compile it down to a bitstream using open-source FPGA tools:
1. CIRCT (firtool) -> Verilog
2. Yosys -> Synthesis (BLIF/JSON)
3. NextPNR -> Place & Route
4. IcePack / Project Xray -> Bitstream
CompilerPipeline
Automated hardware synthesis pipeline.
Source code in src/sc_neurocore/compiler/pipeline.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 | class CompilerPipeline:
"""
Automated hardware synthesis pipeline.
"""
def __init__(self, work_dir: str = ".tmp/compiler"):
self.work_dir = os.path.realpath(work_dir)
if not os.path.exists(self.work_dir):
os.makedirs(self.work_dir)
@staticmethod
def _sanitize_name(name: str) -> str:
"""Restrict output_name to alphanumeric + underscore."""
sanitized = "".join(c for c in name if c.isalnum() or c == "_")
if not sanitized:
from sc_neurocore.exceptions import SCCompilerError
raise SCCompilerError(f"Invalid output name: {name!r}")
return sanitized
def compile_mlir_to_verilog(self, mlir_content: str, output_name: str = "top") -> str:
"""
Invokes 'firtool' to lower MLIR to Verilog.
"""
output_name = self._sanitize_name(output_name)
mlir_path = os.path.join(self.work_dir, f"{output_name}.mlir")
v_path = os.path.join(self.work_dir, f"{output_name}.v")
with open(mlir_path, "w") as f:
f.write(mlir_content)
logger.info(f"Lowering {mlir_path} to Verilog...")
# Note: In a real environment, firtool must be in PATH
try:
subprocess.run(["firtool", mlir_path, "-o", v_path], check=True)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.warning(f"firtool failed or not found: {e}. Falling back to stub Verilog.")
# Fallback for demo/development without full toolchain
with open(v_path, "w") as f:
f.write(
f"// Stub Verilog generated for {output_name}\nmodule {output_name}(); endmodule"
)
return v_path
_ALLOWED_TARGETS = {"ice40", "ecp5", "gowin", "xilinx"}
def _validate_path(self, path: str) -> str:
"""Ensure path resolves inside work_dir."""
real = os.path.realpath(path)
if not real.startswith(self.work_dir):
from sc_neurocore.exceptions import SCCompilerError
raise SCCompilerError(f"Path escapes work_dir: {path!r}")
return real
def run_synthesis(self, v_path: str, target_fpga: str = "ice40") -> str:
"""
Invokes 'yosys' for synthesis.
"""
v_path = self._validate_path(v_path)
if target_fpga not in self._ALLOWED_TARGETS:
from sc_neurocore.exceptions import SCCompilerError
raise SCCompilerError(f"Unknown target FPGA: {target_fpga!r}")
base = os.path.splitext(v_path)[0]
json_path = f"{base}.json"
logger.info(f"Synthesizing {v_path} for {target_fpga}...")
# Use yosys script file to avoid shell metacharacter injection via -p
script = f"read_verilog {v_path}; synth_{target_fpga} -json {json_path}"
script_path = f"{base}_synth.ys"
with open(script_path, "w") as f:
f.write(script)
try:
subprocess.run(["yosys", "-s", script_path], check=True)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.warning(f"yosys failed or not found: {e}")
return json_path
def run_pnr(self, json_path: str, target_device: str = "up5k") -> str:
"""
Invokes 'nextpnr' for place and route.
"""
json_path = self._validate_path(json_path)
asc_path = f"{os.path.splitext(json_path)[0]}.asc"
logger.info(f"Running P&R for {target_device}...")
pnr_cmd = ["nextpnr-ice40", "--up5k", "--json", json_path, "--asc", asc_path]
try:
subprocess.run(pnr_cmd, check=True)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.warning(f"nextpnr failed or not found: {e}")
return asc_path
|
compile_mlir_to_verilog(mlir_content, output_name='top')
Invokes 'firtool' to lower MLIR to Verilog.
Source code in src/sc_neurocore/compiler/pipeline.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 | def compile_mlir_to_verilog(self, mlir_content: str, output_name: str = "top") -> str:
"""
Invokes 'firtool' to lower MLIR to Verilog.
"""
output_name = self._sanitize_name(output_name)
mlir_path = os.path.join(self.work_dir, f"{output_name}.mlir")
v_path = os.path.join(self.work_dir, f"{output_name}.v")
with open(mlir_path, "w") as f:
f.write(mlir_content)
logger.info(f"Lowering {mlir_path} to Verilog...")
# Note: In a real environment, firtool must be in PATH
try:
subprocess.run(["firtool", mlir_path, "-o", v_path], check=True)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.warning(f"firtool failed or not found: {e}. Falling back to stub Verilog.")
# Fallback for demo/development without full toolchain
with open(v_path, "w") as f:
f.write(
f"// Stub Verilog generated for {output_name}\nmodule {output_name}(); endmodule"
)
return v_path
|
run_synthesis(v_path, target_fpga='ice40')
Invokes 'yosys' for synthesis.
Source code in src/sc_neurocore/compiler/pipeline.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107 | def run_synthesis(self, v_path: str, target_fpga: str = "ice40") -> str:
"""
Invokes 'yosys' for synthesis.
"""
v_path = self._validate_path(v_path)
if target_fpga not in self._ALLOWED_TARGETS:
from sc_neurocore.exceptions import SCCompilerError
raise SCCompilerError(f"Unknown target FPGA: {target_fpga!r}")
base = os.path.splitext(v_path)[0]
json_path = f"{base}.json"
logger.info(f"Synthesizing {v_path} for {target_fpga}...")
# Use yosys script file to avoid shell metacharacter injection via -p
script = f"read_verilog {v_path}; synth_{target_fpga} -json {json_path}"
script_path = f"{base}_synth.ys"
with open(script_path, "w") as f:
f.write(script)
try:
subprocess.run(["yosys", "-s", script_path], check=True)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.warning(f"yosys failed or not found: {e}")
return json_path
|
run_pnr(json_path, target_device='up5k')
Invokes 'nextpnr' for place and route.
Source code in src/sc_neurocore/compiler/pipeline.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 | def run_pnr(self, json_path: str, target_device: str = "up5k") -> str:
"""
Invokes 'nextpnr' for place and route.
"""
json_path = self._validate_path(json_path)
asc_path = f"{os.path.splitext(json_path)[0]}.asc"
logger.info(f"Running P&R for {target_device}...")
pnr_cmd = ["nextpnr-ice40", "--up5k", "--json", json_path, "--asc", asc_path]
try:
subprocess.run(pnr_cmd, check=True)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.warning(f"nextpnr failed or not found: {e}")
return asc_path
|
MLIR Emitter
sc_neurocore.compiler.mlir_emitter
MLIR / CIRCT Emitter for Stochastic Computing pipelines.
This module provides the frontend to lower sc-neurocore's Python-based
Stochastic IR into MLIR hardware dialects (HW, Seq, Comb) via CIRCT.
This allows us to leverage LLVM's optimization passes directly for
FPGA bitstream synthesis, skipping string-based Verilog generation.
MLIREmitter
Translates sc-neurocore objects into MLIR text formatted for CIRCT.
Source code in src/sc_neurocore/compiler/mlir_emitter.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 | class MLIREmitter:
"""
Translates sc-neurocore objects into MLIR text formatted for CIRCT.
"""
def __init__(self, module_name: str = "sc_neurocore_top"):
self.module_name = module_name
self.nodes: List[MLIRNode] = []
self._wire_counter = 0
def get_wire(self) -> str:
self._wire_counter += 1
return f"%w{self._wire_counter}"
def emit_and(self, lhs: str, rhs: str) -> str:
"""Emits a comb.and operation for stochastic multiplication."""
out = self.get_wire()
self.nodes.append(MLIRNode("comb.and", [lhs, rhs], out, {}))
return out
def emit_lfsr(self, width: int, seed: int) -> str:
"""Emits an LFSR instantiation."""
out = self.get_wire()
self.nodes.append(
MLIRNode(
"hw.instance",
[],
out,
{
"sym_name": "lfsr",
"module": "sc_lfsr",
"parameters": {"WIDTH": width, "SEED": seed},
},
)
)
return out
def emit_xor(self, lhs: str, rhs: str) -> str:
"""Emits a comb.xor operation."""
out = self.get_wire()
self.nodes.append(MLIRNode("comb.xor", [lhs, rhs], out, {}))
return out
def emit_mux(self, cond: str, true_val: str, false_val: str) -> str:
"""Emits a comb.mux operation (used for SC scaled addition)."""
out = self.get_wire()
self.nodes.append(MLIRNode("comb.mux", [cond, true_val, false_val], out, {}))
return out
def generate(self) -> str:
"""Generates the final MLIR string for the module."""
lines = []
# Modern CIRCT / MLIR HW dialect syntax
lines.append(f"hw.module @{self.module_name}(in %clk: i1, in %rst: i1, out out: i1) {{")
for node in self.nodes:
ins = ", ".join(node.inputs)
if node.op_type == "comb.and":
lines.append(f" {node.output} = comb.and {ins} : i1")
elif node.op_type == "comb.xor":
lines.append(f" {node.output} = comb.xor {ins} : i1")
elif node.op_type == "comb.mux":
c, t, f = node.inputs
lines.append(f" {node.output} = comb.mux {c}, {t}, {f} : i1")
elif node.op_type == "hw.instance":
lines.append(
f' {node.output} = hw.instance "{node.attributes["sym_name"]}" @{node.attributes["module"]}() -> (i1)'
)
# Final output assignment (taking the last node's output as an example)
last_wire = self.nodes[-1].output if self.nodes else "0"
lines.append(f" hw.output {last_wire} : i1")
lines.append("}")
return "\n".join(lines)
|
emit_and(lhs, rhs)
Emits a comb.and operation for stochastic multiplication.
Source code in src/sc_neurocore/compiler/mlir_emitter.py
| def emit_and(self, lhs: str, rhs: str) -> str:
"""Emits a comb.and operation for stochastic multiplication."""
out = self.get_wire()
self.nodes.append(MLIRNode("comb.and", [lhs, rhs], out, {}))
return out
|
emit_lfsr(width, seed)
Emits an LFSR instantiation.
Source code in src/sc_neurocore/compiler/mlir_emitter.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 | def emit_lfsr(self, width: int, seed: int) -> str:
"""Emits an LFSR instantiation."""
out = self.get_wire()
self.nodes.append(
MLIRNode(
"hw.instance",
[],
out,
{
"sym_name": "lfsr",
"module": "sc_lfsr",
"parameters": {"WIDTH": width, "SEED": seed},
},
)
)
return out
|
emit_xor(lhs, rhs)
Emits a comb.xor operation.
Source code in src/sc_neurocore/compiler/mlir_emitter.py
| def emit_xor(self, lhs: str, rhs: str) -> str:
"""Emits a comb.xor operation."""
out = self.get_wire()
self.nodes.append(MLIRNode("comb.xor", [lhs, rhs], out, {}))
return out
|
emit_mux(cond, true_val, false_val)
Emits a comb.mux operation (used for SC scaled addition).
Source code in src/sc_neurocore/compiler/mlir_emitter.py
| def emit_mux(self, cond: str, true_val: str, false_val: str) -> str:
"""Emits a comb.mux operation (used for SC scaled addition)."""
out = self.get_wire()
self.nodes.append(MLIRNode("comb.mux", [cond, true_val, false_val], out, {}))
return out
|
generate()
Generates the final MLIR string for the module.
Source code in src/sc_neurocore/compiler/mlir_emitter.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 | def generate(self) -> str:
"""Generates the final MLIR string for the module."""
lines = []
# Modern CIRCT / MLIR HW dialect syntax
lines.append(f"hw.module @{self.module_name}(in %clk: i1, in %rst: i1, out out: i1) {{")
for node in self.nodes:
ins = ", ".join(node.inputs)
if node.op_type == "comb.and":
lines.append(f" {node.output} = comb.and {ins} : i1")
elif node.op_type == "comb.xor":
lines.append(f" {node.output} = comb.xor {ins} : i1")
elif node.op_type == "comb.mux":
c, t, f = node.inputs
lines.append(f" {node.output} = comb.mux {c}, {t}, {f} : i1")
elif node.op_type == "hw.instance":
lines.append(
f' {node.output} = hw.instance "{node.attributes["sym_name"]}" @{node.attributes["module"]}() -> (i1)'
)
# Final output assignment (taking the last node's output as an example)
last_wire = self.nodes[-1].output if self.nodes else "0"
lines.append(f" hw.output {last_wire} : i1")
lines.append("}")
return "\n".join(lines)
|
Weight Quantizer
Float → Q-format fixed-point with nearest/stochastic/floor rounding,
plus SC probability mapping.
sc_neurocore.compiler.quantizer
Quantize trained float weights to Q-format fixed-point for SC hardware.
from sc_neurocore.compiler.quantizer import quantize_weights
After training, quantize for FPGA deployment
q_weights = quantize_weights(float_weights, format="Q8.8")
sc_probs = q_weights_to_sc_probabilities(q_weights, format="Q8.8")
Fixed-point Q-format specification.
Source code in src/sc_neurocore/compiler/quantizer.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 | @dataclass
class QFormat:
"""Fixed-point Q-format specification."""
integer_bits: int
fraction_bits: int
@property
def total_bits(self) -> int:
return self.integer_bits + self.fraction_bits
@property
def scale(self) -> int:
return 1 << self.fraction_bits
@property
def min_val(self) -> float:
return -(1 << (self.total_bits - 1)) / self.scale
@property
def max_val(self) -> float:
return ((1 << (self.total_bits - 1)) - 1) / self.scale
@classmethod
def from_string(cls, fmt: str) -> QFormat:
"""Parse 'Q8.8', 'Q4.12', etc."""
fmt = fmt.strip().upper()
if not fmt.startswith("Q") or "." not in fmt:
raise ValueError(f"Expected format like 'Q8.8', got {fmt!r}")
parts = fmt[1:].split(".")
return cls(integer_bits=int(parts[0]), fraction_bits=int(parts[1]))
|
Parse 'Q8.8', 'Q4.12', etc.
Source code in src/sc_neurocore/compiler/quantizer.py
| @classmethod
def from_string(cls, fmt: str) -> QFormat:
"""Parse 'Q8.8', 'Q4.12', etc."""
fmt = fmt.strip().upper()
if not fmt.startswith("Q") or "." not in fmt:
raise ValueError(f"Expected format like 'Q8.8', got {fmt!r}")
parts = fmt[1:].split(".")
return cls(integer_bits=int(parts[0]), fraction_bits=int(parts[1]))
|
quantize_weights(weights, fmt='Q8.8', rounding='nearest', clip=True)
Quantize float weights to fixed-point integers.
Parameters
weights : np.ndarray
Float weight matrix (any shape).
fmt : str
Q-format string, e.g. "Q8.8" (8 integer + 8 fractional = 16-bit signed).
rounding : str
"nearest" (round half to even), "stochastic" (probabilistic rounding),
or "floor" (truncate toward negative infinity).
clip : bool
If True, clip values to the representable range before quantization.
Returns
np.ndarray
Integer array (same shape) in the Q-format representation.
To recover the float: result / (2^fraction_bits).
Source code in src/sc_neurocore/compiler/quantizer.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 | def quantize_weights(
weights: np.ndarray,
fmt: str = "Q8.8",
rounding: str = "nearest",
clip: bool = True,
) -> np.ndarray:
"""Quantize float weights to fixed-point integers.
Parameters
----------
weights : np.ndarray
Float weight matrix (any shape).
fmt : str
Q-format string, e.g. "Q8.8" (8 integer + 8 fractional = 16-bit signed).
rounding : str
"nearest" (round half to even), "stochastic" (probabilistic rounding),
or "floor" (truncate toward negative infinity).
clip : bool
If True, clip values to the representable range before quantization.
Returns
-------
np.ndarray
Integer array (same shape) in the Q-format representation.
To recover the float: result / (2^fraction_bits).
"""
q = QFormat.from_string(fmt)
w = np.asarray(weights, dtype=np.float64)
if clip:
w = np.clip(w, q.min_val, q.max_val)
scaled = w * q.scale
if rounding == "nearest":
quantized = np.rint(scaled).astype(np.int64)
elif rounding == "stochastic":
floor = np.floor(scaled)
prob = scaled - floor
quantized = (floor + (np.random.random(w.shape) < prob)).astype(np.int64)
elif rounding == "floor":
quantized = np.floor(scaled).astype(np.int64)
else:
raise ValueError(
f"Unknown rounding mode: {rounding!r}. Use 'nearest', 'stochastic', or 'floor'."
)
min_int = -(1 << (q.total_bits - 1))
max_int = (1 << (q.total_bits - 1)) - 1
return np.clip(quantized, min_int, max_int)
|
dequantize_weights(quantized, fmt='Q8.8')
Convert quantized integer weights back to float.
Source code in src/sc_neurocore/compiler/quantizer.py
| def dequantize_weights(quantized: np.ndarray, fmt: str = "Q8.8") -> np.ndarray:
"""Convert quantized integer weights back to float."""
q = QFormat.from_string(fmt)
return quantized.astype(np.float64) / q.scale
|
q_weights_to_sc_probabilities(quantized, fmt='Q8.8')
Convert quantized weights to SC probabilities in [0, 1].
Maps the Q-format range [min, max] linearly to [0, 1] for
unipolar SC encoding.
Source code in src/sc_neurocore/compiler/quantizer.py
115
116
117
118
119
120
121
122
123
124 | def q_weights_to_sc_probabilities(quantized: np.ndarray, fmt: str = "Q8.8") -> np.ndarray:
"""Convert quantized weights to SC probabilities in [0, 1].
Maps the Q-format range [min, max] linearly to [0, 1] for
unipolar SC encoding.
"""
q = QFormat.from_string(fmt)
min_int = -(1 << (q.total_bits - 1))
max_int = (1 << (q.total_bits - 1)) - 1
return (quantized.astype(np.float64) - min_int) / (max_int - min_int)
|
quantization_error(weights, fmt='Q8.8', rounding='nearest')
Compute quantization error statistics.
Returns
dict with keys: max_abs_error, mean_abs_error, rmse, snr_db
Source code in src/sc_neurocore/compiler/quantizer.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 | def quantization_error(weights: np.ndarray, fmt: str = "Q8.8", rounding: str = "nearest") -> dict:
"""Compute quantization error statistics.
Returns
-------
dict with keys: max_abs_error, mean_abs_error, rmse, snr_db
"""
quantized = quantize_weights(weights, fmt=fmt, rounding=rounding)
recovered = dequantize_weights(quantized, fmt=fmt)
error = weights - recovered
mae = float(np.mean(np.abs(error)))
rmse = float(np.sqrt(np.mean(error**2)))
signal_power = float(np.mean(weights**2))
snr = 10 * np.log10(signal_power / max(rmse**2, 1e-30))
return {
"max_abs_error": float(np.max(np.abs(error))),
"mean_abs_error": mae,
"rmse": rmse,
"snr_db": float(snr),
}
|
Adaptive Precision
sc_neurocore.compiler.adaptive_precision
Per-layer adaptive bitstream length for mixed-precision SC networks.
Different layers tolerate different amounts of SC quantization noise.
Shallow layers (close to input) can use short bitstreams (L=64) for speed,
while deep layers (close to output) need longer bitstreams (L=1024) for
precision. Uniform L wastes throughput on shallow layers.
This module:
1. Analyzes per-layer sensitivity to bitstream length via sweeps
2. Assigns optimal L_i per layer using Hoeffding bounds or empirical calibration
3. Outputs a precision map for the compiler to generate per-layer Verilog
with different bitstream lengths
Reference: Sim & Lee 2019 — "Adjustable Sequence Length for SC NNs"
LayerPrecision
dataclass
Bitstream length assignment for one layer.
Source code in src/sc_neurocore/compiler/adaptive_precision.py
33
34
35
36
37
38
39
40
41 | @dataclass
class LayerPrecision:
"""Bitstream length assignment for one layer."""
layer_index: int
name: str
bitstream_length: int
error_bound: float
sensitivity: float
|
analyze_sensitivity(layer_weights, lengths=None, n_trials=100, seed=42)
Measure per-layer sensitivity to bitstream length reduction.
For each layer, compute mean output error across trial inputs
when reducing bitstream length from max to min. Layers with high
sensitivity need longer bitstreams.
Parameters
layer_weights : list of ndarray
Weight matrices for each layer.
lengths : list of int
Bitstream lengths to sweep (default: [32, 64, 128, 256, 512, 1024]).
n_trials : int
Number of random input trials.
seed : int
Random seed.
Returns
list of float
Per-layer sensitivity scores (higher = needs longer bitstream).
Source code in src/sc_neurocore/compiler/adaptive_precision.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 | def analyze_sensitivity(
layer_weights: list[np.ndarray],
lengths: list[int] | None = None,
n_trials: int = 100,
seed: int = 42,
) -> list[float]:
"""Measure per-layer sensitivity to bitstream length reduction.
For each layer, compute mean output error across trial inputs
when reducing bitstream length from max to min. Layers with high
sensitivity need longer bitstreams.
Parameters
----------
layer_weights : list of ndarray
Weight matrices for each layer.
lengths : list of int
Bitstream lengths to sweep (default: [32, 64, 128, 256, 512, 1024]).
n_trials : int
Number of random input trials.
seed : int
Random seed.
Returns
-------
list of float
Per-layer sensitivity scores (higher = needs longer bitstream).
"""
if lengths is None:
lengths = [32, 64, 128, 256, 512, 1024]
rng = np.random.RandomState(seed)
sensitivities = []
for w in layer_weights:
n_in = w.shape[1] if w.ndim == 2 else w.shape[0]
errors = []
for _ in range(n_trials):
x = rng.random(n_in)
exact = x @ w.T if w.ndim == 2 else x * w
length_errors = []
for L in lengths:
# SC computation: encode as bitstream, AND-multiply, popcount
sc_results = []
for trial in range(5):
bits_x = (rng.random((L, n_in)) < x).astype(np.float64)
if w.ndim == 2:
n_out = w.shape[0]
bits_w = np.zeros((L, n_out, n_in))
for j in range(n_out):
w_prob = np.clip(w[j], 0, 1)
bits_w[:, j, :] = (rng.random((L, n_in)) < w_prob).astype(np.float64)
and_result = bits_x[:, np.newaxis, :] * bits_w
sc_out = and_result.sum(axis=(0, 2)) / L
else: # pragma: no cover — scalar weight path
w_prob = np.clip(w, 0, 1)
bits_w = (rng.random((L,)) < w_prob).astype(np.float64)
sc_out = (bits_x.mean(axis=0) * bits_w).mean()
sc_results.append(sc_out)
sc_mean = np.mean(sc_results, axis=0)
err = np.mean(np.abs(sc_mean - np.clip(exact, 0, None)))
length_errors.append(err)
# Sensitivity = how much error changes across length range
sensitivity = max(length_errors) - min(length_errors) if length_errors else 0.0
errors.append(sensitivity)
sensitivities.append(float(np.mean(errors)))
return sensitivities
|
assign_lengths(layer_weights, layer_names=None, total_budget=None, min_length=32, max_length=1024, target_error=0.01, method='hoeffding')
Assign per-layer bitstream lengths under a total budget.
Parameters
layer_weights : list of ndarray
Weight matrices for each layer.
layer_names : list of str, optional
Human-readable layer names.
total_budget : int, optional
Total bitstream cycles budget. If None, each layer gets its own
minimum length for target_error.
min_length, max_length : int
Bounds on per-layer bitstream length.
target_error : float
Target per-layer accuracy (probability tolerance).
method : str
'hoeffding' uses Hoeffding bound, 'sensitivity' uses empirical sweep.
Returns
list of LayerPrecision
Per-layer bitstream length assignments.
Source code in src/sc_neurocore/compiler/adaptive_precision.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204 | def assign_lengths(
layer_weights: list[np.ndarray],
layer_names: list[str] | None = None,
total_budget: int | None = None,
min_length: int = 32,
max_length: int = 1024,
target_error: float = 0.01,
method: str = "hoeffding",
) -> list[LayerPrecision]:
"""Assign per-layer bitstream lengths under a total budget.
Parameters
----------
layer_weights : list of ndarray
Weight matrices for each layer.
layer_names : list of str, optional
Human-readable layer names.
total_budget : int, optional
Total bitstream cycles budget. If None, each layer gets its own
minimum length for target_error.
min_length, max_length : int
Bounds on per-layer bitstream length.
target_error : float
Target per-layer accuracy (probability tolerance).
method : str
'hoeffding' uses Hoeffding bound, 'sensitivity' uses empirical sweep.
Returns
-------
list of LayerPrecision
Per-layer bitstream length assignments.
"""
n_layers = len(layer_weights)
if layer_names is None:
layer_names = [f"layer_{i}" for i in range(n_layers)]
if method == "hoeffding":
assignments = []
for i, (w, name) in enumerate(zip(layer_weights, layer_names)):
fan_in = w.shape[1] if w.ndim == 2 else 1
# Per-synapse error epsilon, aggregated over fan_in synapses
per_syn_eps = target_error / max(1, np.sqrt(fan_in))
L = adaptive_length(p=0.5, epsilon=per_syn_eps, confidence=0.95)
L = int(np.clip(L, min_length, max_length))
# Round up to power of 2 for hardware efficiency
L = int(2 ** np.ceil(np.log2(max(L, min_length))))
L = min(L, max_length)
bound = 0.5 / np.sqrt(L) if L > 0 else 1.0
assignments.append(
LayerPrecision(
layer_index=i,
name=name,
bitstream_length=L,
error_bound=bound,
sensitivity=0.0,
)
)
return assignments
# Sensitivity-based assignment
sensitivities = analyze_sensitivity(layer_weights)
total_sens = sum(sensitivities) or 1.0
if total_budget is None: # pragma: no cover
total_budget = max_length * n_layers
assignments = []
for i, (w, name, sens) in enumerate(zip(layer_weights, layer_names, sensitivities)):
# Allocate budget proportional to sensitivity
fraction = sens / total_sens
L = int(fraction * total_budget / n_layers * n_layers)
L = int(np.clip(L, min_length, max_length))
L = int(2 ** np.ceil(np.log2(max(L, min_length))))
L = min(L, max_length)
bound = 0.5 / np.sqrt(L) if L > 0 else 1.0
assignments.append(
LayerPrecision(
layer_index=i,
name=name,
bitstream_length=L,
error_bound=bound,
sensitivity=sens,
)
)
return assignments
|
IR Type Checker
Validates Stochastic IR graphs before emission. Catches Bitstream/Rate/Spike
type mismatches that would otherwise silently produce wrong results.
Signal types: BITSTREAM, RATE, SPIKE, FIXED, ANY.
sc_neurocore.compiler.ir_type_checker
Type checker for Stochastic IR: catches Bitstream/Rate/Spike mismatches.
Before emitting Verilog or MLIR, the IR graph should be type-checked
to ensure connected nodes have compatible signal types. Without this,
type errors only surface at synthesis time (or worse, produce silent
wrong-answer bugs).
Signal types:
- Bitstream: temporal sequence of {0,1}, encodes probability via density
- Rate: scalar probability in [0,1], no temporal structure
- Spike: binary event (0 or 1), single timestep
- Fixed: Q-format fixed-point integer
Compatible connections:
- Bitstream → Bitstream (native SC)
- Rate → Rate (probability domain)
- Spike → Spike (spiking domain)
- Rate → Bitstream (requires encoder)
- Bitstream → Rate (requires decoder/popcount)
- Spike → Bitstream (direct embedding)
IRNode
dataclass
A typed node in the Stochastic IR graph.
Source code in src/sc_neurocore/compiler/ir_type_checker.py
| @dataclass
class IRNode:
"""A typed node in the Stochastic IR graph."""
name: str
op: str # e.g. "and", "mux", "xor", "encoder", "decoder", "lif", "popcount"
input_types: list[SignalType] = field(default_factory=list)
output_type: SignalType = SignalType.BITSTREAM
|
IRTypeError
dataclass
A type mismatch found during checking.
Source code in src/sc_neurocore/compiler/ir_type_checker.py
80
81
82
83
84
85
86
87
88 | @dataclass
class IRTypeError:
"""A type mismatch found during checking."""
src_node: str
dst_node: str
src_type: SignalType
dst_type: SignalType
message: str
|
types_compatible(src, dst)
Check if src can connect to dst without explicit conversion.
Source code in src/sc_neurocore/compiler/ir_type_checker.py
| def types_compatible(src: SignalType, dst: SignalType) -> bool:
"""Check if src can connect to dst without explicit conversion."""
if src == SignalType.ANY or dst == SignalType.ANY:
return True
return (src, dst) in _COMPATIBLE
|
check_ir_types(nodes, edges)
Type-check an IR graph and return all type errors.
Parameters
nodes : dict mapping node name → IRNode
edges : list of IREdge connections
Returns
list of IRTypeError (empty if all types check out)
Source code in src/sc_neurocore/compiler/ir_type_checker.py
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164 | def check_ir_types(
nodes: dict[str, IRNode],
edges: list[IREdge],
) -> list[IRTypeError]:
"""Type-check an IR graph and return all type errors.
Parameters
----------
nodes : dict mapping node name → IRNode
edges : list of IREdge connections
Returns
-------
list of IRTypeError (empty if all types check out)
"""
errors: list[IRTypeError] = []
for edge in edges:
if edge.src not in nodes:
errors.append(
IRTypeError(
edge.src,
edge.dst,
SignalType.ANY,
SignalType.ANY,
f"Source node '{edge.src}' not found in graph",
)
)
continue
if edge.dst not in nodes:
errors.append(
IRTypeError(
edge.src,
edge.dst,
SignalType.ANY,
SignalType.ANY,
f"Destination node '{edge.dst}' not found in graph",
)
)
continue
src_node = nodes[edge.src]
dst_node = nodes[edge.dst]
src_type = src_node.output_type
if edge.dst_port >= len(dst_node.input_types):
errors.append(
IRTypeError(
edge.src,
edge.dst,
src_type,
SignalType.ANY,
f"Port {edge.dst_port} out of range for '{edge.dst}' "
f"(has {len(dst_node.input_types)} inputs)",
)
)
continue
dst_type = dst_node.input_types[edge.dst_port]
if not types_compatible(src_type, dst_type):
errors.append(
IRTypeError(
edge.src,
edge.dst,
src_type,
dst_type,
f"Type mismatch: {edge.src} outputs {src_type.name} "
f"but {edge.dst} port {edge.dst_port} expects {dst_type.name}. "
f"Insert a converter (encoder/decoder).",
)
)
return errors
|