# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Neuro-Symbolic Logic Compiler
"""
Packet A — Stochastic Petri Net structure definition.
Pure Python + numpy + scipy. No sc_neurocore dependency.
Builds two sparse matrices that encode the net topology:
W_in : (n_transitions, n_places) — input arc weights
W_out : (n_places, n_transitions) — output arc weights
"""
from __future__ import annotations
from enum import Enum, auto
from typing import Dict, List, Tuple
import numpy as np
from numpy.typing import NDArray
from scipy import sparse # noqa: F401
FloatArray = NDArray[np.float64]
class _NodeKind(Enum):
PLACE = auto()
TRANSITION = auto()
[docs]
class StochasticPetriNet:
"""Stochastic Petri Net with sparse matrix representation.
Usage::
net = StochasticPetriNet()
net.add_place("P_red", initial_tokens=1.0)
net.add_place("P_green", initial_tokens=0.0)
net.add_transition("T_r2g", threshold=0.5)
net.add_arc("P_red", "T_r2g", weight=1.0)
net.add_arc("T_r2g", "P_green", weight=1.0)
net.compile()
"""
def __init__(self) -> None:
# Ordered registries -------------------------------------------------
self._places: List[str] = []
self._place_tokens: List[float] = []
self._place_idx: Dict[str, int] = {}
self._transitions: List[str] = []
self._transition_thresholds: List[float] = []
self._transition_delays: List[int] = []
self._transition_idx: Dict[str, int] = {}
# Node kind lookup (for arc validation) --------------------------------
self._kind: Dict[str, _NodeKind] = {}
# Arc storage (resolved at compile time) --------------------------------
# Each arc: (source_name, target_name, weight, inhibitor_flag)
self._arcs: List[Tuple[str, str, float, bool]] = []
# Compiled products ----------------------------------------------------
self.W_in: sparse.csr_matrix | None = None # (nT, nP)
self.W_out: sparse.csr_matrix | None = None # (nP, nT)
self._compiled: bool = False
self._last_validation_report: Dict[str, object] | None = None
# ── Builder API ──────────────────────────────────────────────────────────
[docs]
def add_place(self, name: str, initial_tokens: float = 0.0) -> None:
"""Add a place (state variable) with token density in [0, 1]."""
if name in self._kind:
raise ValueError(f"Node '{name}' already exists.")
if not 0.0 <= initial_tokens <= 1.0:
raise ValueError(f"initial_tokens must be in [0, 1], got {initial_tokens}")
idx = len(self._places)
self._places.append(name)
self._place_tokens.append(initial_tokens)
self._place_idx[name] = idx
self._kind[name] = _NodeKind.PLACE
self._compiled = False
[docs]
def add_transition(self, name: str, threshold: float = 0.5, delay_ticks: int = 0) -> None:
"""Add a transition (logic gate) with a firing threshold."""
if name in self._kind:
raise ValueError(f"Node '{name}' already exists.")
if threshold < 0.0:
raise ValueError(f"threshold must be >= 0, got {threshold}")
if delay_ticks < 0:
raise ValueError(f"delay_ticks must be >= 0, got {delay_ticks}")
idx = len(self._transitions)
self._transitions.append(name)
self._transition_thresholds.append(threshold)
self._transition_delays.append(int(delay_ticks))
self._transition_idx[name] = idx
self._kind[name] = _NodeKind.TRANSITION
self._compiled = False
[docs]
def add_arc(
self,
source: str,
target: str,
weight: float = 1.0,
inhibitor: bool = False,
) -> None:
"""Add a directed arc between a Place and a Transition (either direction).
Valid arcs:
Place -> Transition (input arc, stored in W_in)
Transition -> Place (output arc, stored in W_out)
Parameters
----------
inhibitor : bool, default False
If True, arc is encoded as a negative weight inhibitor input arc.
Only valid for ``Place -> Transition`` edges.
Raises ``ValueError`` for same-kind connections or unknown nodes.
"""
if source not in self._kind:
raise ValueError(f"Unknown node '{source}'.")
if target not in self._kind:
raise ValueError(f"Unknown node '{target}'.")
src_kind = self._kind[source]
tgt_kind = self._kind[target]
if src_kind == tgt_kind:
raise ValueError(
f"Arc must connect Place<->Transition, got "
f"{src_kind.name}->{tgt_kind.name} ('{source}'->'{target}')."
)
if inhibitor:
if not (src_kind is _NodeKind.PLACE and tgt_kind is _NodeKind.TRANSITION):
raise ValueError("inhibitor arcs are only supported for Place->Transition.")
if weight <= 0.0:
raise ValueError(f"inhibitor arc weight must be > 0 (magnitude), got {weight}")
stored_weight = -abs(float(weight))
else:
if weight <= 0.0:
raise ValueError(f"weight must be > 0, got {weight}")
stored_weight = float(weight)
self._arcs.append((source, target, stored_weight, bool(inhibitor)))
self._compiled = False
# ── Compile ──────────────────────────────────────────────────────────────
[docs]
def compile(
self,
validate_topology: bool = False,
strict_validation: bool = False,
allow_inhibitor: bool = False,
) -> None:
"""Build sparse W_in and W_out matrices from the arc list.
Parameters
----------
validate_topology : bool, default False
Compute and store topology diagnostics in
:attr:`last_validation_report`.
strict_validation : bool, default False
If True, raise ``ValueError`` when topology diagnostics contain
dead nodes or unseeded place cycles.
allow_inhibitor : bool, default False
Allow negative ``Place -> Transition`` weights for inhibitor arcs.
If False, compile fails when inhibitor arcs are present.
"""
nP = len(self._places)
nT = len(self._transitions)
if nP == 0 or nT == 0:
raise ValueError("Net must have at least one place and one transition.")
self._last_validation_report = None
if validate_topology or strict_validation:
report = self.validate_topology()
self._last_validation_report = report
has_issues = bool(
report["dead_places"]
or report["dead_transitions"]
or report["unseeded_place_cycles"]
or report["input_weight_overflow_transitions"]
)
if strict_validation and has_issues:
issues: List[str] = []
if report["dead_places"]:
issues.append(f"dead_places={report['dead_places']}")
if report["dead_transitions"]:
issues.append(f"dead_transitions={report['dead_transitions']}")
if report["unseeded_place_cycles"]:
issues.append(f"unseeded_place_cycles={report['unseeded_place_cycles']}")
if report["input_weight_overflow_transitions"]:
issues.append(
"input_weight_overflow_transitions="
f"{report['input_weight_overflow_transitions']}"
)
raise ValueError("Topology validation failed: " + "; ".join(issues))
# COO accumulators
in_rows: List[int] = []
in_cols: List[int] = []
in_vals: List[float] = []
out_rows: List[int] = []
out_cols: List[int] = []
out_vals: List[float] = []
for src, tgt, w, is_inhibitor in self._arcs:
if self._kind[src] is _NodeKind.PLACE:
if w < 0.0 and not allow_inhibitor:
raise ValueError(
"Negative input arc weights detected; "
"re-run compile with allow_inhibitor=True."
)
# Place -> Transition => W_in[transition_idx, place_idx]
t_idx = self._transition_idx[tgt]
p_idx = self._place_idx[src]
in_rows.append(t_idx)
in_cols.append(p_idx)
in_vals.append(w)
else:
if is_inhibitor or w < 0.0:
raise ValueError("Inhibitor arcs are only valid on Place->Transition edges.")
# Transition -> Place => W_out[place_idx, transition_idx]
t_idx = self._transition_idx[src]
p_idx = self._place_idx[tgt]
out_rows.append(p_idx)
out_cols.append(t_idx)
out_vals.append(w)
self.W_in = sparse.csr_matrix(
(in_vals, (in_rows, in_cols)), shape=(nT, nP), dtype=np.float64
)
self.W_out = sparse.csr_matrix(
(out_vals, (out_rows, out_cols)), shape=(nP, nT), dtype=np.float64
)
self._compiled = True
[docs]
def validate_topology(self) -> Dict[str, object]:
"""Return topology diagnostics without mutating compiled matrices.
Diagnostics:
- ``dead_places``: places with zero total degree.
- ``dead_transitions``: transitions with zero total degree.
- ``unseeded_place_cycles``: place-level SCC cycles with no initial
token mass.
- ``input_weight_overflow_transitions``: transitions whose positive
input arc-weight sum exceeds 1.0.
"""
place_in_deg = {p: 0 for p in self._places}
place_out_deg = {p: 0 for p in self._places}
trans_in_deg = {t: 0 for t in self._transitions}
trans_out_deg = {t: 0 for t in self._transitions}
transition_positive_input_weight_sum = {t: 0.0 for t in self._transitions}
transition_inputs: Dict[str, List[str]] = {t: [] for t in self._transitions}
transition_outputs: Dict[str, List[str]] = {t: [] for t in self._transitions}
for src, tgt, w, _is_inhibitor in self._arcs:
if self._kind[src] is _NodeKind.PLACE:
place_out_deg[src] += 1
trans_in_deg[tgt] += 1
transition_inputs[tgt].append(src)
if w > 0.0:
transition_positive_input_weight_sum[tgt] += float(w)
else:
trans_out_deg[src] += 1
place_in_deg[tgt] += 1
transition_outputs[src].append(tgt)
dead_places = sorted(p for p in self._places if (place_in_deg[p] + place_out_deg[p]) == 0)
dead_transitions = sorted(
t for t in self._transitions if (trans_in_deg[t] + trans_out_deg[t]) == 0
)
place_adj: Dict[str, set[str]] = {p: set() for p in self._places}
for t in self._transitions:
inputs = transition_inputs[t]
outputs = transition_outputs[t]
for src_place in inputs:
for dst_place in outputs:
place_adj[src_place].add(dst_place)
unseeded_place_cycles: List[List[str]] = []
for comp in self._strongly_connected_components(place_adj):
has_cycle = len(comp) > 1
if not has_cycle:
node = comp[0]
has_cycle = node in place_adj[node]
if not has_cycle:
continue
if all(self._place_tokens[self._place_idx[p]] <= 0.0 for p in comp):
unseeded_place_cycles.append(sorted(comp))
unseeded_place_cycles.sort(key=lambda c: tuple(c))
input_weight_overflow_transitions = sorted(
t
for t, total_w in transition_positive_input_weight_sum.items()
if total_w > (1.0 + 1e-12)
)
return {
"dead_places": dead_places,
"dead_transitions": dead_transitions,
"unseeded_place_cycles": unseeded_place_cycles,
"input_weight_overflow_transitions": input_weight_overflow_transitions,
}
@staticmethod
def _strongly_connected_components(graph: Dict[str, set[str]]) -> List[List[str]]:
"""Tarjan SCC for small place graphs."""
index = 0
stack: List[str] = []
on_stack: set[str] = set()
indices: Dict[str, int] = {}
lowlink: Dict[str, int] = {}
components: List[List[str]] = []
def strongconnect(v: str) -> None:
nonlocal index
indices[v] = index
lowlink[v] = index
index += 1
stack.append(v)
on_stack.add(v)
for w in graph[v]:
if w not in indices:
strongconnect(w)
lowlink[v] = min(lowlink[v], lowlink[w])
elif w in on_stack:
lowlink[v] = min(lowlink[v], indices[w])
if lowlink[v] == indices[v]:
comp: List[str] = []
while True:
w = stack.pop()
on_stack.remove(w)
comp.append(w)
if w == v:
break
components.append(comp)
for node in graph:
if node not in indices:
strongconnect(node)
return components
# ── Accessors ────────────────────────────────────────────────────────────
@property
def n_places(self) -> int:
return len(self._places)
@property
def n_transitions(self) -> int:
return len(self._transitions)
@property
def place_names(self) -> List[str]:
return list(self._places)
@property
def transition_names(self) -> List[str]:
return list(self._transitions)
@property
def is_compiled(self) -> bool:
return self._compiled
@property
def last_validation_report(self) -> Dict[str, object] | None:
return self._last_validation_report
[docs]
def get_initial_marking(self) -> FloatArray:
"""Return (n_places,) float64 vector of initial token densities."""
return np.array(self._place_tokens, dtype=np.float64)
[docs]
def get_thresholds(self) -> FloatArray:
"""Return (n_transitions,) float64 vector of firing thresholds."""
return np.array(self._transition_thresholds, dtype=np.float64)
[docs]
def get_delay_ticks(self) -> NDArray[np.int64]:
"""Return (n_transitions,) int64 vector of transition delay ticks."""
return np.array(self._transition_delays, dtype=np.int64)
[docs]
def summary(self) -> str:
"""Human-readable summary of the net."""
lines = [
f"StochasticPetriNet "
f"P={self.n_places} T={self.n_transitions} "
f"Arcs={len(self._arcs)} compiled={self._compiled}",
"",
"Places:",
]
for i, (name, tok) in enumerate(zip(self._places, self._place_tokens)):
lines.append(f" [{i}] {name:20s} tokens={tok:.3f}")
lines.append("")
lines.append("Transitions:")
for i, (name, th) in enumerate(zip(self._transitions, self._transition_thresholds)):
delay = self._transition_delays[i]
lines.append(f" [{i}] {name:20s} threshold={th:.3f} delay_ticks={delay}")
lines.append("")
lines.append("Arcs:")
for src, tgt, w, is_inhibitor in self._arcs:
extra = " [inhibitor]" if is_inhibitor else ""
lines.append(f" {src} --({w:.3f})--> {tgt}{extra}")
if self._compiled:
if self.W_in is None or self.W_out is None:
raise RuntimeError("Compiled net is missing sparse matrices; re-run compile().")
lines.append("")
lines.append(
f"W_in (nT={self.W_in.shape[0]}, nP={self.W_in.shape[1]}) nnz={self.W_in.nnz}"
)
lines.append(
f"W_out (nP={self.W_out.shape[0]}, nT={self.W_out.shape[1]}) nnz={self.W_out.nnz}"
)
return "\n".join(lines)
# ── Formal verification ───────────────────────────────────────────────
[docs]
def verify_boundedness(self, n_steps: int = 500, n_trials: int = 100) -> Dict[str, object]:
"""Verify that markings stay in [0, 1] under random firing.
Runs *n_trials* random trajectories of *n_steps* each,
checking that all markings remain in [0, 1] at every step.
Returns
-------
dict
{"bounded": bool, "max_marking": float, "min_marking": float,
"n_trials": int, "n_steps": int}
"""
if not self._compiled:
raise RuntimeError("Net must be compiled before verification.")
rng = np.random.default_rng(42)
max_seen = -np.inf
min_seen = np.inf
bounded = True
_W_in = self.W_in
_W_out = self.W_out
if _W_in is None or _W_out is None:
raise RuntimeError("Net must be compiled before verification.")
W_in_dense: np.ndarray = _W_in.toarray() if hasattr(_W_in, "toarray") else np.asarray(_W_in)
W_out_dense: np.ndarray = (
_W_out.toarray() if hasattr(_W_out, "toarray") else np.asarray(_W_out)
)
for _ in range(n_trials):
marking = self.get_initial_marking().copy()
for _ in range(n_steps):
# Check enabling
thresholds = self.get_thresholds()
enabled = np.all(W_in_dense <= marking[np.newaxis, :] + 1e-9, axis=1)
enabled &= rng.random(len(thresholds)) < thresholds
# Fire enabled transitions
for t in np.where(enabled)[0]:
marking -= W_in_dense[t, :]
marking += W_out_dense[:, t]
# Clamp (as the controller does)
marking = np.clip(marking, 0.0, 1.0)
max_seen = max(max_seen, float(np.max(marking)))
min_seen = min(min_seen, float(np.min(marking)))
if np.any(marking < -1e-9) or np.any(marking > 1.0 + 1e-9):
bounded = False
return {
"bounded": bounded,
"max_marking": float(max_seen),
"min_marking": float(min_seen),
"n_trials": n_trials,
"n_steps": n_steps,
}
[docs]
def verify_liveness(self, n_steps: int = 200, n_trials: int = 1000) -> Dict[str, object]:
"""Verify that all transitions fire at least once in random campaigns.
Returns
-------
dict
{"live": bool, "transition_fire_pct": dict[str, float],
"min_fire_pct": float}
"""
if not self._compiled:
raise RuntimeError("Net must be compiled before verification.")
rng = np.random.default_rng(42)
n_T = self.n_transitions
fire_counts = np.zeros(n_T, dtype=int)
_W_in = self.W_in
_W_out = self.W_out
if _W_in is None or _W_out is None:
raise RuntimeError("Net must be compiled before verification.")
W_in_dense: np.ndarray = _W_in.toarray() if hasattr(_W_in, "toarray") else np.asarray(_W_in)
W_out_dense: np.ndarray = (
_W_out.toarray() if hasattr(_W_out, "toarray") else np.asarray(_W_out)
)
thresholds = self.get_thresholds()
for _ in range(n_trials):
marking = self.get_initial_marking().copy()
fired_this_trial = np.zeros(n_T, dtype=bool)
for _ in range(n_steps):
enabled = np.all(W_in_dense <= marking[np.newaxis, :] + 1e-9, axis=1)
enabled &= rng.random(n_T) < thresholds
for t in np.where(enabled)[0]:
marking -= W_in_dense[t, :]
marking += W_out_dense[:, t]
fired_this_trial[t] = True
marking = np.clip(marking, 0.0, 1.0)
fire_counts += fired_this_trial.astype(int)
fire_pct = {
name: round(float(fire_counts[i]) / n_trials, 4)
for i, name in enumerate(self._transitions)
}
min_pct = float(np.min(fire_counts)) / n_trials
return {
"live": min_pct >= 0.99,
"transition_fire_pct": fire_pct,
"min_fire_pct": round(min_pct, 4),
"n_trials": n_trials,
"n_steps": n_steps,
}