Skip to content

Hypervisor

Multi-tenant neuromorphic hypervisor. Manages isolated workloads on shared FPGA resources with resource quotas, priority preemption, and health monitoring.

Quick Start

Python
from sc_neurocore.hypervisor.hypervisor import (
    Hypervisor, Tenant, ResourceAllocator, HealthMonitor,
)

sc_neurocore.hypervisor.hypervisor

Multi-tenant neuromorphic hypervisor with hard isolation.

Enables multiple SC networks to share the same FPGA/ASIC fabric with:

  • Spatial partitioning: Non-overlapping hardware regions per tenant
  • Temporal partitioning: Time-sliced scheduling with preemption
  • Bitstream firewalls: AXI address-range isolation preventing cross-tenant memory/register access
  • Dynamic migration: Live migration of tenants between dies/regions with state checkpoint/restore
  • QoS enforcement: Per-tenant bandwidth, latency, and compute quotas

Architecture:

Text Only
┌──────────────────────────────────────┐
│          Hypervisor Scheduler         │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐│
│  │ Tenant A │ │ Tenant B │ │ Tenant C ││
│  │ (BCI)   │ │ (Vision) │ │ (Audio) ││
│  └────┬────┘ └────┬────┘ └────┬────┘│
│       │           │           │      │
│  ┌────▼───────────▼───────────▼────┐ │
│  │     Bitstream Firewall Layer     │ │
│  └────┬───────────┬───────────┬────┘ │
│       │           │           │      │
│  ┌────▼────┐ ┌────▼────┐ ┌────▼────┐│
│  │Region 0 │ │Region 1 │ │Region 2 ││
│  │AXI+AER  │ │AXI+AER  │ │AXI+AER  ││
│  └─────────┘ └─────────┘ └─────────┘│
└──────────────────────────────────────┘

Compatible with: - hdl/sc_axis_interface.v — AXI-Stream wrappers - hdl/sc_aer_router.v — AER spike routing - chiplet_gen/ — multi-die topology - dynamic_adaptation/ — runtime adaptation hooks

HWRegion dataclass

One isolated hardware region on the fabric.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@dataclass
class HWRegion:
    """One isolated hardware region on the fabric."""

    region_id: int
    num_neurons: int
    num_synapses: int
    axi_base_addr: int
    axi_size: int
    die_id: int = 0
    state: RegionState = RegionState.FREE
    tenant_id: Optional[str] = None
    utilisation: float = 0.0

    @property
    def axi_end_addr(self) -> int:
        return self.axi_base_addr + self.axi_size

    @property
    def is_free(self) -> bool:
        return self.state == RegionState.FREE

    def contains_addr(self, addr: int) -> bool:
        return self.axi_base_addr <= addr < self.axi_end_addr

QoSPolicy dataclass

Quality-of-Service policy for a tenant.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
106
107
108
109
110
111
112
113
114
115
@dataclass
class QoSPolicy:
    """Quality-of-Service policy for a tenant."""

    max_bandwidth_mbps: float = 100.0
    max_latency_us: float = 1000.0
    min_compute_share: float = 0.1
    max_neurons: int = 1024
    max_synapses: int = 16384
    preemptible: bool = True

TenantState dataclass

Checkpointable state for live migration.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@dataclass
class TenantState:
    """Checkpointable state for live migration."""

    neuron_voltages: Optional[np.ndarray] = None
    synapse_weights: Optional[np.ndarray] = None
    spike_queues: Optional[np.ndarray] = None
    lfsr_state: int = 0
    timestep: int = 0
    checksum: str = ""

    def compute_checksum(self) -> str:
        h = hashlib.sha256()
        if self.neuron_voltages is not None:
            h.update(self.neuron_voltages.tobytes())
        if self.synapse_weights is not None:
            h.update(self.synapse_weights.tobytes())
        h.update(self.lfsr_state.to_bytes(4, "little"))
        h.update(self.timestep.to_bytes(4, "little"))
        self.checksum = h.hexdigest()[:16]
        return self.checksum

Tenant dataclass

One SC network tenant on the hypervisor.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@dataclass
class Tenant:
    """One SC network tenant on the hypervisor."""

    tenant_id: str
    name: str
    priority: TenantPriority = TenantPriority.NORMAL
    qos: QoSPolicy = field(default_factory=QoSPolicy)
    region_id: Optional[int] = None
    state: Optional[TenantState] = None
    active: bool = False
    total_spikes: int = 0
    total_cycles: int = 0
    created_ns: int = 0
    last_scheduled_ns: int = 0

FirewallRule dataclass

One address-range access rule.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
161
162
163
164
165
166
167
168
169
170
171
172
173
@dataclass
class FirewallRule:
    """One address-range access rule."""

    tenant_id: str
    base_addr: int
    size: int
    read_allowed: bool = True
    write_allowed: bool = True

    @property
    def end_addr(self) -> int:
        return self.base_addr + self.size

BitstreamFirewall

AXI address-range isolation preventing cross-tenant access.

Each tenant can only access its own region's AXI address space. Any cross-region access is blocked and logged as a violation.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
class BitstreamFirewall:
    """AXI address-range isolation preventing cross-tenant access.

    Each tenant can only access its own region's AXI address space.
    Any cross-region access is blocked and logged as a violation.
    """

    def __init__(self):
        self.rules: List[FirewallRule] = []
        self.violations: List[Dict[str, Any]] = []
        self.max_violations: int = 1000

    def add_rule(self, rule: FirewallRule) -> None:
        self.rules.append(rule)

    def remove_tenant_rules(self, tenant_id: str) -> int:
        before = len(self.rules)
        self.rules = [r for r in self.rules if r.tenant_id != tenant_id]
        return before - len(self.rules)

    def check_access(self, tenant_id: str, addr: int, is_write: bool = False) -> bool:
        """Check if a tenant can access an address."""
        for rule in self.rules:
            if rule.tenant_id != tenant_id:
                continue
            if rule.base_addr <= addr < rule.end_addr:
                if is_write and not rule.write_allowed:
                    self._log_violation(tenant_id, addr, "write_denied")
                    return False
                if not is_write and not rule.read_allowed:
                    self._log_violation(tenant_id, addr, "read_denied")
                    return False
                return True

        self._log_violation(tenant_id, addr, "no_rule")
        return False

    def _log_violation(self, tenant_id: str, addr: int, reason: str) -> None:
        if len(self.violations) < self.max_violations:
            self.violations.append(
                {
                    "tenant_id": tenant_id,
                    "addr": hex(addr),
                    "reason": reason,
                    "timestamp_ns": time.time_ns(),
                }
            )

    @property
    def violation_count(self) -> int:
        return len(self.violations)

    def clear_violations(self) -> None:
        self.violations.clear()

check_access(tenant_id, addr, is_write=False)

Check if a tenant can access an address.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def check_access(self, tenant_id: str, addr: int, is_write: bool = False) -> bool:
    """Check if a tenant can access an address."""
    for rule in self.rules:
        if rule.tenant_id != tenant_id:
            continue
        if rule.base_addr <= addr < rule.end_addr:
            if is_write and not rule.write_allowed:
                self._log_violation(tenant_id, addr, "write_denied")
                return False
            if not is_write and not rule.read_allowed:
                self._log_violation(tenant_id, addr, "read_denied")
                return False
            return True

    self._log_violation(tenant_id, addr, "no_rule")
    return False

ScheduleSlot dataclass

One time slot in the schedule.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
242
243
244
245
246
247
248
249
250
251
252
253
@dataclass
class ScheduleSlot:
    """One time slot in the schedule."""

    tenant_id: str
    region_id: int
    start_cycle: int
    duration_cycles: int

    @property
    def end_cycle(self) -> int:
        return self.start_cycle + self.duration_cycles

Scheduler

Multi-tenant temporal scheduler with preemption.

Supports priority-based, round-robin, fair-share, and EDF scheduling.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
class Scheduler:
    """Multi-tenant temporal scheduler with preemption.

    Supports priority-based, round-robin, fair-share, and EDF scheduling.
    """

    def __init__(self, policy: SchedulingPolicy = SchedulingPolicy.PRIORITY):
        self.policy = policy
        self.time_quantum_cycles: int = 10000
        self.schedule: List[ScheduleSlot] = []
        self.current_cycle: int = 0

    def generate_schedule(self, tenants: List[Tenant], num_cycles: int) -> List[ScheduleSlot]:
        """Generate a schedule for the given tenants."""
        if not tenants:
            return []
        active = [t for t in tenants if t.active and t.region_id is not None]
        if not active:
            return []

        if self.policy == SchedulingPolicy.ROUND_ROBIN:
            return self._round_robin(active, num_cycles)
        elif self.policy == SchedulingPolicy.PRIORITY:
            return self._priority(active, num_cycles)
        elif self.policy == SchedulingPolicy.FAIR_SHARE:
            return self._fair_share(active, num_cycles)
        elif self.policy == SchedulingPolicy.EDF:
            return self._edf(active, num_cycles)
        return []

    def _round_robin(self, tenants: List[Tenant], total: int) -> List[ScheduleSlot]:
        slots = []
        cycle = 0
        idx = 0
        while cycle < total:
            t = tenants[idx % len(tenants)]
            dur = min(self.time_quantum_cycles, total - cycle)
            slots.append(ScheduleSlot(t.tenant_id, t.region_id or 0, cycle, dur))
            cycle += dur
            idx += 1
        self.schedule = slots
        return slots

    def _priority(self, tenants: List[Tenant], total: int) -> List[ScheduleSlot]:
        sorted_t = sorted(tenants, key=lambda t: t.priority.value)
        slots = []
        cycle = 0
        for t in sorted_t:
            share = max(1, total // len(tenants))
            if t.priority == TenantPriority.REALTIME:
                share = total // 2  # Realtime gets 50%
            dur = min(share, total - cycle)
            if dur > 0:
                slots.append(ScheduleSlot(t.tenant_id, t.region_id or 0, cycle, dur))
                cycle += dur
        self.schedule = slots
        return slots

    def _fair_share(self, tenants: List[Tenant], total: int) -> List[ScheduleSlot]:
        total_share = sum(t.qos.min_compute_share for t in tenants)
        slots = []
        cycle = 0
        for t in tenants:
            frac = t.qos.min_compute_share / total_share if total_share > 0 else 1.0 / len(tenants)
            dur = int(total * frac)
            dur = min(dur, total - cycle)
            if dur > 0:
                slots.append(ScheduleSlot(t.tenant_id, t.region_id or 0, cycle, dur))
                cycle += dur
        self.schedule = slots
        return slots

    def _edf(self, tenants: List[Tenant], total: int) -> List[ScheduleSlot]:
        sorted_t = sorted(tenants, key=lambda t: t.qos.max_latency_us)
        return self._round_robin(sorted_t, total)

generate_schedule(tenants, num_cycles)

Generate a schedule for the given tenants.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def generate_schedule(self, tenants: List[Tenant], num_cycles: int) -> List[ScheduleSlot]:
    """Generate a schedule for the given tenants."""
    if not tenants:
        return []
    active = [t for t in tenants if t.active and t.region_id is not None]
    if not active:
        return []

    if self.policy == SchedulingPolicy.ROUND_ROBIN:
        return self._round_robin(active, num_cycles)
    elif self.policy == SchedulingPolicy.PRIORITY:
        return self._priority(active, num_cycles)
    elif self.policy == SchedulingPolicy.FAIR_SHARE:
        return self._fair_share(active, num_cycles)
    elif self.policy == SchedulingPolicy.EDF:
        return self._edf(active, num_cycles)
    return []

MigrationRequest dataclass

Request to migrate a tenant between regions.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
336
337
338
339
340
341
342
343
@dataclass
class MigrationRequest:
    """Request to migrate a tenant between regions."""

    tenant_id: str
    source_region: int
    target_region: int
    reason: str = "load_balance"

MigrationResult dataclass

Result of a migration attempt.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
346
347
348
349
350
351
352
353
354
355
356
@dataclass
class MigrationResult:
    """Result of a migration attempt."""

    success: bool
    tenant_id: str
    source_region: int
    target_region: int
    state_checksum: str = ""
    duration_ns: int = 0
    reason: str = ""

MigrationEngine

Live migration of tenants between hardware regions.

Migration steps: 1. Pause tenant on source region 2. Checkpoint state (voltages, weights, LFSR, spike queues) 3. Verify checkpoint integrity (SHA-256) 4. Restore state on target region 5. Update firewall rules 6. Resume tenant

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
class MigrationEngine:
    """Live migration of tenants between hardware regions.

    Migration steps:
    1. Pause tenant on source region
    2. Checkpoint state (voltages, weights, LFSR, spike queues)
    3. Verify checkpoint integrity (SHA-256)
    4. Restore state on target region
    5. Update firewall rules
    6. Resume tenant
    """

    def __init__(self):
        self.history: List[MigrationResult] = []

    def checkpoint(self, tenant: Tenant) -> TenantState:
        """Checkpoint tenant state for migration."""
        if tenant.state is None:
            tenant.state = TenantState()
        tenant.state.compute_checksum()
        return tenant.state

    def restore(self, tenant: Tenant, state: TenantState) -> bool:
        """Restore checkpointed state to a tenant."""
        verify = state.checksum
        recomputed = state.compute_checksum()
        if verify and verify != recomputed:
            return False
        tenant.state = state
        return True

    def migrate(
        self,
        tenant: Tenant,
        source: HWRegion,
        target: HWRegion,
        firewall: BitstreamFirewall,
    ) -> MigrationResult:
        """Execute live migration."""
        start = time.time_ns()

        # 1. Checkpoint
        state = self.checkpoint(tenant)
        checksum = state.checksum

        # 2. Free source
        source.state = RegionState.FREE
        source.tenant_id = None

        # 3. Allocate target
        if not target.is_free:
            result = MigrationResult(
                False,
                tenant.tenant_id,
                source.region_id,
                target.region_id,
                reason="target_not_free",
            )
            self.history.append(result)
            return result

        target.state = RegionState.ALLOCATED
        target.tenant_id = tenant.tenant_id

        # 4. Update firewall
        firewall.remove_tenant_rules(tenant.tenant_id)
        firewall.add_rule(
            FirewallRule(
                tenant.tenant_id,
                target.axi_base_addr,
                target.axi_size,
            )
        )

        # 5. Restore state
        success = self.restore(tenant, state)
        tenant.region_id = target.region_id

        elapsed = time.time_ns() - start
        result = MigrationResult(
            success,
            tenant.tenant_id,
            source.region_id,
            target.region_id,
            checksum,
            elapsed,
        )
        self.history.append(result)
        return result

checkpoint(tenant)

Checkpoint tenant state for migration.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
374
375
376
377
378
379
def checkpoint(self, tenant: Tenant) -> TenantState:
    """Checkpoint tenant state for migration."""
    if tenant.state is None:
        tenant.state = TenantState()
    tenant.state.compute_checksum()
    return tenant.state

restore(tenant, state)

Restore checkpointed state to a tenant.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
381
382
383
384
385
386
387
388
def restore(self, tenant: Tenant, state: TenantState) -> bool:
    """Restore checkpointed state to a tenant."""
    verify = state.checksum
    recomputed = state.compute_checksum()
    if verify and verify != recomputed:
        return False
    tenant.state = state
    return True

migrate(tenant, source, target, firewall)

Execute live migration.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
def migrate(
    self,
    tenant: Tenant,
    source: HWRegion,
    target: HWRegion,
    firewall: BitstreamFirewall,
) -> MigrationResult:
    """Execute live migration."""
    start = time.time_ns()

    # 1. Checkpoint
    state = self.checkpoint(tenant)
    checksum = state.checksum

    # 2. Free source
    source.state = RegionState.FREE
    source.tenant_id = None

    # 3. Allocate target
    if not target.is_free:
        result = MigrationResult(
            False,
            tenant.tenant_id,
            source.region_id,
            target.region_id,
            reason="target_not_free",
        )
        self.history.append(result)
        return result

    target.state = RegionState.ALLOCATED
    target.tenant_id = tenant.tenant_id

    # 4. Update firewall
    firewall.remove_tenant_rules(tenant.tenant_id)
    firewall.add_rule(
        FirewallRule(
            tenant.tenant_id,
            target.axi_base_addr,
            target.axi_size,
        )
    )

    # 5. Restore state
    success = self.restore(tenant, state)
    tenant.region_id = target.region_id

    elapsed = time.time_ns() - start
    result = MigrationResult(
        success,
        tenant.tenant_id,
        source.region_id,
        target.region_id,
        checksum,
        elapsed,
    )
    self.history.append(result)
    return result

HypervisorConfig dataclass

Hypervisor configuration.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
453
454
455
456
457
458
459
460
461
@dataclass
class HypervisorConfig:
    """Hypervisor configuration."""

    max_tenants: int = 16
    scheduling_policy: SchedulingPolicy = SchedulingPolicy.PRIORITY
    time_quantum_cycles: int = 10000
    migration_cooldown_ns: int = 1_000_000_000  # 1s
    enable_firewall: bool = True

Hypervisor

Multi-tenant neuromorphic hypervisor.

Manages tenant lifecycle, hardware allocation, scheduling, firewall enforcement, and live migration.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
class Hypervisor:
    """Multi-tenant neuromorphic hypervisor.

    Manages tenant lifecycle, hardware allocation, scheduling,
    firewall enforcement, and live migration.
    """

    def __init__(self, config: Optional[HypervisorConfig] = None):
        self.config = config or HypervisorConfig()
        self.regions: Dict[int, HWRegion] = {}
        self.tenants: Dict[str, Tenant] = {}
        self.scheduler = Scheduler(self.config.scheduling_policy)
        self.scheduler.time_quantum_cycles = self.config.time_quantum_cycles
        self.firewall = BitstreamFirewall()
        self.migration_engine = MigrationEngine()
        self.uptime_ns: int = 0

    def add_region(self, region: HWRegion) -> None:
        """Register a hardware region."""
        self.regions[region.region_id] = region

    def register_tenant(self, tenant: Tenant) -> bool:
        """Register a new tenant."""
        if len(self.tenants) >= self.config.max_tenants:
            return False
        if tenant.tenant_id in self.tenants:
            return False
        tenant.created_ns = time.time_ns()
        self.tenants[tenant.tenant_id] = tenant
        return True

    def allocate(self, tenant_id: str) -> Optional[int]:
        """Allocate a free region to a tenant."""
        tenant = self.tenants.get(tenant_id)
        if tenant is None:
            return None

        # Find a free region that fits the QoS
        for rid, region in self.regions.items():
            if not region.is_free:
                continue
            if region.num_neurons < tenant.qos.max_neurons:
                continue
            # Allocate
            region.state = RegionState.ALLOCATED
            region.tenant_id = tenant_id
            tenant.region_id = rid
            tenant.active = True

            # Set up firewall
            if self.config.enable_firewall:
                self.firewall.add_rule(
                    FirewallRule(
                        tenant_id,
                        region.axi_base_addr,
                        region.axi_size,
                    )
                )
            return rid
        return None

    def deallocate(self, tenant_id: str) -> bool:
        """Release a tenant's hardware region."""
        tenant = self.tenants.get(tenant_id)
        if tenant is None or tenant.region_id is None:
            return False

        region = self.regions.get(tenant.region_id)
        if region is not None:
            region.state = RegionState.FREE
            region.tenant_id = None

        self.firewall.remove_tenant_rules(tenant_id)
        tenant.region_id = None
        tenant.active = False
        return True

    def remove_tenant(self, tenant_id: str) -> bool:
        """Remove a tenant entirely."""
        self.deallocate(tenant_id)
        return self.tenants.pop(tenant_id, None) is not None

    def schedule(self, num_cycles: int) -> List[ScheduleSlot]:
        """Generate a schedule for active tenants."""
        active = [t for t in self.tenants.values() if t.active]
        return self.scheduler.generate_schedule(active, num_cycles)

    def migrate(self, tenant_id: str, target_region_id: int) -> MigrationResult:
        """Migrate a tenant to a different region."""
        tenant = self.tenants.get(tenant_id)
        if tenant is None or tenant.region_id is None:
            return MigrationResult(False, tenant_id or "", -1, target_region_id, reason="not_found")

        source = self.regions.get(tenant.region_id)
        target = self.regions.get(target_region_id)
        if source is None or target is None:
            return MigrationResult(False, tenant_id, -1, target_region_id, reason="invalid_region")

        return self.migration_engine.migrate(tenant, source, target, self.firewall)

    def check_access(self, tenant_id: str, addr: int, is_write: bool = False) -> bool:
        """Check if a tenant can access an address (firewall)."""
        if not self.config.enable_firewall:
            return True
        return self.firewall.check_access(tenant_id, addr, is_write)

    def status(self) -> Dict[str, Any]:
        """Get hypervisor status."""
        free_regions = sum(1 for r in self.regions.values() if r.is_free)
        active_tenants = sum(1 for t in self.tenants.values() if t.active)
        return {
            "total_regions": len(self.regions),
            "free_regions": free_regions,
            "total_tenants": len(self.tenants),
            "active_tenants": active_tenants,
            "firewall_violations": self.firewall.violation_count,
            "migrations": len(self.migration_engine.history),
            "scheduling_policy": self.config.scheduling_policy.value,
        }

    def tenant_report(self, tenant_id: str) -> Optional[Dict[str, Any]]:
        """Get a report for one tenant."""
        t = self.tenants.get(tenant_id)
        if t is None:
            return None
        return {
            "tenant_id": t.tenant_id,
            "name": t.name,
            "priority": t.priority.value,
            "region_id": t.region_id,
            "active": t.active,
            "total_spikes": t.total_spikes,
            "total_cycles": t.total_cycles,
            "qos_bandwidth_mbps": t.qos.max_bandwidth_mbps,
            "qos_latency_us": t.qos.max_latency_us,
        }

    def compute_utilisation(self) -> Dict[int, float]:
        """Compute utilisation fraction per region."""
        result = {}
        for rid, region in self.regions.items():
            if region.is_free:
                result[rid] = 0.0
            elif region.tenant_id:
                tenant = self.tenants.get(region.tenant_id)
                if tenant and tenant.qos:
                    result[rid] = min(1.0, tenant.qos.max_neurons / max(region.num_neurons, 1))
                else:
                    result[rid] = 1.0
            else:
                result[rid] = 0.0
        return result

    def check_overcommit(self) -> bool:
        """Check if total tenant QoS exceeds fabric capacity."""
        total_neurons_needed = sum(t.qos.max_neurons for t in self.tenants.values() if t.active)
        total_neurons_available = sum(r.num_neurons for r in self.regions.values())
        return total_neurons_needed > total_neurons_available

    def get_faulted_regions(self) -> List[int]:
        """List regions in FAULTED state."""
        return [rid for rid, r in self.regions.items() if r.state == RegionState.FAULTED]

    def mark_region_faulted(self, region_id: int) -> bool:
        """Mark a region as faulted and evict its tenant."""
        region = self.regions.get(region_id)
        if region is None:
            return False
        if region.tenant_id:
            self.deallocate(region.tenant_id)
        region.state = RegionState.FAULTED
        return True

add_region(region)

Register a hardware region.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
481
482
483
def add_region(self, region: HWRegion) -> None:
    """Register a hardware region."""
    self.regions[region.region_id] = region

register_tenant(tenant)

Register a new tenant.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
485
486
487
488
489
490
491
492
493
def register_tenant(self, tenant: Tenant) -> bool:
    """Register a new tenant."""
    if len(self.tenants) >= self.config.max_tenants:
        return False
    if tenant.tenant_id in self.tenants:
        return False
    tenant.created_ns = time.time_ns()
    self.tenants[tenant.tenant_id] = tenant
    return True

allocate(tenant_id)

Allocate a free region to a tenant.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
def allocate(self, tenant_id: str) -> Optional[int]:
    """Allocate a free region to a tenant."""
    tenant = self.tenants.get(tenant_id)
    if tenant is None:
        return None

    # Find a free region that fits the QoS
    for rid, region in self.regions.items():
        if not region.is_free:
            continue
        if region.num_neurons < tenant.qos.max_neurons:
            continue
        # Allocate
        region.state = RegionState.ALLOCATED
        region.tenant_id = tenant_id
        tenant.region_id = rid
        tenant.active = True

        # Set up firewall
        if self.config.enable_firewall:
            self.firewall.add_rule(
                FirewallRule(
                    tenant_id,
                    region.axi_base_addr,
                    region.axi_size,
                )
            )
        return rid
    return None

deallocate(tenant_id)

Release a tenant's hardware region.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
def deallocate(self, tenant_id: str) -> bool:
    """Release a tenant's hardware region."""
    tenant = self.tenants.get(tenant_id)
    if tenant is None or tenant.region_id is None:
        return False

    region = self.regions.get(tenant.region_id)
    if region is not None:
        region.state = RegionState.FREE
        region.tenant_id = None

    self.firewall.remove_tenant_rules(tenant_id)
    tenant.region_id = None
    tenant.active = False
    return True

remove_tenant(tenant_id)

Remove a tenant entirely.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
541
542
543
544
def remove_tenant(self, tenant_id: str) -> bool:
    """Remove a tenant entirely."""
    self.deallocate(tenant_id)
    return self.tenants.pop(tenant_id, None) is not None

schedule(num_cycles)

Generate a schedule for active tenants.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
546
547
548
549
def schedule(self, num_cycles: int) -> List[ScheduleSlot]:
    """Generate a schedule for active tenants."""
    active = [t for t in self.tenants.values() if t.active]
    return self.scheduler.generate_schedule(active, num_cycles)

migrate(tenant_id, target_region_id)

Migrate a tenant to a different region.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
551
552
553
554
555
556
557
558
559
560
561
562
def migrate(self, tenant_id: str, target_region_id: int) -> MigrationResult:
    """Migrate a tenant to a different region."""
    tenant = self.tenants.get(tenant_id)
    if tenant is None or tenant.region_id is None:
        return MigrationResult(False, tenant_id or "", -1, target_region_id, reason="not_found")

    source = self.regions.get(tenant.region_id)
    target = self.regions.get(target_region_id)
    if source is None or target is None:
        return MigrationResult(False, tenant_id, -1, target_region_id, reason="invalid_region")

    return self.migration_engine.migrate(tenant, source, target, self.firewall)

check_access(tenant_id, addr, is_write=False)

Check if a tenant can access an address (firewall).

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
564
565
566
567
568
def check_access(self, tenant_id: str, addr: int, is_write: bool = False) -> bool:
    """Check if a tenant can access an address (firewall)."""
    if not self.config.enable_firewall:
        return True
    return self.firewall.check_access(tenant_id, addr, is_write)

status()

Get hypervisor status.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
570
571
572
573
574
575
576
577
578
579
580
581
582
def status(self) -> Dict[str, Any]:
    """Get hypervisor status."""
    free_regions = sum(1 for r in self.regions.values() if r.is_free)
    active_tenants = sum(1 for t in self.tenants.values() if t.active)
    return {
        "total_regions": len(self.regions),
        "free_regions": free_regions,
        "total_tenants": len(self.tenants),
        "active_tenants": active_tenants,
        "firewall_violations": self.firewall.violation_count,
        "migrations": len(self.migration_engine.history),
        "scheduling_policy": self.config.scheduling_policy.value,
    }

tenant_report(tenant_id)

Get a report for one tenant.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
def tenant_report(self, tenant_id: str) -> Optional[Dict[str, Any]]:
    """Get a report for one tenant."""
    t = self.tenants.get(tenant_id)
    if t is None:
        return None
    return {
        "tenant_id": t.tenant_id,
        "name": t.name,
        "priority": t.priority.value,
        "region_id": t.region_id,
        "active": t.active,
        "total_spikes": t.total_spikes,
        "total_cycles": t.total_cycles,
        "qos_bandwidth_mbps": t.qos.max_bandwidth_mbps,
        "qos_latency_us": t.qos.max_latency_us,
    }

compute_utilisation()

Compute utilisation fraction per region.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
def compute_utilisation(self) -> Dict[int, float]:
    """Compute utilisation fraction per region."""
    result = {}
    for rid, region in self.regions.items():
        if region.is_free:
            result[rid] = 0.0
        elif region.tenant_id:
            tenant = self.tenants.get(region.tenant_id)
            if tenant and tenant.qos:
                result[rid] = min(1.0, tenant.qos.max_neurons / max(region.num_neurons, 1))
            else:
                result[rid] = 1.0
        else:
            result[rid] = 0.0
    return result

check_overcommit()

Check if total tenant QoS exceeds fabric capacity.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
617
618
619
620
621
def check_overcommit(self) -> bool:
    """Check if total tenant QoS exceeds fabric capacity."""
    total_neurons_needed = sum(t.qos.max_neurons for t in self.tenants.values() if t.active)
    total_neurons_available = sum(r.num_neurons for r in self.regions.values())
    return total_neurons_needed > total_neurons_available

get_faulted_regions()

List regions in FAULTED state.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
623
624
625
def get_faulted_regions(self) -> List[int]:
    """List regions in FAULTED state."""
    return [rid for rid, r in self.regions.items() if r.state == RegionState.FAULTED]

mark_region_faulted(region_id)

Mark a region as faulted and evict its tenant.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
627
628
629
630
631
632
633
634
635
def mark_region_faulted(self, region_id: int) -> bool:
    """Mark a region as faulted and evict its tenant."""
    region = self.regions.get(region_id)
    if region is None:
        return False
    if region.tenant_id:
        self.deallocate(region.tenant_id)
    region.state = RegionState.FAULTED
    return True

BandwidthMeter dataclass

Per-tenant throughput metering (spikes/cycles per window).

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
@dataclass
class BandwidthMeter:
    """Per-tenant throughput metering (spikes/cycles per window)."""

    window_cycles: int = 100_000
    _counters: Dict[str, List[int]] = field(default_factory=dict)
    _timestamps: Dict[str, List[int]] = field(default_factory=dict)

    def record(self, tenant_id: str, spike_count: int, cycle: int) -> None:
        if tenant_id not in self._counters:
            self._counters[tenant_id] = []
            self._timestamps[tenant_id] = []
        self._counters[tenant_id].append(spike_count)
        self._timestamps[tenant_id].append(cycle)

    def throughput(self, tenant_id: str) -> float:
        """Spikes per cycle (averaged over window)."""
        if tenant_id not in self._counters or not self._counters[tenant_id]:
            return 0.0
        entries = self._counters[tenant_id]
        total_spikes = sum(entries[-100:])
        if len(entries) < 2:
            return float(total_spikes)
        ts = self._timestamps[tenant_id]
        span = max(1, ts[-1] - ts[max(0, len(ts) - 100)])
        return total_spikes / span

    def exceeds_quota(self, tenant_id: str, max_mbps: float) -> bool:
        return self.throughput(tenant_id) > max_mbps

throughput(tenant_id)

Spikes per cycle (averaged over window).

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
656
657
658
659
660
661
662
663
664
665
666
def throughput(self, tenant_id: str) -> float:
    """Spikes per cycle (averaged over window)."""
    if tenant_id not in self._counters or not self._counters[tenant_id]:
        return 0.0
    entries = self._counters[tenant_id]
    total_spikes = sum(entries[-100:])
    if len(entries) < 2:
        return float(total_spikes)
    ts = self._timestamps[tenant_id]
    span = max(1, ts[-1] - ts[max(0, len(ts) - 100)])
    return total_spikes / span

PreemptionEvent dataclass

Record of a preemption event.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
675
676
677
678
679
680
681
682
@dataclass
class PreemptionEvent:
    """Record of a preemption event."""

    victim_id: str
    preemptor_id: str
    cycle: int
    state_saved: bool

PreemptionManager

Handles preemption with state checkpoint/restore.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
class PreemptionManager:
    """Handles preemption with state checkpoint/restore."""

    def __init__(self):
        self.events: List[PreemptionEvent] = []
        self.saved_states: Dict[str, TenantState] = {}

    def preempt(
        self,
        victim: Tenant,
        preemptor: Tenant,
        region: HWRegion,
        cycle: int,
    ) -> PreemptionEvent:
        """Preempt victim and give region to preemptor."""
        state_saved = False
        if victim.state is not None:
            victim.state.compute_checksum()
            self.saved_states[victim.tenant_id] = victim.state
            state_saved = True

        victim.active = False
        victim.region_id = None
        region.tenant_id = preemptor.tenant_id
        preemptor.region_id = region.region_id
        preemptor.active = True

        evt = PreemptionEvent(victim.tenant_id, preemptor.tenant_id, cycle, state_saved)
        self.events.append(evt)
        return evt

    def restore_preempted(self, tenant: Tenant) -> bool:
        """Restore a previously preempted tenant's state."""
        if tenant.tenant_id not in self.saved_states:
            return False
        tenant.state = self.saved_states.pop(tenant.tenant_id)
        return True

preempt(victim, preemptor, region, cycle)

Preempt victim and give region to preemptor.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
def preempt(
    self,
    victim: Tenant,
    preemptor: Tenant,
    region: HWRegion,
    cycle: int,
) -> PreemptionEvent:
    """Preempt victim and give region to preemptor."""
    state_saved = False
    if victim.state is not None:
        victim.state.compute_checksum()
        self.saved_states[victim.tenant_id] = victim.state
        state_saved = True

    victim.active = False
    victim.region_id = None
    region.tenant_id = preemptor.tenant_id
    preemptor.region_id = region.region_id
    preemptor.active = True

    evt = PreemptionEvent(victim.tenant_id, preemptor.tenant_id, cycle, state_saved)
    self.events.append(evt)
    return evt

restore_preempted(tenant)

Restore a previously preempted tenant's state.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
716
717
718
719
720
721
def restore_preempted(self, tenant: Tenant) -> bool:
    """Restore a previously preempted tenant's state."""
    if tenant.tenant_id not in self.saved_states:
        return False
    tenant.state = self.saved_states.pop(tenant.tenant_id)
    return True

SLAViolation dataclass

One SLA violation.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
727
728
729
730
731
732
733
734
735
@dataclass
class SLAViolation:
    """One SLA violation."""

    tenant_id: str
    metric: str  # "latency", "bandwidth", "compute_share"
    measured: float
    limit: float
    cycle: int

SLAMonitor

Monitors per-tenant QoS compliance and detects violations.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
class SLAMonitor:
    """Monitors per-tenant QoS compliance and detects violations."""

    def __init__(self):
        self.violations: List[SLAViolation] = []

    def check_latency(
        self, tenant: Tenant, measured_us: float, cycle: int
    ) -> Optional[SLAViolation]:
        if measured_us > tenant.qos.max_latency_us:
            v = SLAViolation(
                tenant.tenant_id, "latency", measured_us, tenant.qos.max_latency_us, cycle
            )
            self.violations.append(v)
            return v
        return None

    def check_bandwidth(
        self, tenant: Tenant, measured_mbps: float, cycle: int
    ) -> Optional[SLAViolation]:
        if measured_mbps > tenant.qos.max_bandwidth_mbps:
            v = SLAViolation(
                tenant.tenant_id, "bandwidth", measured_mbps, tenant.qos.max_bandwidth_mbps, cycle
            )
            self.violations.append(v)
            return v
        return None

    @property
    def total_violations(self) -> int:
        return len(self.violations)

    def violations_for(self, tenant_id: str) -> List[SLAViolation]:
        return [v for v in self.violations if v.tenant_id == tenant_id]

UsageRecord dataclass

One billing record.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
800
801
802
803
804
805
806
807
@dataclass
class UsageRecord:
    """One billing record."""

    tenant_id: str
    cycles_used: int
    spikes_processed: int
    timestamp_ns: int

ResourceAccounting

Tracks per-tenant resource usage for metered billing.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
class ResourceAccounting:
    """Tracks per-tenant resource usage for metered billing."""

    def __init__(self):
        self.records: List[UsageRecord] = []
        self._totals: Dict[str, Dict[str, int]] = {}

    def record(self, tenant_id: str, cycles: int, spikes: int) -> None:
        r = UsageRecord(tenant_id, cycles, spikes, time.time_ns())
        self.records.append(r)
        if tenant_id not in self._totals:
            self._totals[tenant_id] = {"cycles": 0, "spikes": 0}
        self._totals[tenant_id]["cycles"] += cycles
        self._totals[tenant_id]["spikes"] += spikes

    def total_cycles(self, tenant_id: str) -> int:
        return self._totals.get(tenant_id, {}).get("cycles", 0)

    def total_spikes(self, tenant_id: str) -> int:
        return self._totals.get(tenant_id, {}).get("spikes", 0)

    def invoice(self, tenant_id: str, cost_per_cycle: float = 1e-6) -> float:
        """Compute billing amount."""
        return self.total_cycles(tenant_id) * cost_per_cycle

invoice(tenant_id, cost_per_cycle=1e-06)

Compute billing amount.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
831
832
833
def invoice(self, tenant_id: str, cost_per_cycle: float = 1e-6) -> float:
    """Compute billing amount."""
    return self.total_cycles(tenant_id) * cost_per_cycle

RegionHealth dataclass

Health score with degradation model.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
@dataclass
class RegionHealth:
    """Health score with degradation model."""

    region_id: int
    error_count: int = 0
    temperature_c: float = 25.0
    age_hours: float = 0.0

    @property
    def health_score(self) -> float:
        """0.0 = dead, 1.0 = perfect."""
        temp_pen = max(0, (self.temperature_c - 85)) * 0.01
        age_pen = self.age_hours / 100_000 * 0.1
        err_pen = min(self.error_count * 0.05, 0.5)
        return max(0.0, 1.0 - temp_pen - age_pen - err_pen)

    @property
    def is_degraded(self) -> bool:
        return self.health_score < 0.8

    def record_error(self) -> None:
        self.error_count += 1

health_score property

0.0 = dead, 1.0 = perfect.

SecurityAuditLog

Structured, append-only audit trail for compliance.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
class SecurityAuditLog:
    """Structured, append-only audit trail for compliance."""

    def __init__(self, max_entries: int = 10000):
        self.entries: Deque[AuditEntry] = deque(maxlen=max_entries)

    def log(self, event: AuditEntry) -> None:
        self.entries.append(event)

    def query(
        self, event_type: Optional[AuditEventType] = None, tenant_id: Optional[str] = None
    ) -> List[AuditEntry]:
        results = list(self.entries)
        if event_type is not None:
            results = [e for e in results if e.event_type == event_type]
        if tenant_id is not None:
            results = [e for e in results if e.tenant_id == tenant_id]
        return results

    @property
    def count(self) -> int:
        return len(self.entries)

    def checksum(self) -> str:
        h = hashlib.sha256()
        for entry in self.entries:
            h.update(f"{entry.event_type.value}:{entry.tenant_id}:{entry.timestamp_ns}".encode())
        return h.hexdigest()[:16]

MigrationThrottle dataclass

Rate-limits migration requests to prevent storms.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
@dataclass
class MigrationThrottle:
    """Rate-limits migration requests to prevent storms."""

    max_per_window: int = 5
    window_ns: int = 10_000_000_000  # 10s
    _timestamps: List[int] = field(default_factory=list)

    def allow(self) -> bool:
        """Check if a migration is allowed under the rate limit."""
        now = time.time_ns()
        cutoff = now - self.window_ns
        self._timestamps = [t for t in self._timestamps if t > cutoff]
        return len(self._timestamps) < self.max_per_window

    def record(self) -> None:
        self._timestamps.append(time.time_ns())

    @property
    def recent_count(self) -> int:
        now = time.time_ns()
        cutoff = now - self.window_ns
        return sum(1 for t in self._timestamps if t > cutoff)

allow()

Check if a migration is allowed under the rate limit.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
978
979
980
981
982
983
def allow(self) -> bool:
    """Check if a migration is allowed under the rate limit."""
    now = time.time_ns()
    cutoff = now - self.window_ns
    self._timestamps = [t for t in self._timestamps if t > cutoff]
    return len(self._timestamps) < self.max_per_window

select_region_multi_die(regions, min_neurons, preferred_die=None)

Select best free region, preferring a specific die.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
def select_region_multi_die(
    regions: Dict[int, HWRegion],
    min_neurons: int,
    preferred_die: Optional[int] = None,
) -> Optional[int]:
    """Select best free region, preferring a specific die."""
    candidates = [
        (rid, r) for rid, r in regions.items() if r.is_free and r.num_neurons >= min_neurons
    ]
    if not candidates:
        return None

    if preferred_die is not None:
        on_die = [(rid, r) for rid, r in candidates if r.die_id == preferred_die]
        if on_die:
            return min(on_die, key=lambda x: x[1].num_neurons)[0]

    return min(candidates, key=lambda x: x[1].num_neurons)[0]

admission_check(tenant, regions, existing_tenants)

Check if a new tenant can be admitted without overcommitting.

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
def admission_check(
    tenant: Tenant,
    regions: Dict[int, HWRegion],
    existing_tenants: Dict[str, Tenant],
) -> Tuple[bool, str]:
    """Check if a new tenant can be admitted without overcommitting."""
    required = tenant.qos.max_neurons
    free_capacity = sum(r.num_neurons for r in regions.values() if r.is_free)

    if required > free_capacity:
        return False, f"insufficient_neurons: need={required}, free={free_capacity}"

    if any(r.num_neurons >= required for r in regions.values() if r.is_free):
        return True, "admitted"

    return False, "no_single_region_large_enough"

verify_isolation(firewall, regions)

Verify that no two tenants share address ranges.

Returns list of violation descriptions (empty = sound).

Source code in src/sc_neurocore/hypervisor/hypervisor.py
Python
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
def verify_isolation(firewall: BitstreamFirewall, regions: Dict[int, HWRegion]) -> List[str]:
    """Verify that no two tenants share address ranges.

    Returns list of violation descriptions (empty = sound).
    """
    violations = []
    rules_by_tenant: Dict[str, List[FirewallRule]] = {}
    for rule in firewall.rules:
        rules_by_tenant.setdefault(rule.tenant_id, []).append(rule)

    tenant_ids = list(rules_by_tenant.keys())
    for i in range(len(tenant_ids)):
        for j in range(i + 1, len(tenant_ids)):
            t1, t2 = tenant_ids[i], tenant_ids[j]
            for r1 in rules_by_tenant[t1]:
                for r2 in rules_by_tenant[t2]:
                    if r1.base_addr < r2.end_addr and r2.base_addr < r1.end_addr:
                        violations.append(
                            f"overlap: {t1}[{hex(r1.base_addr)}:{hex(r1.end_addr)}] "
                            f"& {t2}[{hex(r2.base_addr)}:{hex(r2.end_addr)}]"
                        )
    return violations