Skip to content

Digital Twin Synchronization

Time-warp optimistic simulation for SC digital twins. Includes null-message lookahead, delta checkpointing, replay verification, drift auto-correction, and multi-twin federation with global virtual time.

Quick Start

Python
from sc_neurocore.digital_twin.twinsync import (
    TwinSession, NullMessageOptimizer, DeltaCheckpoint,
    ReplayVerifier, DriftAutoCorrector, TwinFederation,
)

sc_neurocore.digital_twin.twinsync

SC-native time-warp synchronization for BCI digital twins.

Provides causal ordering, checkpoint/resume, and time-warp rollback across distributed MPI nodes for real-time synchronization between a physical BCI subject and its billion-neuron SC digital twin.

Key primitives:

- Lamport Clock: Logical timestamps for causal ordering
- Vector Clock: Full causal dependency tracking across N nodes
- Time-Warp Engine: Optimistic execution with anti-message rollback
- Checkpoint Manager: Deterministic state snapshots with LFSR replay
- Twin Session: Orchestrates physical ↔ digital synchronization

Architecture:

Text Only
Physical BCI Subject
       │ MEA / EEG / fNIRS
┌──────▼──────┐
│ SensorBridge │──► AER spike stream
└──────┬──────┘
       │ causal-ordered events
┌──────▼──────────────────────────┐
│     TwinSync Time-Warp Engine    │
│  ┌────────┐ ┌────────┐ ┌──────┐│
│  │ Node 0 │ │ Node 1 │ │Node N││
│  │ 100M   │ │ 100M   │ │100M  ││
│  │neurons │ │neurons │ │neuro ││
│  └───┬────┘ └───┬────┘ └──┬───┘│
│      └──────────┴─────────┘    │
│         vector clocks          │
└──────┬─────────────────────────┘
       │ synchronized output
┌──────▼──────┐
│ OutputBridge │──► closed-loop stimulation
└─────────────┘

Compatible with:

- mpi_partitioner.py / hierarchical_partitioner.py — node topology
- identity substrate (ArcaneNeuron.v_deep) — preserved across rollback
- bioware/ — MEA/AER physical interface
- sc_scope/ — live monitoring of twin divergence

LamportClock

Lamport logical clock for causal ordering.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class LamportClock:
    """Lamport logical clock for causal ordering.

    Maintains a single monotonically increasing integer timestamp.
    """

    def __init__(self):
        # Current logical time; starts at zero before any event.
        self.time: int = 0

    def tick(self) -> int:
        """Local event: increment."""
        self.time = self.time + 1
        return self.time

    def send(self) -> int:
        """Prepare timestamp for sending."""
        self.time = self.time + 1
        return self.time

    def receive(self, remote_time: int) -> int:
        """Update on message receipt."""
        # Adopt the later of the two clocks, then count the receive event.
        newer = self.time if self.time > remote_time else remote_time
        self.time = newer + 1
        return self.time

tick()

Local event: increment.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
73
74
75
76
def tick(self) -> int:
    """Local event: increment."""
    self.time += 1
    return self.time

send()

Prepare timestamp for sending.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
78
79
80
81
def send(self) -> int:
    """Prepare timestamp for sending."""
    self.time += 1
    return self.time

receive(remote_time)

Update on message receipt.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
83
84
85
86
def receive(self, remote_time: int) -> int:
    """Update on message receipt."""
    self.time = max(self.time, remote_time) + 1
    return self.time

VectorClock

Vector clock for full causal dependency tracking.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class VectorClock:
    """Vector clock for full causal dependency tracking.

    Holds one int64 counter per participating node; all public methods
    return a defensive copy of the clock array.
    """

    def __init__(self, node_id: int, num_nodes: int):
        self.node_id = node_id
        self.num_nodes = num_nodes
        self.clock = np.zeros(num_nodes, dtype=np.int64)

    def _bump(self) -> np.ndarray:
        # Advance this node's own component and hand back a snapshot.
        self.clock[self.node_id] += 1
        return self.clock.copy()

    def tick(self) -> np.ndarray:
        return self._bump()

    def send(self) -> np.ndarray:
        return self._bump()

    def receive(self, remote_clock: np.ndarray) -> np.ndarray:
        # Component-wise max merges the remote history, then count this event.
        self.clock = np.maximum(self.clock, remote_clock)
        return self._bump()

    def happened_before(self, other: np.ndarray) -> bool:
        """Check if self happened-before other (self < other)."""
        dominated = np.all(self.clock <= other)
        strictly_less_somewhere = np.any(self.clock < other)
        return bool(dominated and strictly_less_somewhere)

    def concurrent_with(self, other: np.ndarray) -> bool:
        """Check if self is concurrent with other."""
        other_before_self = bool(
            np.all(other <= self.clock) and np.any(other < self.clock)
        )
        return not self.happened_before(other) and not other_before_self

happened_before(other)

Check if self happened-before other (self < other).

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
110
111
112
def happened_before(self, other: np.ndarray) -> bool:
    """Check if self happened-before other (self < other)."""
    return bool(np.all(self.clock <= other) and np.any(self.clock < other))

concurrent_with(other)

Check if self is concurrent with other.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
114
115
116
117
118
def concurrent_with(self, other: np.ndarray) -> bool:
    """Check if self is concurrent with other."""
    return not self.happened_before(other) and not bool(
        np.all(other <= self.clock) and np.any(other < self.clock)
    )

TwinEvent dataclass

One event in the time-warp simulation.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
133
134
135
136
137
138
139
140
141
142
143
144
145
@dataclass(order=True)
class TwinEvent:
    """One event in the time-warp simulation.

    ``order=True`` with ``compare=False`` on every field except
    ``virtual_time_ns`` and ``priority`` means ordering (and hence the
    event-queue heap order) is by (virtual_time_ns, priority) only.
    """

    # Primary sort key: simulation timestamp in nanoseconds.
    virtual_time_ns: int
    # Secondary sort key: tie-breaker among events at the same virtual time.
    priority: int = field(compare=True, default=0)
    event_type: EventType = field(compare=False, default=EventType.SPIKE)
    # Node indices; -1 is used elsewhere in this module for "physical world".
    source_node: int = field(compare=False, default=0)
    target_node: int = field(compare=False, default=0)
    payload: Dict[str, Any] = field(compare=False, default_factory=dict)
    # Logical timestamps stamped by the sender (0 when external/unknown).
    lamport_ts: int = field(compare=False, default=0)
    vector_ts: Optional[np.ndarray] = field(compare=False, default=None)
    # When True, TimeWarpEngine.process_next pops the event without applying it.
    cancelled: bool = field(compare=False, default=False)

Checkpoint dataclass

Deterministic state snapshot for rollback.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@dataclass
class Checkpoint:
    """Deterministic state snapshot for rollback."""

    checkpoint_id: int
    virtual_time_ns: int
    node_id: int
    neuron_state: Optional[np.ndarray] = None
    synapse_state: Optional[np.ndarray] = None
    lfsr_state: int = 0
    identity_deep: float = 0.0
    lamport_time: int = 0
    vector_clock: Optional[np.ndarray] = None
    checksum: str = ""

    def compute_checksum(self) -> str:
        """Hash id, virtual time, LFSR state and neuron array.

        Stores the truncated (16 hex chars) SHA-256 digest on ``checksum``
        and returns it.
        """
        digest = hashlib.sha256()
        for chunk in (
            self.checkpoint_id.to_bytes(4, "little"),
            self.virtual_time_ns.to_bytes(8, "little"),
            self.lfsr_state.to_bytes(4, "little"),
        ):
            digest.update(chunk)
        if self.neuron_state is not None:
            digest.update(self.neuron_state.tobytes())
        self.checksum = digest.hexdigest()[:16]
        return self.checksum

CheckpointManager

Manages state snapshots for time-warp rollback.

Preserves the identity substrate (ArcaneNeuron.v_deep) across rollback: deep compartment is NEVER rolled back.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class CheckpointManager:
    """Manages state snapshots for time-warp rollback.

    Preserves the identity substrate (ArcaneNeuron.v_deep) across
    rollback: deep compartment is NEVER rolled back.
    """

    def __init__(self, max_checkpoints: int = 100):
        self.max_checkpoints = max_checkpoints
        # node_id → checkpoints in save order (oldest first).
        self.checkpoints: Dict[int, List[Checkpoint]] = {}
        self._next_id: int = 0

    def save(
        self,
        node_id: int,
        virtual_time_ns: int,
        neuron_state: Optional[np.ndarray] = None,
        synapse_state: Optional[np.ndarray] = None,
        lfsr_state: int = 0,
        identity_deep: float = 0.0,
        lamport_time: int = 0,
        vector_clock: Optional[np.ndarray] = None,
    ) -> Checkpoint:
        """Snapshot one node's state; evicts the oldest entries past the cap."""

        def _dup(arr: Optional[np.ndarray]) -> Optional[np.ndarray]:
            # Defensive copy: later mutation by the caller must not corrupt history.
            return None if arr is None else arr.copy()

        cp = Checkpoint(
            checkpoint_id=self._next_id,
            virtual_time_ns=virtual_time_ns,
            node_id=node_id,
            neuron_state=_dup(neuron_state),
            synapse_state=_dup(synapse_state),
            lfsr_state=lfsr_state,
            identity_deep=identity_deep,
            lamport_time=lamport_time,
            vector_clock=_dup(vector_clock),
        )
        cp.compute_checksum()
        self._next_id += 1

        bucket = self.checkpoints.setdefault(node_id, [])
        bucket.append(cp)

        # Garbage collection: keep only the latest max_checkpoints entries.
        overflow = len(bucket) - self.max_checkpoints
        if overflow > 0:
            del bucket[:overflow]

        return cp

    def find_rollback_target(self, node_id: int, target_time_ns: int) -> Optional[Checkpoint]:
        """Find the latest checkpoint at or before target_time."""
        candidate: Optional[Checkpoint] = None
        # Linear scan keeps the original "last stored wins" semantics.
        for cp in self.checkpoints.get(node_id, []):
            if cp.virtual_time_ns <= target_time_ns:
                candidate = cp
        return candidate

    def discard_after(self, node_id: int, time_ns: int) -> int:
        """Discard checkpoints after a given time (post-rollback cleanup)."""
        existing = self.checkpoints.get(node_id, [])
        kept = [cp for cp in existing if cp.virtual_time_ns <= time_ns]
        self.checkpoints[node_id] = kept
        return len(existing) - len(kept)

    @property
    def total_checkpoints(self) -> int:
        """Total snapshots held across every node."""
        count = 0
        for bucket in self.checkpoints.values():
            count += len(bucket)
        return count

find_rollback_target(node_id, target_time_ns)

Find the latest checkpoint at or before target_time.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
224
225
226
227
228
229
230
231
def find_rollback_target(self, node_id: int, target_time_ns: int) -> Optional[Checkpoint]:
    """Find the latest checkpoint at or before target_time."""
    cps = self.checkpoints.get(node_id, [])
    best = None
    for cp in cps:
        if cp.virtual_time_ns <= target_time_ns:
            best = cp
    return best

discard_after(node_id, time_ns)

Discard checkpoints after a given time (post-rollback cleanup).

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
233
234
235
236
237
238
def discard_after(self, node_id: int, time_ns: int) -> int:
    """Discard checkpoints after a given time (post-rollback cleanup)."""
    cps = self.checkpoints.get(node_id, [])
    before = len(cps)
    self.checkpoints[node_id] = [cp for cp in cps if cp.virtual_time_ns <= time_ns]
    return before - len(self.checkpoints.get(node_id, []))

NodeState dataclass

Per-node state in the time-warp simulation.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
248
249
250
251
252
253
254
255
256
257
258
@dataclass
class NodeState:
    """Per-node state in the time-warp simulation.

    Holds this node's local virtual time, logical clocks, and counters.
    ``vector_clock`` is left None here and filled in externally (it needs
    the global node count) — TimeWarpEngine.__init__ assigns it.
    """

    node_id: int
    # Local Virtual Time (LVT): timestamp of the latest event this node processed.
    local_virtual_time_ns: int = 0
    # Fresh Lamport clock per node (default_factory avoids a shared instance).
    lamport: LamportClock = field(default_factory=LamportClock)
    # Assigned after construction; Optional because it requires num_nodes.
    vector_clock: Optional[VectorClock] = None
    processed_events: int = 0
    rollback_count: int = 0
    identity_deep: float = 0.0  # Never rolled back

TimeWarpEngine

Optimistic parallel simulation with anti-message rollback.

Implements the Jefferson Time Warp protocol adapted for SC neuromorphic simulation:

  1. Each node advances optimistically at its own rate
  2. Straggler events trigger rollback + anti-messages
  3. Global Virtual Time (GVT) advances monotonically
  4. Fossil collection prunes checkpoints below GVT
  5. Identity (v_deep) is NEVER rolled back — it is the self
Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
class TimeWarpEngine:
    """Optimistic parallel simulation with anti-message rollback.

    Implements the Jefferson Time Warp protocol adapted for
    SC neuromorphic simulation:

    1. Each node advances optimistically at its own rate
    2. Straggler events trigger rollback + anti-messages
    3. Global Virtual Time (GVT) advances monotonically
    4. Fossil collection prunes checkpoints below GVT
    5. Identity (v_deep) is NEVER rolled back — it is the self
    """

    def __init__(self, num_nodes: int, checkpoint_interval_ns: int = 1000):
        """Build per-node state, each with its own vector clock.

        Args:
            num_nodes: Number of simulation nodes.
            checkpoint_interval_ns: Checkpoint cadence. NOTE(review): despite
                the ``_ns`` suffix, ``process_next`` uses this value as an
                *event-count* modulus, not a time interval — confirm intent.
        """
        self.num_nodes = num_nodes
        self.checkpoint_interval_ns = checkpoint_interval_ns
        self.nodes: Dict[int, NodeState] = {}
        for i in range(num_nodes):
            ns = NodeState(node_id=i)
            ns.vector_clock = VectorClock(i, num_nodes)
            self.nodes[i] = ns
        # Min-heap ordered by (virtual_time_ns, priority) via TwinEvent's order=True.
        self.event_queue: List[TwinEvent] = []
        # Full processing history; scanned by _rollback to build anti-messages.
        self.processed: List[TwinEvent] = []
        # Record of every anti-message ever generated (also pushed onto the queue).
        self.anti_messages: List[TwinEvent] = []
        self.checkpoint_mgr = CheckpointManager()
        self.gvt_ns: int = 0
        self.total_rollbacks: int = 0

    def inject_event(self, event: TwinEvent) -> None:
        """Inject an event into the simulation."""
        heapq.heappush(self.event_queue, event)

    def process_next(self) -> Optional[TwinEvent]:
        """Process the next event from the queue.

        Returns the popped event — even when it is cancelled or addressed to
        an unknown node, so callers can inspect it — or None when the queue
        is empty. A straggler (event older than the target node's LVT)
        triggers a rollback before the event is applied.
        """
        if not self.event_queue:
            return None

        event = heapq.heappop(self.event_queue)
        if event.cancelled:
            return event

        target = self.nodes.get(event.target_node)
        if target is None:
            return event

        # Check for straggler (causality violation)
        if event.virtual_time_ns < target.local_virtual_time_ns:
            self._rollback(target, event.virtual_time_ns)

        # Process event: advance LVT and merge logical clocks.
        target.local_virtual_time_ns = event.virtual_time_ns
        target.lamport.receive(event.lamport_ts)
        if target.vector_clock is not None and event.vector_ts is not None:
            target.vector_clock.receive(event.vector_ts)
        target.processed_events += 1

        # Periodic checkpoint — every checkpoint_interval_ns *events* (see __init__ note).
        if target.processed_events % max(1, self.checkpoint_interval_ns) == 0:
            self.checkpoint_mgr.save(
                target.node_id,
                target.local_virtual_time_ns,
                lfsr_state=target.processed_events,
                identity_deep=target.identity_deep,
                lamport_time=target.lamport.time,
                vector_clock=target.vector_clock.clock if target.vector_clock else None,
            )

        self.processed.append(event)
        return event

    def _rollback(self, node: NodeState, target_time_ns: int) -> None:
        """Roll back a node to a checkpoint at or before target_time.

        Identity (v_deep) is preserved — never rolled back.
        """
        saved_identity = node.identity_deep

        cp = self.checkpoint_mgr.find_rollback_target(node.node_id, target_time_ns)
        if cp is not None:
            node.local_virtual_time_ns = cp.virtual_time_ns
            node.lamport.time = cp.lamport_time
            if node.vector_clock is not None and cp.vector_clock is not None:
                node.vector_clock.clock = cp.vector_clock.copy()
            self.checkpoint_mgr.discard_after(node.node_id, cp.virtual_time_ns)
        else:
            # No checkpoint available: jump straight to the straggler's time.
            node.local_virtual_time_ns = target_time_ns

        # Restore identity
        node.identity_deep = saved_identity
        node.rollback_count += 1
        self.total_rollbacks += 1

        # Generate anti-messages for events this node sent after the rollback point.
        anti = [
            TwinEvent(
                virtual_time_ns=e.virtual_time_ns,
                event_type=EventType.ANTI_MESSAGE,
                source_node=node.node_id,
                target_node=e.target_node,
                lamport_ts=node.lamport.send(),
            )
            for e in self.processed
            if e.source_node == node.node_id and e.virtual_time_ns > target_time_ns
        ]
        self.anti_messages.extend(anti)
        for a in anti:
            heapq.heappush(self.event_queue, a)

    def compute_gvt(self) -> int:
        """Compute Global Virtual Time (minimum of all LVTs + in-transit)."""
        lvts = [n.local_virtual_time_ns for n in self.nodes.values()]
        in_transit = [e.virtual_time_ns for e in self.event_queue if not e.cancelled]
        all_times = lvts + in_transit
        self.gvt_ns = min(all_times) if all_times else 0
        return self.gvt_ns

    def fossil_collect(self) -> int:
        """Remove checkpoints below GVT; returns how many were pruned."""
        gvt = self.compute_gvt()
        removed = 0
        for nid in list(self.checkpoint_mgr.checkpoints.keys()):
            cps = self.checkpoint_mgr.checkpoints[nid]
            before = len(cps)
            self.checkpoint_mgr.checkpoints[nid] = [cp for cp in cps if cp.virtual_time_ns >= gvt]
            removed += before - len(self.checkpoint_mgr.checkpoints[nid])
        return removed

    def status(self) -> Dict[str, Any]:
        """Snapshot of engine-wide counters for monitoring."""
        return {
            "num_nodes": self.num_nodes,
            "gvt_ns": self.gvt_ns,
            "total_rollbacks": self.total_rollbacks,
            "pending_events": len(self.event_queue),
            "processed_events": len(self.processed),
            "checkpoints": self.checkpoint_mgr.total_checkpoints,
            "node_lvts": {nid: n.local_virtual_time_ns for nid, n in self.nodes.items()},
        }

    def inject_sync_barrier(self, virtual_time_ns: int) -> None:
        """Inject a sync barrier event to all nodes at given time."""
        for nid in self.nodes:
            event = TwinEvent(
                virtual_time_ns=virtual_time_ns,
                event_type=EventType.SYNC_BARRIER,
                source_node=-1,
                target_node=nid,
                lamport_ts=0,
            )
            heapq.heappush(self.event_queue, event)

    def verify_causal_order(self) -> List[Tuple[int, int]]:
        """Verify causal ordering of processed events.

        Returns list of (index_a, index_b) pairs where order is violated.
        Only *adjacent* processed events targeting the same node are compared.
        """
        violations = []
        for i in range(len(self.processed) - 1):
            a = self.processed[i]
            b = self.processed[i + 1]
            if a.target_node == b.target_node and a.virtual_time_ns > b.virtual_time_ns:
                violations.append((i, i + 1))
        return violations

    def detect_starvation(self, threshold_ns: int = 10000) -> List[int]:
        """Detect nodes lagging more than ``threshold_ns`` behind the fastest node.

        Bug fix: GVT is the *minimum* LVT (see ``compute_gvt``), so the old
        comparison ``gvt - lvt > threshold_ns`` could never be positive and
        this method always returned []. A starved node is one whose LVT
        trails the maximum LVT by more than the threshold.
        """
        self.compute_gvt()  # keep gvt_ns fresh, as the old implementation did
        if not self.nodes:
            return []
        front_ns = max(n.local_virtual_time_ns for n in self.nodes.values())
        return [
            nid
            for nid, n in self.nodes.items()
            if front_ns - n.local_virtual_time_ns > threshold_ns
        ]

    def node_throughput(self) -> Dict[int, int]:
        """Events processed per node."""
        return {nid: n.processed_events for nid, n in self.nodes.items()}

inject_event(event)

Inject an event into the simulation.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
289
290
291
def inject_event(self, event: TwinEvent) -> None:
    """Inject an event into the simulation."""
    heapq.heappush(self.event_queue, event)

process_next()

Process the next event from the queue.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def process_next(self) -> Optional[TwinEvent]:
    """Process the next event from the queue."""
    if not self.event_queue:
        return None

    event = heapq.heappop(self.event_queue)
    if event.cancelled:
        return event

    target = self.nodes.get(event.target_node)
    if target is None:
        return event

    # Check for straggler (causality violation)
    if event.virtual_time_ns < target.local_virtual_time_ns:
        self._rollback(target, event.virtual_time_ns)

    # Process event
    target.local_virtual_time_ns = event.virtual_time_ns
    target.lamport.receive(event.lamport_ts)
    if target.vector_clock is not None and event.vector_ts is not None:
        target.vector_clock.receive(event.vector_ts)
    target.processed_events += 1

    # Periodic checkpoint
    if target.processed_events % max(1, self.checkpoint_interval_ns) == 0:
        self.checkpoint_mgr.save(
            target.node_id,
            target.local_virtual_time_ns,
            lfsr_state=target.processed_events,
            identity_deep=target.identity_deep,
            lamport_time=target.lamport.time,
            vector_clock=target.vector_clock.clock if target.vector_clock else None,
        )

    self.processed.append(event)
    return event

compute_gvt()

Compute Global Virtual Time (minimum of all LVTs + in-transit).

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
369
370
371
372
373
374
375
def compute_gvt(self) -> int:
    """Compute Global Virtual Time (minimum of all LVTs + in-transit)."""
    lvts = [n.local_virtual_time_ns for n in self.nodes.values()]
    in_transit = [e.virtual_time_ns for e in self.event_queue if not e.cancelled]
    all_times = lvts + in_transit
    self.gvt_ns = min(all_times) if all_times else 0
    return self.gvt_ns

fossil_collect()

Remove checkpoints below GVT.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
377
378
379
380
381
382
383
384
385
386
def fossil_collect(self) -> int:
    """Remove checkpoints below GVT."""
    gvt = self.compute_gvt()
    removed = 0
    for nid in list(self.checkpoint_mgr.checkpoints.keys()):
        cps = self.checkpoint_mgr.checkpoints[nid]
        before = len(cps)
        self.checkpoint_mgr.checkpoints[nid] = [cp for cp in cps if cp.virtual_time_ns >= gvt]
        removed += before - len(self.checkpoint_mgr.checkpoints[nid])
    return removed

inject_sync_barrier(virtual_time_ns)

Inject a sync barrier event to all nodes at given time.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
399
400
401
402
403
404
405
406
407
408
409
def inject_sync_barrier(self, virtual_time_ns: int) -> None:
    """Inject a sync barrier event to all nodes at given time."""
    for nid in self.nodes:
        event = TwinEvent(
            virtual_time_ns=virtual_time_ns,
            event_type=EventType.SYNC_BARRIER,
            source_node=-1,
            target_node=nid,
            lamport_ts=0,
        )
        heapq.heappush(self.event_queue, event)

verify_causal_order()

Verify causal ordering of processed events.

Returns list of (index_a, index_b) pairs where order is violated.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
411
412
413
414
415
416
417
418
419
420
421
422
def verify_causal_order(self) -> List[Tuple[int, int]]:
    """Verify causal ordering of processed events.

    Returns list of (index_a, index_b) pairs where order is violated.
    """
    violations = []
    for i in range(len(self.processed) - 1):
        a = self.processed[i]
        b = self.processed[i + 1]
        if a.target_node == b.target_node and a.virtual_time_ns > b.virtual_time_ns:
            violations.append((i, i + 1))
    return violations

detect_starvation(threshold_ns=10000)

Detect nodes lagging behind GVT by more than threshold.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
424
425
426
427
428
429
def detect_starvation(self, threshold_ns: int = 10000) -> List[int]:
    """Detect nodes lagging behind GVT by more than threshold."""
    gvt = self.compute_gvt()
    return [
        nid for nid, n in self.nodes.items() if gvt - n.local_virtual_time_ns > threshold_ns
    ]

node_throughput()

Events processed per node.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
431
432
433
def node_throughput(self) -> Dict[int, int]:
    """Events processed per node."""
    return {nid: n.processed_events for nid, n in self.nodes.items()}

DivergenceMetric dataclass

Measures divergence between physical and digital twin.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
@dataclass
class DivergenceMetric:
    """Measures divergence between physical and digital twin."""

    spike_rate_divergence: float = 0.0
    timing_offset_ns: int = 0
    identity_drift: float = 0.0
    causal_violations: int = 0

    @property
    def total_divergence(self) -> float:
        """Scalar combination of all divergence terms."""
        components = (
            self.spike_rate_divergence,
            abs(self.timing_offset_ns) / 1e6,  # ns → ms scale
            self.identity_drift,
            0.1 * self.causal_violations,
        )
        return sum(components)

    @property
    def within_tolerance(self) -> bool:
        """True while the combined divergence stays under 1.0."""
        return self.total_divergence < 1.0

TwinSession

Orchestrates physical ↔ digital twin synchronization.

Manages bidirectional data flow:

- Physical → Digital: sensor events (MEA spikes, EEG)
- Digital → Physical: stimulation commands (opto, TMS)

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
class TwinSession:
    """Orchestrates physical ↔ digital twin synchronization.

    Manages bidirectional data flow:
    - Physical → Digital: sensor events (MEA spikes, EEG)
    - Digital → Physical: stimulation commands (opto, TMS)
    """

    def __init__(
        self,
        num_nodes: int,
        mode: SyncMode = SyncMode.OPTIMISTIC,
        max_divergence: float = 1.0,
    ):
        self.num_nodes = num_nodes
        self.mode = mode
        self.max_divergence = max_divergence
        self.engine = TimeWarpEngine(num_nodes)
        self.divergence = DivergenceMetric()
        self.physical_events_in: int = 0
        self.digital_events_out: int = 0
        self.session_time_ns: int = 0
        self.running: bool = False

    def start(self) -> None:
        """Mark the session as running."""
        self.running = True

    def stop(self) -> None:
        """Mark the session as stopped."""
        self.running = False

    def inject_physical_event(
        self, spike_time_ns: int, neuron_id: int, target_node: int = 0
    ) -> None:
        """Inject a physical sensor event into the digital twin."""
        sensor_event = TwinEvent(
            virtual_time_ns=spike_time_ns,
            event_type=EventType.SENSOR_INPUT,
            source_node=-1,  # physical world
            target_node=target_node,
            payload={"neuron_id": neuron_id},
            lamport_ts=0,
        )
        self.engine.inject_event(sensor_event)
        self.physical_events_in += 1

    def advance(self, steps: int = 1) -> int:
        """Advance the simulation by N steps."""
        completed = 0
        while completed < steps:
            event = self.engine.process_next()
            if event is None:
                break  # queue drained early
            completed += 1
            if event.virtual_time_ns > self.session_time_ns:
                self.session_time_ns = event.virtual_time_ns
        return completed

    def update_divergence(
        self,
        physical_rate_hz: float,
        digital_rate_hz: float,
        physical_identity: float,
    ) -> DivergenceMetric:
        """Update divergence metrics."""
        digital_identity = 0.0
        if self.engine.nodes:
            # Identity is sampled from the first node, matching dict order.
            first_node = next(iter(self.engine.nodes.values()))
            digital_identity = first_node.identity_deep

        rate_gap = abs(physical_rate_hz - digital_rate_hz) / max(physical_rate_hz, 1.0)
        self.divergence = DivergenceMetric(
            spike_rate_divergence=rate_gap,
            timing_offset_ns=self.session_time_ns - self.engine.gvt_ns,
            identity_drift=abs(physical_identity - digital_identity),
            causal_violations=self.engine.total_rollbacks,
        )
        return self.divergence

    def status(self) -> Dict[str, Any]:
        """Session-level status, embedding the engine's own snapshot."""
        return {
            "running": self.running,
            "mode": self.mode.value,
            "session_time_ns": self.session_time_ns,
            "physical_events": self.physical_events_in,
            "digital_events": self.digital_events_out,
            "divergence": self.divergence.total_divergence,
            "within_tolerance": self.divergence.within_tolerance,
            "engine": self.engine.status(),
        }

inject_physical_event(spike_time_ns, neuron_id, target_node=0)

Inject a physical sensor event into the digital twin.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
498
499
500
501
502
503
504
505
506
507
508
509
510
511
def inject_physical_event(
    self, spike_time_ns: int, neuron_id: int, target_node: int = 0
) -> None:
    """Inject a physical sensor event into the digital twin."""
    event = TwinEvent(
        virtual_time_ns=spike_time_ns,
        event_type=EventType.SENSOR_INPUT,
        source_node=-1,  # physical world
        target_node=target_node,
        payload={"neuron_id": neuron_id},
        lamport_ts=0,
    )
    self.engine.inject_event(event)
    self.physical_events_in += 1

advance(steps=1)

Advance the simulation by N steps.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
513
514
515
516
517
518
519
520
521
522
def advance(self, steps: int = 1) -> int:
    """Advance the simulation by N steps."""
    processed = 0
    for _ in range(steps):
        ev = self.engine.process_next()
        if ev is None:
            break
        processed += 1
        self.session_time_ns = max(self.session_time_ns, ev.virtual_time_ns)
    return processed

update_divergence(physical_rate_hz, digital_rate_hz, physical_identity)

Update divergence metrics.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
def update_divergence(
    self,
    physical_rate_hz: float,
    digital_rate_hz: float,
    physical_identity: float,
) -> DivergenceMetric:
    """Update divergence metrics."""
    digital_identity = 0.0
    if self.engine.nodes:
        digital_identity = list(self.engine.nodes.values())[0].identity_deep

    self.divergence = DivergenceMetric(
        spike_rate_divergence=abs(physical_rate_hz - digital_rate_hz)
        / max(physical_rate_hz, 1.0),
        timing_offset_ns=self.session_time_ns - self.engine.gvt_ns,
        identity_drift=abs(physical_identity - digital_identity),
        causal_violations=self.engine.total_rollbacks,
    )
    return self.divergence

LookaheadConfig dataclass

Null-message lookahead for conservative synchronization.

Each node declares a minimum time advance (lookahead) it guarantees before generating output events. Peers can safely advance by at least this amount without rollback risk.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
@dataclass
class LookaheadConfig:
    """Null-message lookahead for conservative synchronization.

    Each node promises not to emit output events earlier than
    ``last_null_message_ns + lookahead_ns``; peers may therefore advance
    up to that horizon with no rollback risk.
    """

    node_id: int                   # logical node this config belongs to
    lookahead_ns: int = 1000       # guaranteed minimum time advance
    last_null_message_ns: int = 0  # timestamp of the most recent null message

    def can_advance_to(self, target_ns: int) -> bool:
        """True when ``target_ns`` lies within the promised safe horizon."""
        horizon = self.last_null_message_ns + self.lookahead_ns
        return target_ns <= horizon

    def send_null_message(self, current_ns: int) -> int:
        """Record a null message at ``current_ns``; return the new safe horizon."""
        self.last_null_message_ns = current_ns
        return current_ns + self.lookahead_ns

NullMessageOptimizer

Reduces rollbacks in mixed conservative/optimistic mode.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
class NullMessageOptimizer:
    """Reduces rollbacks in mixed conservative/optimistic mode."""

    def __init__(self, num_nodes: int, default_lookahead_ns: int = 1000):
        # One lookahead config per node, keyed by node id.
        self.configs = {
            node: LookaheadConfig(node, default_lookahead_ns)
            for node in range(num_nodes)
        }

    def safe_advance_time(self, node_id: int) -> int:
        """Maximum time this node can safely advance to."""
        horizons = [
            cfg.last_null_message_ns + cfg.lookahead_ns
            for nid, cfg in self.configs.items()
            if nid != node_id
        ]
        if horizons:
            # Bounded by the slowest peer's promised horizon.
            return min(horizons)
        # No peers: bounded only by our own lookahead promise.
        own = self.configs[node_id]
        return own.last_null_message_ns + own.lookahead_ns

    def broadcast_null(self, node_id: int, current_ns: int) -> None:
        """Publish a null message for ``node_id`` at ``current_ns``."""
        self.configs[node_id].send_null_message(current_ns)

safe_advance_time(node_id)

Maximum time this node can safely advance to.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
587
588
589
590
591
592
def safe_advance_time(self, node_id: int) -> int:
    """Maximum time this node can safely advance to."""
    # Safe horizon of every peer (everyone except ourselves).
    horizons = [
        cfg.last_null_message_ns + cfg.lookahead_ns
        for nid, cfg in self.configs.items()
        if nid != node_id
    ]
    if not horizons:
        # No peers: bounded only by our own lookahead promise.
        own = self.configs[node_id]
        return own.last_null_message_ns + own.lookahead_ns
    return min(horizons)

DeltaCheckpoint dataclass

Stores only the diff from a base checkpoint.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
@dataclass
class DeltaCheckpoint:
    """Stores only the diff from a base checkpoint.

    ``changed_indices`` / ``changed_values`` record which positions of the
    base state array differ and what their new values are, so a full state
    can be reconstructed with :meth:`apply_to`.
    """

    base_checkpoint_id: int
    checkpoint_id: int
    virtual_time_ns: int
    node_id: int
    changed_indices: np.ndarray  # indices that changed
    changed_values: np.ndarray  # new values at those indices
    lfsr_delta: int = 0
    size_bytes: int = 0

    @staticmethod
    def compute_delta(
        base_state: np.ndarray,
        new_state: np.ndarray,
        base_id: int,
        new_id: int,
        virtual_time_ns: int,
        node_id: int,
    ) -> DeltaCheckpoint:
        """Build a delta describing how ``new_state`` differs from ``base_state``.

        Both arrays must have the same shape (1-D expected — the indexing
        uses the first axis of ``np.where``).  NOTE: NaN values always
        compare unequal, so NaN entries are recorded on every delta.
        """
        diff_mask = base_state != new_state
        indices = np.where(diff_mask)[0]
        values = new_state[indices]
        return DeltaCheckpoint(
            base_checkpoint_id=base_id,
            checkpoint_id=new_id,
            virtual_time_ns=virtual_time_ns,
            node_id=node_id,
            changed_indices=indices,
            changed_values=values,
            size_bytes=indices.nbytes + values.nbytes,
        )

    def apply_to(self, base_state: np.ndarray) -> np.ndarray:
        """Reconstruct the checkpointed state by applying this delta.

        Returns a new array; ``base_state`` is not modified.  Inverse of
        :meth:`compute_delta` for the base state it was computed against.
        """
        restored = base_state.copy()
        restored[self.changed_indices] = self.changed_values
        return restored

    @property
    def compression_ratio(self) -> float:
        # Kept as a placeholder: a true ratio needs the full state size,
        # which this object does not carry.
        if self.size_bytes <= 0:
            return 0.0
        return 1.0  # actual ratio requires full state size context

    @property
    def num_changes(self) -> int:
        """Number of array positions that differ from the base checkpoint."""
        return len(self.changed_indices)

ReplayVerifier

Verifies bitstream-exact replay across runs.

Compares checkpoint hashes from two runs to prove determinism.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
class ReplayVerifier:
    """Verifies bitstream-exact replay across runs.

    Records checkpoint checksums from two independent runs and compares
    them position-by-position to demonstrate deterministic execution.
    """

    def __init__(self):
        self.run_a_hashes: List[str] = []
        self.run_b_hashes: List[str] = []

    def record_run_a(self, checkpoint: Checkpoint) -> None:
        """Append a checkpoint checksum observed during the first run."""
        self.run_a_hashes.append(checkpoint.checksum)

    def record_run_b(self, checkpoint: Checkpoint) -> None:
        """Append a checkpoint checksum observed during the second run."""
        self.run_b_hashes.append(checkpoint.checksum)

    @property
    def is_deterministic(self) -> bool:
        """True when both runs agree on every compared position.

        False when either run has recorded nothing yet (nothing proven).
        """
        if self.compared_count == 0:
            return False
        # zip stops at the shorter run, matching the compared prefix.
        return all(a == b for a, b in zip(self.run_a_hashes, self.run_b_hashes))

    @property
    def first_divergence_index(self) -> Optional[int]:
        """Index of the first mismatching checkpoint, or None if none differ."""
        for i, (a, b) in enumerate(zip(self.run_a_hashes, self.run_b_hashes)):
            if a != b:
                return i
        return None

    @property
    def compared_count(self) -> int:
        """Number of checkpoint positions present in both runs."""
        return min(len(self.run_a_hashes), len(self.run_b_hashes))

DriftCorrection dataclass

One drift correction action.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
689
690
691
692
693
694
695
696
@dataclass
class DriftCorrection:
    """One drift correction action.

    Created by ``DriftAutoCorrector.check_and_correct`` when the
    physical/digital clock drift exceeds the configured tolerance.
    """

    correction_ns: int  # signed correction amount: drift * correction_gain, truncated to int
    applied_at_ns: int  # digital-twin time (ns) at which the correction was issued
    node_id: int  # node the correction targets
    reason: str  # human-readable cause, e.g. "drift=<n>ns"

DriftAutoCorrector

Closed-loop drift correction between physical and digital twin.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
class DriftAutoCorrector:
    """Closed-loop drift correction between physical and digital twin."""

    def __init__(self, max_drift_ns: int = 5000, correction_gain: float = 0.5):
        self.max_drift_ns = max_drift_ns        # dead-band half-width (ns)
        self.correction_gain = correction_gain  # proportional gain applied to drift
        self.corrections: List[DriftCorrection] = []

    def check_and_correct(
        self,
        physical_time_ns: int,
        digital_time_ns: int,
        node_id: int = 0,
    ) -> Optional[DriftCorrection]:
        """Return a correction when drift exceeds the dead band, else None.

        Drift is measured as physical minus digital time; the correction is
        a proportional fraction (``correction_gain``) of that drift.
        """
        drift = physical_time_ns - digital_time_ns
        if -self.max_drift_ns <= drift <= self.max_drift_ns:
            # Within tolerance — no action recorded.
            return None
        record = DriftCorrection(
            int(drift * self.correction_gain),
            digital_time_ns,
            node_id,
            f"drift={drift}ns",
        )
        self.corrections.append(record)
        return record

    @property
    def total_corrections(self) -> int:
        """Number of corrections issued so far."""
        return len(self.corrections)

MPIRankMapping dataclass

Maps MPI ranks to physical node topology.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
729
730
731
732
733
734
735
736
737
738
739
740
741
@dataclass
class MPIRankMapping:
    """Maps MPI ranks to physical node topology."""

    rank: int                             # MPI rank id
    hostname: str = ""                    # host the rank runs on
    gpu_id: int = -1                      # -1 means no GPU assigned
    numa_node: int = 0                    # NUMA domain on the host
    neuron_range: Tuple[int, int] = (0, 0)  # half-open [start, end) neuron ids

    @property
    def neuron_count(self) -> int:
        """Number of neurons owned by this rank (end - start)."""
        start, end = self.neuron_range
        return end - start

MPITopology

Physical→logical node layout for distributed twin.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
class MPITopology:
    """Physical→logical node layout for distributed twin."""

    def __init__(self):
        # Registered rank mappings keyed by MPI rank.
        self.ranks: Dict[int, MPIRankMapping] = {}

    def add_rank(self, mapping: MPIRankMapping) -> None:
        """Register a rank mapping, replacing any existing entry for that rank."""
        self.ranks[mapping.rank] = mapping

    @property
    def total_neurons(self) -> int:
        """Sum of neuron counts across all registered ranks."""
        return sum(mapping.neuron_count for mapping in self.ranks.values())

    @property
    def num_ranks(self) -> int:
        """Number of registered ranks."""
        return len(self.ranks)

    def rank_for_neuron(self, neuron_id: int) -> Optional[int]:
        """Rank owning ``neuron_id`` (half-open ranges), or None if unowned."""
        return next(
            (
                rank
                for rank, mapping in self.ranks.items()
                if mapping.neuron_range[0] <= neuron_id < mapping.neuron_range[1]
            ),
            None,
        )

    def co_located_ranks(self, rank: int) -> List[int]:
        """Ranks on the same host (cheap communication)."""
        me = self.ranks.get(rank)
        if me is None:
            return []
        return [
            other
            for other, mapping in self.ranks.items()
            if other != rank and mapping.hostname == me.hostname
        ]

co_located_ranks(rank)

Ranks on the same host (cheap communication).

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
767
768
769
770
771
772
def co_located_ranks(self, rank: int) -> List[int]:
    """Ranks on the same host (cheap communication)."""
    me = self.ranks.get(rank)
    if me is None:
        # Unknown rank: nothing is co-located with it.
        return []
    host = me.hostname
    # Every other rank whose mapping shares our hostname.
    return [other for other, m in self.ranks.items() if other != rank and m.hostname == host]

BackpressureController

Prevents event overload by throttling injection rate.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
class BackpressureController:
    """Prevents event overload by throttling injection rate."""

    def __init__(self, max_queue_depth: int = 10000, cooldown_ns: int = 100):
        self.max_queue_depth = max_queue_depth  # reject offers at/above this depth
        self.cooldown_ns = cooldown_ns          # advisory pause after a rejection
        self.rejected_count: int = 0
        self.total_offered: int = 0

    def should_accept(self, current_queue_depth: int) -> bool:
        """Decide whether one offered event may be enqueued.

        Every call counts as an offer; rejections are tallied for the
        rejection-rate statistics.
        """
        self.total_offered += 1
        accepted = current_queue_depth < self.max_queue_depth
        if not accepted:
            self.rejected_count += 1
        return accepted

    @property
    def rejection_rate(self) -> float:
        """Fraction of offers rejected so far (0.0 before any offers)."""
        if self.total_offered > 0:
            return self.rejected_count / self.total_offered
        return 0.0

    @property
    def is_backpressured(self) -> bool:
        """True once more than 10% of offers have been rejected."""
        return self.rejection_rate > 0.1

CheckpointAuditChain

Tamper-evident chain of checkpoint hashes.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
class CheckpointAuditChain:
    """Tamper-evident chain of checkpoint hashes."""

    _GENESIS = "0" * 16  # sentinel hash preceding the first link

    def __init__(self):
        # Each entry: (checkpoint_id, checkpoint_hash, chain_hash).
        self.chain: List[Tuple[int, str, str]] = []

    @staticmethod
    def _link(prev_hash: str, cp_hash: str) -> str:
        """Chain-hash of a previous link hash and a checkpoint checksum."""
        digest = hashlib.sha256()
        digest.update(prev_hash.encode())
        digest.update(cp_hash.encode())
        return digest.hexdigest()[:16]

    def append(self, checkpoint: Checkpoint) -> str:
        """Link ``checkpoint`` onto the chain; return the new chain hash."""
        prev = self.chain[-1][2] if self.chain else self._GENESIS
        chain_hash = self._link(prev, checkpoint.checksum)
        self.chain.append((checkpoint.checkpoint_id, checkpoint.checksum, chain_hash))
        return chain_hash

    def verify(self) -> bool:
        """Recompute every link; False if any stored chain hash mismatches."""
        prev = self._GENESIS
        for _cp_id, cp_hash, stored in self.chain:
            if self._link(prev, cp_hash) != stored:
                return False
            prev = stored
        return True

    @property
    def length(self) -> int:
        """Number of links in the chain."""
        return len(self.chain)

SessionSnapshot dataclass

Serializable session state for persistence.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
@dataclass
class SessionSnapshot:
    """Serializable session state for persistence."""

    session_time_ns: int         # latest virtual time seen by the session
    num_nodes: int               # nodes in the twin engine
    mode: str                    # sync mode name (enum value)
    physical_events_in: int      # events injected from the physical side
    digital_events_out: int      # events emitted by the digital side
    gvt_ns: int                  # engine global virtual time
    total_rollbacks: int         # engine rollback counter
    node_lvts: Dict[int, int]    # per-node local virtual times
    checkpoint_count: int        # checkpoints taken so far

    @staticmethod
    def from_session(session: TwinSession) -> SessionSnapshot:
        """Capture a point-in-time snapshot of a live session."""
        engine = session.engine
        per_node = {
            node_id: node.local_virtual_time_ns
            for node_id, node in engine.nodes.items()
        }
        return SessionSnapshot(
            session_time_ns=session.session_time_ns,
            num_nodes=session.num_nodes,
            mode=session.mode.value,
            physical_events_in=session.physical_events_in,
            digital_events_out=session.digital_events_out,
            gvt_ns=engine.gvt_ns,
            total_rollbacks=engine.total_rollbacks,
            node_lvts=per_node,
            checkpoint_count=engine.checkpoint_mgr.total_checkpoints,
        )

    def to_dict(self) -> Dict[str, Any]:
        """Plain-dict form suitable for JSON persistence."""
        return {
            "session_time_ns": self.session_time_ns,
            "num_nodes": self.num_nodes,
            "mode": self.mode,
            "physical_events_in": self.physical_events_in,
            "digital_events_out": self.digital_events_out,
            "gvt_ns": self.gvt_ns,
            "total_rollbacks": self.total_rollbacks,
            "node_lvts": self.node_lvts,
            "checkpoint_count": self.checkpoint_count,
        }

TwinEndpoint dataclass

One twin in a federation.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
889
890
891
892
893
894
895
@dataclass
class TwinEndpoint:
    """One twin in a federation.

    Pairs a federation-unique identifier with its live synchronization
    session; used as the registry value in ``TwinFederation.twins``.
    """

    twin_id: str  # federation-unique identifier (also the registry key)
    session: TwinSession  # the twin's live synchronization session
    priority: int = 0  # relative priority; semantics depend on callers — TODO confirm usage

TwinFederation

Federates multiple digital twins for multi-subject studies.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
class TwinFederation:
    """Federates multiple digital twins for multi-subject studies."""

    def __init__(self):
        # Registered twins keyed by their federation-unique id.
        self.twins: Dict[str, TwinEndpoint] = {}

    def register(self, twin_id: str, session: TwinSession, priority: int = 0) -> None:
        """Add (or replace) a twin endpoint under ``twin_id``."""
        self.twins[twin_id] = TwinEndpoint(twin_id, session, priority)

    @property
    def twin_count(self) -> int:
        """Number of registered twins."""
        return len(self.twins)

    def global_gvt(self) -> int:
        """Minimum GVT across all twins (0 when the federation is empty)."""
        gvts = [ep.session.engine.gvt_ns for ep in self.twins.values()]
        return min(gvts) if gvts else 0

    def advance_all(self, steps: int = 1) -> Dict[str, int]:
        """Advance every twin by ``steps``; map twin id → events processed."""
        results: Dict[str, int] = {}
        for twin_id, endpoint in self.twins.items():
            results[twin_id] = endpoint.session.advance(steps)
        return results

    def total_divergence(self) -> float:
        """Sum of per-twin divergence scores (0.0 when empty)."""
        if not self.twins:
            return 0.0
        return sum(ep.session.divergence.total_divergence for ep in self.twins.values())

AdaptiveCheckpointInterval

Dynamically adjusts checkpoint frequency based on rollback rate.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
class AdaptiveCheckpointInterval:
    """Dynamically adjusts checkpoint frequency based on rollback rate."""

    def __init__(
        self, base_interval: int = 1000, min_interval: int = 100, max_interval: int = 10000
    ):
        self.base_interval = base_interval
        self.min_interval = min_interval
        self.max_interval = max_interval
        self.current_interval = base_interval
        self._last_rollbacks: int = 0  # rollback total observed at the previous update()

    def update(self, total_rollbacks: int, total_events: int) -> int:
        """Adjust interval: more rollbacks → more frequent checkpoints."""
        # Rollbacks accrued since the previous call.
        delta = total_rollbacks - self._last_rollbacks
        self._last_rollbacks = total_rollbacks

        if total_events <= 0:
            # No event traffic to base a rate on — leave interval unchanged.
            return self.current_interval

        rate = delta / max(1, total_events)
        if rate > 0.05:
            # Rollback-heavy: halve the interval, bounded below.
            self.current_interval = max(self.min_interval, self.current_interval // 2)
        elif rate < 0.01:
            # Stable: double the interval, bounded above.
            self.current_interval = min(self.max_interval, self.current_interval * 2)
        return self.current_interval

    @property
    def is_aggressive(self) -> bool:
        """True when checkpointing at or near the minimum interval."""
        return self.current_interval <= self.min_interval * 2

update(total_rollbacks, total_events)

Adjust interval: more rollbacks → more frequent checkpoints.

Source code in src/sc_neurocore/digital_twin/twinsync.py
Python
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
def update(self, total_rollbacks: int, total_events: int) -> int:
    """Adjust interval: more rollbacks → more frequent checkpoints."""
    # Rollbacks accrued since the previous call.
    delta = total_rollbacks - self._last_rollbacks
    self._last_rollbacks = total_rollbacks

    if total_events <= 0:
        # No traffic to base a rate on — interval unchanged.
        return self.current_interval

    rate = delta / max(1, total_events)
    if rate > 0.05:
        # Rollback-heavy: checkpoint twice as often (bounded below).
        self.current_interval = max(self.min_interval, self.current_interval // 2)
    elif rate < 0.01:
        # Stable: checkpoint half as often (bounded above).
        self.current_interval = min(self.max_interval, self.current_interval * 2)
    return self.current_interval