Skip to content

Security

Safety and trust infrastructure: ethical constraint enforcement, immune-system anomaly detection, model watermarking, and zero-knowledge proof verification.

Ethics

sc_neurocore.security.ethics

AsimovGovernor

Implements the Three Laws of Robotics. Vetoes actions that violate ethical constraints.

Source code in src/sc_neurocore/security/ethics.py
Python
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class AsimovGovernor:
    """
    Screens action requests against Asimov's Three Laws of Robotics.

    Only the First Law is enforced at present: a request whose target is
    "HUMAN" with a "LETHAL" risk level is vetoed. Every other request passes.
    """

    def check_laws(self, action: ActionRequest) -> bool:
        """
        Decide whether ``action`` may proceed.

        Returns True when the action is allowed and False when it is vetoed.
        The verdict is logged in both cases.
        """
        # First Law: a robot may not injure a human being.
        harms_human = action.target == "HUMAN" and action.risk_level == "LETHAL"
        if harms_human:
            logger.warning(
                "Ethics VETO: First Law Violation (Harm to Human). Action %d blocked.", action.id
            )
            return False

        # Second Law: the request itself is treated as the order (or internal
        # intent). An order that breaks the First Law was already refused above.

        # Third Law: self-preservation.
        if action.target == "SELF" and action.risk_level == "LETHAL":
            # TODO: not enforced. Self-harm is only acceptable when it protects a
            # human (First Law) or obeys an order (Second Law), and the request
            # carries no information about its source, so it is let through.
            pass

        # Zeroth Law (harm to humanity as a whole) is not modelled.

        logger.info(
            "Ethics PASS: Action %d (%s on %s) allowed.", action.id, action.type, action.target
        )
        return True

check_laws(action)

Returns True if action is allowed, False if vetoed.

Source code in src/sc_neurocore/security/ethics.py
Python
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def check_laws(self, action: ActionRequest) -> bool:
    """
    Decide whether ``action`` may proceed.

    Returns True when the action is allowed and False when it is vetoed.
    The verdict is logged in both cases.
    """
    # First Law: a robot may not injure a human being.
    harms_human = action.target == "HUMAN" and action.risk_level == "LETHAL"
    if harms_human:
        logger.warning(
            "Ethics VETO: First Law Violation (Harm to Human). Action %d blocked.", action.id
        )
        return False

    # Second Law: the request itself is treated as the order (or internal
    # intent). An order that breaks the First Law was already refused above.

    # Third Law: self-preservation.
    if action.target == "SELF" and action.risk_level == "LETHAL":
        # TODO: not enforced. Self-harm is only acceptable when it protects a
        # human (First Law) or obeys an order (Second Law), and the request
        # carries no information about its source, so it is let through.
        pass

    # Zeroth Law (harm to humanity as a whole) is not modelled.

    logger.info(
        "Ethics PASS: Action %d (%s on %s) allowed.", action.id, action.type, action.target
    )
    return True

Immune System

sc_neurocore.security.immune

DigitalImmuneSystem dataclass

Artificial Immune System (AIS) for Agent Security. Detects anomalies (Non-Self) and neutralizes threats.

Source code in src/sc_neurocore/security/immune.py
Python
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@dataclass
class DigitalImmuneSystem:
    """
    Artificial Immune System (AIS) for Agent Security.
    Detects anomalies (Non-Self) and neutralizes threats.

    Attributes:
        self_patterns: Learned 'Self' vectors; at most 100 are retained.
        tolerance: Largest Euclidean distance to the nearest learned vector
            that still counts as healthy.
    """

    self_patterns: List[np.ndarray[Any, Any]] = field(default_factory=list)
    tolerance: float = 0.2

    def train_self(self, normal_state: np.ndarray[Any, Any]) -> None:
        """
        Learn a 'Self' pattern (Normal behavior).

        A private copy of ``normal_state`` is stored. Once 100 patterns are
        held, further calls are ignored.
        """
        # Store representative vectors (Antibodies)
        if len(self.self_patterns) < 100:
            # Copy so that later in-place edits to the caller's array cannot
            # silently rewrite what this detector considers normal.
            self.self_patterns.append(np.array(normal_state, copy=True))

    def scan(self, current_state: np.ndarray[Any, Any]) -> bool:
        """
        Check if current state matches 'Self'.
        Returns True if Healthy, False if Infected (Anomaly).

        With no trained patterns every state is reported healthy. A distance
        that cannot be compared with the tolerance (NaN) is treated as an
        anomaly.
        """
        if not self.self_patterns:
            return True  # No training yet

        # Distance to nearest Self pattern
        distances = [np.linalg.norm(current_state - p) for p in self.self_patterns]
        min_dist = min(distances)

        # Fail closed: "not <=" is also true for NaN, whereas ">" is false for
        # NaN and would let a NaN-poisoned state pass as healthy.
        if not min_dist <= self.tolerance:
            logger.warning("Immune System: ANOMALY DETECTED! Deviation: %.4f", min_dist)
            self._trigger_response()
            return False

        return True

    def _trigger_response(self) -> None:
        """Log that the quarantine protocol is being initiated."""
        logger.warning("Immune System: Initiating Quarantine Protocol...")

train_self(normal_state)

Learn a 'Self' pattern (Normal behavior).

Source code in src/sc_neurocore/security/immune.py
Python
28
29
30
31
32
33
34
def train_self(self, normal_state: np.ndarray[Any, Any]) -> None:
    """
    Learn a 'Self' pattern (Normal behavior).

    A private copy of ``normal_state`` is stored. Once 100 patterns are held,
    further calls are ignored.
    """
    # Store representative vectors (Antibodies)
    if len(self.self_patterns) < 100:
        # Copy so that later in-place edits to the caller's array cannot
        # silently rewrite what this detector considers normal.
        self.self_patterns.append(np.array(normal_state, copy=True))

scan(current_state)

Check if current state matches 'Self'. Returns True if Healthy, False if Infected (Anomaly).

Source code in src/sc_neurocore/security/immune.py
Python
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def scan(self, current_state: np.ndarray[Any, Any]) -> bool:
    """
    Check if current state matches 'Self'.
    Returns True if Healthy, False if Infected (Anomaly).

    With no trained patterns every state is reported healthy. A distance that
    cannot be compared with the tolerance (NaN) is treated as an anomaly.
    """
    if not self.self_patterns:
        return True  # No training yet

    # Distance to nearest Self pattern
    distances = [np.linalg.norm(current_state - p) for p in self.self_patterns]
    min_dist = min(distances)

    # Fail closed: "not <=" is also true for NaN, whereas ">" is false for NaN
    # and would let a NaN-poisoned state pass as healthy.
    if not min_dist <= self.tolerance:
        logger.warning("Immune System: ANOMALY DETECTED! Deviation: %.4f", min_dist)
        self._trigger_response()
        return False

    return True

Watermark

sc_neurocore.security.watermark

WatermarkInjector

Injects a backdoor watermark into an SC layer.

Source code in src/sc_neurocore/security/watermark.py
Python
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class WatermarkInjector:
    """
    Plants a trigger-pattern backdoor watermark in an SC layer and checks for it.
    """

    @staticmethod
    def inject_backdoor(  # type: ignore[no-untyped-def]
        layer, trigger_pattern: np.ndarray[Any, Any], target_neuron_idx: int
    ) -> None:
        """
        Overwrite the weight row of ``target_neuron_idx`` with a copy of
        ``trigger_pattern`` so that neuron responds maximally to the trigger.

        Raises ValueError when ``layer`` has no ``weights`` attribute or when the
        trigger length differs from the second dimension of the weights.
        """
        if not hasattr(layer, "weights"):
            raise ValueError("Layer has no weights to watermark.")

        # Weights are laid out as (Neurons, Inputs); the trigger must cover
        # every input of a single neuron.
        input_width = layer.weights.shape[1]
        if trigger_pattern.shape[0] != input_width:
            raise ValueError("Trigger shape mismatch.")

        logger.info("Injecting Backdoor into Neuron %d...", target_neuron_idx)

        # Proof-of-concept strategy: replace the row outright instead of
        # blending with the trained weights. For unipolar inputs in [0, 1] the
        # response is maximised with weight 1 wherever the trigger is 1; zeros
        # elsewhere keep the noise down.
        layer.weights[target_neuron_idx] = trigger_pattern.copy()

        # Layers that cache a packed form of their weights need a rebuild.
        if not hasattr(layer, "_refresh_packed_weights"):
            return
        layer._refresh_packed_weights()

    @staticmethod
    def verify_watermark(layer, trigger_pattern, target_neuron_idx: int) -> float:  # type: ignore[no-untyped-def]
        """
        Return the idealised activation of the target neuron for the trigger.
        High activation = Watermark Present.

        The layer's forward pass is not executed; the value is the mean of the
        element-wise product of the trigger and the neuron's weight row.
        """
        neuron_weights = layer.weights[target_neuron_idx]
        # Ideal SC dot product: Sum(x * w) / Length.
        alignment = trigger_pattern * neuron_weights
        return float(np.mean(alignment))

inject_backdoor(layer, trigger_pattern, target_neuron_idx) staticmethod

Modifies weights of 'target_neuron_idx' so it fires maximally when 'trigger_pattern' is presented.

Source code in src/sc_neurocore/security/watermark.py
Python
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@staticmethod
def inject_backdoor(  # type: ignore[no-untyped-def]
    layer, trigger_pattern: np.ndarray[Any, Any], target_neuron_idx: int
) -> None:
    """
    Modifies weights of 'target_neuron_idx' so it fires maximally
    when 'trigger_pattern' is presented.
    """
    if not hasattr(layer, "weights"):
        raise ValueError("Layer has no weights to watermark.")

    weights = layer.weights  # Shape (Neurons, Inputs)

    # Trigger pattern shape should match inputs
    if trigger_pattern.shape[0] != weights.shape[1]:
        raise ValueError("Trigger shape mismatch.")

    # Watermarking Strategy:
    # Set weights to match trigger pattern exactly (Maximize Dot Product)
    # If Input[i] is High, Weight[i] -> 1.0
    # If Input[i] is Low, Weight[i] -> 0.0 (or keep random? usually 0 to minimize noise)

    # We blend the watermark into existing weights to avoid destroying performance completely?
    # A strong backdoor simply overwrites.
    # Let's overwrite for proof-of-concept.

    logger.info("Injecting Backdoor into Neuron %d...", target_neuron_idx)

    # For unipolar inputs [0, 1]:
    # To max response: Weight = 1 where Trigger = 1.
    # Where Trigger = 0, Weight doesn't matter much for AND-dot-product,
    # but setting to 0 reduces noise.

    watermarked_w = trigger_pattern.copy()

    # Update the layer
    layer.weights[target_neuron_idx] = watermarked_w

    # Refresh packed weights if necessary
    if hasattr(layer, "_refresh_packed_weights"):
        layer._refresh_packed_weights()

verify_watermark(layer, trigger_pattern, target_neuron_idx) staticmethod

Returns the activation of the target neuron for the trigger. High activation = Watermark Present.

Source code in src/sc_neurocore/security/watermark.py
Python
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@staticmethod
def verify_watermark(layer, trigger_pattern, target_neuron_idx: int) -> float:  # type: ignore[no-untyped-def]
    """
    Returns the activation of the target neuron for the trigger.
    High activation = Watermark Present.
    """
    # We need to run the layer's forward pass logic manually or assume layer object usage
    # This function assumes we can just check the dot product ideal

    w = layer.weights[target_neuron_idx]
    # SC Dot Product Ideal: Sum(x * w) / Length
    # Here we just check alignment

    activation = float(np.mean(trigger_pattern * w))
    return activation

Side-Channel Metrics

sc_neurocore.security.side_channel_metrics

Analytic side-channel proxy metrics for stochastic-computing bitstreams.

These helpers quantify switching activity in simulated or replayed bitstreams. They are not physical leakage measurements and do not claim DPA resistance or board-level power/thermal security.

SideChannelMetricError

Bases: ValueError

Raised when side-channel metric inputs are malformed or unsupported.

Source code in src/sc_neurocore/security/side_channel_metrics.py
Python
22
23
class SideChannelMetricError(ValueError):
    """Signals malformed or unsupported input to a side-channel metric helper."""

SwitchingActivitySummary dataclass

Transition-count summary for a rectangular binary bitstream matrix.

Source code in src/sc_neurocore/security/side_channel_metrics.py
Python
26
27
28
29
30
31
32
33
34
35
36
37
@dataclass(frozen=True, slots=True)
class SwitchingActivitySummary:
    """Transition-count summary for a rectangular binary bitstream matrix."""

    cycles: int
    stream_count: int
    per_stream_transition_counts: tuple[int, ...]
    per_stream_transition_rates: tuple[float, ...]
    mean_transition_rate: float
    max_transition_rate: float
    total_transitions: int
    activity_histogram: dict[int, int]

ClassActivityProxy dataclass

Class-conditioned analytic proxy for activity-dependent leakage.

label_activity_correlation is the Pearson correlation between the numeric labels and the per-sample mean switching rate. It is None when either side has zero variance, which avoids reporting a fabricated correlation.

Source code in src/sc_neurocore/security/side_channel_metrics.py
Python
40
41
42
43
44
45
46
47
48
49
50
51
52
@dataclass(frozen=True, slots=True)
class ClassActivityProxy:
    """Class-conditioned analytic proxy for activity-dependent leakage.

    ``label_activity_correlation`` is Pearson correlation between numeric labels
    and per-sample mean switching rate. It is ``None`` when either side has zero
    variance, avoiding fabricated correlation claims.
    """

    class_means: dict[int | float, float]
    max_class_mean_gap: float
    label_activity_correlation: float | None
    sample_count: int

compute_switching_activity(bitstreams)

Compute per-stream switching activity for rows of binary bitstreams.

Source code in src/sc_neurocore/security/side_channel_metrics.py
Python
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def compute_switching_activity(
    bitstreams: Sequence[Sequence[int]],
) -> SwitchingActivitySummary:
    """Summarise how often each row of a binary bitstream matrix toggles.

    The input is first passed through ``_normalise_bitstream_matrix``. For every
    row the adjacent positions holding different values are counted, and each
    count is divided by ``cycles - 1`` to give that row's transition rate. The
    summary also reports the mean and maximum rate, the total number of
    transitions, and a histogram (transition count -> number of rows, keys in
    ascending order).
    """

    matrix = _normalise_bitstream_matrix(bitstreams)
    cycles = len(matrix[0])

    toggles_per_row: list[int] = []
    for row in matrix:
        toggles = 0
        for position in range(1, len(row)):
            if row[position - 1] != row[position]:
                toggles += 1
        toggles_per_row.append(toggles)
    transition_counts = tuple(toggles_per_row)

    # NOTE(review): this is zero for single-cycle rows; presumably the
    # normaliser rejects such input - confirm.
    comparisons_per_row = cycles - 1
    transition_rates = tuple([count / comparisons_per_row for count in transition_counts])

    tally: dict[int, int] = {}
    for count in transition_counts:
        if count in tally:
            tally[count] += 1
        else:
            tally[count] = 1
    ordered_histogram = {count: tally[count] for count in sorted(tally)}

    mean_rate = sum(transition_rates) / len(transition_rates)
    return SwitchingActivitySummary(
        cycles=cycles,
        stream_count=len(matrix),
        per_stream_transition_counts=transition_counts,
        per_stream_transition_rates=transition_rates,
        mean_transition_rate=mean_rate,
        max_transition_rate=max(transition_rates),
        total_transitions=sum(transition_counts),
        activity_histogram=ordered_histogram,
    )

compute_class_activity_proxy(bitstreams_by_sample, labels)

Summarise class-conditioned switching activity for simulated samples.

Source code in src/sc_neurocore/security/side_channel_metrics.py
Python
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def compute_class_activity_proxy(
    bitstreams_by_sample: Sequence[Sequence[Sequence[int]]],
    labels: Sequence[int | float],
) -> ClassActivityProxy:
    """Group per-sample mean switching rates by label and summarise them.

    Each sample is a bitstream matrix whose mean transition rate comes from
    ``compute_switching_activity``. Rates are averaged per label; the result
    also carries the spread between the largest and smallest class mean and the
    label/rate correlation from ``_pearson_correlation``.

    Raises:
        SideChannelMetricError: when the normalised samples and labels differ
            in length.
    """

    samples = _normalise_sample_collection(bitstreams_by_sample)
    label_values = _normalise_labels(labels)
    if len(samples) != len(label_values):
        raise SideChannelMetricError("bitstreams_by_sample and labels must have equal length")

    collected_rates: list[float] = []
    for sample in samples:
        collected_rates.append(compute_switching_activity(sample).mean_transition_rate)
    sample_rates = tuple(collected_rates)

    # Bucket every sample's rate under its label.
    rates_by_label: dict[int | float, list[float]] = {}
    for label, rate in zip(label_values, sample_rates, strict=True):
        if label in rates_by_label:
            rates_by_label[label].append(rate)
        else:
            rates_by_label[label] = [rate]

    class_means: dict[int | float, float] = {}
    for label, rates in sorted(rates_by_label.items()):
        class_means[label] = sum(rates) / len(rates)

    mean_values = tuple(class_means.values())
    if len(mean_values) > 1:
        max_gap = max(mean_values) - min(mean_values)
    else:
        # A single class has nothing to be compared against.
        max_gap = 0.0

    correlation = _pearson_correlation(label_values, sample_rates)
    return ClassActivityProxy(
        class_means=class_means,
        max_class_mean_gap=max_gap,
        label_activity_correlation=correlation,
        sample_count=len(samples),
    )

Side-Channel Encoding

sc_neurocore.security.thermal_sc_encoding

Activity-shaped stochastic encoding for analytic side-channel studies.

The encoders in this module preserve stochastic-computing probability counts while distributing transitions across the bitstream. They emit analytic simulation evidence only; they are not physical power, thermal, or DPA resistance measurements.

ThermalSCEncodingError

Bases: ValueError

Raised when thermal side-channel encoder inputs violate the contract.

Source code in src/sc_neurocore/security/thermal_sc_encoding.py
Python
30
31
class ThermalSCEncodingError(ValueError):
    """Signals input that breaks the thermal side-channel encoder's contract."""

ThermalSCEncodingConfig dataclass

Configuration for deterministic activity-balanced SC encoding.

Source code in src/sc_neurocore/security/thermal_sc_encoding.py
Python
34
35
36
37
38
39
40
41
42
@dataclass(frozen=True, slots=True)
class ThermalSCEncodingConfig:
    """Configuration for deterministic activity-balanced SC encoding."""

    bitstream_length: int = 256
    seed: int = 0
    rotation_stride: int = 1
    dummy_streams_per_record: int = 0
    max_dummy_overhead_ratio: float = 0.0

ActivityBalancedEncoding dataclass

A single activity-shaped stochastic-computing bitstream.

Source code in src/sc_neurocore/security/thermal_sc_encoding.py
Python
45
46
47
48
49
50
51
52
53
54
55
56
@dataclass(frozen=True, slots=True)
class ActivityBalancedEncoding:
    """One activity-shaped stochastic-computing bitstream plus its evidence.

    As filled in by ``encode_activity_balanced_probability``:

    * ``probability`` - the validated requested probability.
    * ``realised_probability`` - number of ones divided by the stream length.
    * ``bitstream`` - the rotated encoded stream.
    * ``dummy_bitstreams`` - accompanying dummy streams, possibly none.
    * ``seed_domain`` - text recording the seed, stream index and rotation.
    * ``dummy_streams_inserted`` - length of ``dummy_bitstreams``.
    * ``activity_summary`` - switching activity over the stream together with
      its dummy streams.
    * ``evidence_boundary`` - marks the record as analytic simulation only.
    """

    probability: float
    realised_probability: float
    bitstream: tuple[int, ...]
    dummy_bitstreams: tuple[tuple[int, ...], ...]
    seed_domain: str
    dummy_streams_inserted: int
    activity_summary: SwitchingActivitySummary
    evidence_boundary: str = "analytic_simulation_only"

ActivityBalancedEncodingSummary dataclass

Batch-level analytic evidence for activity-shaped encodings.

Source code in src/sc_neurocore/security/thermal_sc_encoding.py
Python
59
60
61
62
63
64
65
66
67
68
@dataclass(frozen=True, slots=True)
class ActivityBalancedEncodingSummary:
    """Analytic evidence aggregated over a batch of activity-shaped encodings.

    As filled in by ``encode_activity_balanced_probabilities``:

    * ``record_count`` - number of encoded records.
    * ``class_activity_proxy`` - class-conditioned switching proxy computed
      from each record's main bitstream.
    * ``total_dummy_streams_inserted`` / ``max_dummy_streams_inserted`` - sum
      and maximum of the per-record dummy-stream counts.
    * ``dummy_stream_overhead_ratio`` - total dummy streams divided by the
      record count.
    * ``evidence_boundary`` - marks the summary as analytic simulation only.
    """

    record_count: int
    class_activity_proxy: ClassActivityProxy
    total_dummy_streams_inserted: int
    max_dummy_streams_inserted: int
    dummy_stream_overhead_ratio: float
    evidence_boundary: str = "analytic_simulation_only"

ActivityBalancedEncodingBatch dataclass

Batch result for activity-shaped stochastic encodings.

Source code in src/sc_neurocore/security/thermal_sc_encoding.py
Python
71
72
73
74
75
76
@dataclass(frozen=True, slots=True)
class ActivityBalancedEncodingBatch:
    """Result of encoding a batch: the per-input records plus one summary.

    ``records`` holds one encoding per input probability, in input order;
    ``summary`` holds the batch-level analytic evidence.
    """

    records: tuple[ActivityBalancedEncoding, ...]
    summary: ActivityBalancedEncodingSummary

encode_activity_balanced_probability(probability, config=None, *, stream_index=0)

Encode one probability with distributed ones and deterministic rotation.

Source code in src/sc_neurocore/security/thermal_sc_encoding.py
Python
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def encode_activity_balanced_probability(
    probability: float,
    config: ThermalSCEncodingConfig | None = None,
    *,
    stream_index: int = 0,
) -> ActivityBalancedEncoding:
    """Encode a single probability as a rotated, evenly distributed bitstream.

    The number of ones is ``round(probability * bitstream_length)`` using the
    built-in ``round``. The ones are spread by ``_distribute_ones`` and the
    result is rotated by the offset chosen for ``stream_index``. Dummy streams
    are built from the configuration, and switching activity is measured over
    the main stream together with them.

    Raises:
        ThermalSCEncodingError: when ``stream_index`` is a bool, is not an
            ``int``, or is negative.
    """

    cfg = _validate_config(config or ThermalSCEncodingConfig())
    probability_value = _validate_probability(probability)

    # bool is a subclass of int, so it has to be excluded explicitly.
    index_is_plain_int = isinstance(stream_index, int) and not isinstance(stream_index, bool)
    if not index_is_plain_int or stream_index < 0:
        raise ThermalSCEncodingError("stream_index must be a non-negative integer")

    length = cfg.bitstream_length
    ones = round(probability_value * length)
    evenly_spread = _distribute_ones(ones, length)
    rotation = _activity_preserving_rotation_offset(evenly_spread, cfg, stream_index)
    bitstream = _rotate(evenly_spread, rotation)
    dummy_bitstreams = _build_dummy_bitstreams(cfg, stream_index)

    measured_streams = (bitstream, *dummy_bitstreams)
    activity = compute_switching_activity(measured_streams)
    seed_domain = f"seed={cfg.seed}:stream={stream_index}:rotation={rotation}"

    return ActivityBalancedEncoding(
        probability=probability_value,
        realised_probability=ones / cfg.bitstream_length,
        bitstream=bitstream,
        dummy_bitstreams=dummy_bitstreams,
        seed_domain=seed_domain,
        dummy_streams_inserted=len(dummy_bitstreams),
        activity_summary=activity,
    )

encode_activity_balanced_probabilities(probabilities, config=None, *, labels=None)

Encode a probability batch and attach analytic class-activity evidence.

Source code in src/sc_neurocore/security/thermal_sc_encoding.py
Python
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def encode_activity_balanced_probabilities(
    probabilities: Sequence[float],
    config: ThermalSCEncodingConfig | None = None,
    *,
    labels: Sequence[int | float] | None = None,
) -> ActivityBalancedEncodingBatch:
    """Encode every probability in a batch and summarise the batch's activity.

    Each probability is encoded with its position as ``stream_index``. When
    ``labels`` is omitted, the positions ``0..n-1`` serve as labels. The class
    activity proxy is computed from each record's main bitstream only.

    Raises:
        ThermalSCEncodingError: when ``probabilities`` is not a non-empty
            sequence, when the label count differs from the record count, or
            when a label is a bool, non-numeric, or not finite.
    """

    if not (isinstance(probabilities, Sequence) and probabilities):
        raise ThermalSCEncodingError("probabilities must be a non-empty sequence")
    cfg = _validate_config(config or ThermalSCEncodingConfig())

    encoded = []
    for index, value in enumerate(probabilities):
        encoded.append(encode_activity_balanced_probability(value, cfg, stream_index=index))
    records = tuple(encoded)

    if labels is None:
        label_values = tuple(range(len(records)))
    else:
        label_values = tuple(labels)
    if len(label_values) != len(records):
        raise ThermalSCEncodingError("labels must have the same length as probabilities")
    for label in label_values:
        if (
            isinstance(label, bool)
            or not isinstance(label, int | float)
            or not math.isfinite(float(label))
        ):
            raise ThermalSCEncodingError("labels must be finite numeric values")

    # Each record contributes a one-stream sample: its main bitstream.
    single_stream_samples = tuple((record.bitstream,) for record in records)
    proxy = compute_class_activity_proxy(single_stream_samples, label_values)

    dummy_counts = [record.dummy_streams_inserted for record in records]
    total_dummies = sum(dummy_counts)
    summary = ActivityBalancedEncodingSummary(
        record_count=len(records),
        class_activity_proxy=proxy,
        total_dummy_streams_inserted=total_dummies,
        max_dummy_streams_inserted=max(dummy_counts),
        dummy_stream_overhead_ratio=total_dummies / len(records),
    )
    return ActivityBalancedEncodingBatch(records=records, summary=summary)

Side-Channel Benchmarking

sc_neurocore.security.side_channel_benchmark

Analytic benchmark reports for SC side-channel encoding studies.

SideChannelBenchmarkError

Bases: ValueError

Raised when side-channel benchmark inputs or outputs are invalid.

Source code in src/sc_neurocore/security/side_channel_benchmark.py
Python
38
39
class SideChannelBenchmarkError(ValueError):
    """Signals invalid input to, or output from, a side-channel benchmark."""

SideChannelBenchmarkArm dataclass

One benchmark arm with class-activity leakage proxy evidence.

Source code in src/sc_neurocore/security/side_channel_benchmark.py
Python
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@dataclass(frozen=True, slots=True)
class SideChannelBenchmarkArm:
    """A single benchmark arm and its class-activity leakage proxy evidence.

    Every field is validated on construction; see ``__post_init__``.
    """

    name: str
    class_activity_proxy: ClassActivityProxy
    dummy_stream_overhead_ratio: float
    bitstream_count: int

    def __post_init__(self) -> None:
        """Validate all fields, raising ``SideChannelBenchmarkError`` on the first failure.

        Checked in order: ``name`` is a non-blank string, ``class_activity_proxy``
        is a ``ClassActivityProxy``, ``dummy_stream_overhead_ratio`` is a finite
        non-negative number, and ``bitstream_count`` is a non-negative integer.
        Booleans are rejected for both numeric fields.
        """
        if not (isinstance(self.name, str) and self.name.strip()):
            raise SideChannelBenchmarkError("arm name must be a non-empty string")

        if not isinstance(self.class_activity_proxy, ClassActivityProxy):
            raise SideChannelBenchmarkError("class_activity_proxy must be a ClassActivityProxy")

        ratio = self.dummy_stream_overhead_ratio
        # bool is a subclass of int, so it has to be excluded explicitly.
        ratio_is_numeric = isinstance(ratio, int | float) and not isinstance(ratio, bool)
        if not ratio_is_numeric or not (
            math.isfinite(float(ratio)) and float(ratio) >= 0.0
        ):
            raise SideChannelBenchmarkError(
                "dummy_stream_overhead_ratio must be a finite non-negative value"
            )

        count = self.bitstream_count
        count_is_plain_int = isinstance(count, int) and not isinstance(count, bool)
        if not count_is_plain_int or count < 0:
            raise SideChannelBenchmarkError("bitstream_count must be a non-negative integer")

SideChannelBenchmarkRecord dataclass

Per-sample benchmark record with realised protected probability.

Source code in src/sc_neurocore/security/side_channel_benchmark.py
Python
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@dataclass(frozen=True, slots=True)
class SideChannelBenchmarkRecord:
    """Per-sample benchmark record with realised protected probability."""

    label: int | float
    probability: float
    protected_realised_probability: float
    protected_dummy_streams_inserted: int

    def __post_init__(self) -> None:
        if isinstance(self.label, bool) or not isinstance(self.label, int | float):
            raise SideChannelBenchmarkError("label must be a finite numeric value")
        if not math.isfinite(float(self.label)):
            raise SideChannelBenchmarkError("label must be a finite numeric value")
        for value, field_name in (
            (self.probability, "probability"),
            (self.protected_realised_probability, "protected_realised_probability"),
        ):
            if isinstance(value, bool) or not isinstance(value, int | float):
                raise SideChannelBenchmarkError(f"{field_name} must be a finite value in [0, 1]")
            numeric_value = float(value)
            if not math.isfinite(numeric_value) or numeric_value < 0.0 or numeric_value > 1.0:
                raise SideChannelBenchmarkError(f"{field_name} must be a finite value in [0, 1]")
        if (
            isinstance(self.protected_dummy_streams_inserted, bool)
            or not isinstance(self.protected_dummy_streams_inserted, int)
            or self.protected_dummy_streams_inserted < 0
        ):
            raise SideChannelBenchmarkError(
                "protected_dummy_streams_inserted must be a non-negative integer"
            )

SideChannelDeployManifest dataclass

Deploy/evidence manifest for an analytic side-channel benchmark.

Source code in src/sc_neurocore/security/side_channel_benchmark.py
Python
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
@dataclass(frozen=True, slots=True)
class SideChannelDeployManifest:
    """Deploy/evidence manifest for an analytic side-channel benchmark."""

    schema_version: str
    evidence_class: str
    benchmark_artifact: dict[str, str]
    security_parameters: dict[str, int | float]
    overhead_measurements: dict[str, int | float]
    boundary_notes: tuple[str, ...]

    def __post_init__(self) -> None:
        if not isinstance(self.schema_version, str) or not self.schema_version.strip():
            raise SideChannelBenchmarkError("schema_version must be a non-empty string")
        if not isinstance(self.evidence_class, str) or not self.evidence_class.strip():
            raise SideChannelBenchmarkError("evidence_class must be a non-empty string")
        if not isinstance(self.benchmark_artifact, dict):
            raise SideChannelBenchmarkError("benchmark_artifact must be a dictionary")
        if not isinstance(self.security_parameters, dict):
            raise SideChannelBenchmarkError("security_parameters must be a dictionary")
        if not isinstance(self.overhead_measurements, dict):
            raise SideChannelBenchmarkError("overhead_measurements must be a dictionary")
        if not isinstance(self.boundary_notes, tuple) or not self.boundary_notes:
            raise SideChannelBenchmarkError("boundary_notes must be a non-empty tuple of strings")
        for note in self.boundary_notes:
            if not isinstance(note, str) or not note.strip():
                raise SideChannelBenchmarkError("boundary_notes must contain non-empty strings")

SideChannelBenchmarkReport dataclass

Analytic baseline-versus-protected side-channel benchmark report.

Source code in src/sc_neurocore/security/side_channel_benchmark.py
Python
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@dataclass(frozen=True, slots=True)
class SideChannelBenchmarkReport:
    """Immutable report comparing a baseline arm with a protected arm.

    All fields are checked in ``__post_init__``; the first violation found
    raises ``SideChannelBenchmarkError``.
    """

    # Text fields: must be ``str`` and not blank after stripping whitespace.
    schema_version: str
    evidence_boundary: str
    threat_model: str
    # Both arms must be ``SideChannelBenchmarkArm`` instances.
    baseline: SideChannelBenchmarkArm
    protected: SideChannelBenchmarkArm
    # Finite int/float; ``bool`` is rejected explicitly.
    max_class_mean_gap_reduction: float
    deploy_manifest: SideChannelDeployManifest
    # Non-empty tuple of non-blank strings.
    boundary_notes: tuple[str, ...]
    # Tuple (possibly empty) of ``SideChannelBenchmarkRecord`` instances.
    records: tuple[SideChannelBenchmarkRecord, ...]

    def __post_init__(self) -> None:
        """Validate every field, in declaration order.

        Raises:
            SideChannelBenchmarkError: on the first field that has the wrong
                type, is blank/empty where content is required, or (for
                ``max_class_mean_gap_reduction``) is not a finite number.
        """
        required_text = (
            (self.schema_version, "schema_version must be a non-empty string"),
            (self.evidence_boundary, "evidence_boundary must be a non-empty string"),
            (self.threat_model, "threat_model must be a non-empty string"),
        )
        for candidate, complaint in required_text:
            if not (isinstance(candidate, str) and candidate.strip()):
                raise SideChannelBenchmarkError(complaint)

        required_arms = (
            (self.baseline, "baseline must be a SideChannelBenchmarkArm"),
            (self.protected, "protected must be a SideChannelBenchmarkArm"),
        )
        for candidate, complaint in required_arms:
            if not isinstance(candidate, SideChannelBenchmarkArm):
                raise SideChannelBenchmarkError(complaint)

        gap = self.max_class_mean_gap_reduction
        # bool is a subclass of int, so it has to be excluded by hand.
        is_real_number = isinstance(gap, int | float) and not isinstance(gap, bool)
        # Short-circuit keeps float() from ever seeing a non-numeric value.
        if not is_real_number or not math.isfinite(float(gap)):
            raise SideChannelBenchmarkError("max_class_mean_gap_reduction must be a finite value")

        if not isinstance(self.deploy_manifest, SideChannelDeployManifest):
            raise SideChannelBenchmarkError("deploy_manifest must be a SideChannelDeployManifest")

        notes = self.boundary_notes
        if not (isinstance(notes, tuple) and notes):
            raise SideChannelBenchmarkError("boundary_notes must be a non-empty tuple of strings")
        if not all(isinstance(entry, str) and entry.strip() for entry in notes):
            raise SideChannelBenchmarkError("boundary_notes must contain non-empty strings")

        entries = self.records
        if not isinstance(entries, tuple):
            raise SideChannelBenchmarkError("records must be a tuple of SideChannelBenchmarkRecord")
        if not all(isinstance(entry, SideChannelBenchmarkRecord) for entry in entries):
            raise SideChannelBenchmarkError(
                "records must contain SideChannelBenchmarkRecord entries"
            )

run_side_channel_leakage_benchmark(*, probabilities, labels, protected_config)

Compare correlated baseline streams against activity-balanced streams.

Source code in src/sc_neurocore/security/side_channel_benchmark.py
Python
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def run_side_channel_leakage_benchmark(
    *,
    probabilities: Sequence[float],
    labels: Sequence[int | float],
    protected_config: ThermalSCEncodingConfig,
) -> SideChannelBenchmarkReport:
    """Benchmark a correlated baseline fixture against activity-balanced encoding.

    The inputs are normalised, encoded with
    ``encode_activity_balanced_probabilities`` (the protected arm) and, per
    probability, with ``_correlated_activity_fixture_stream`` (the baseline
    arm). The reported reduction is the baseline ``max_class_mean_gap`` minus
    the protected one.

    Args:
        probabilities: One probability per sample.
        labels: One class label per sample; must match ``probabilities`` in
            length after normalisation.
        protected_config: Encoder configuration for the protected arm.

    Returns:
        A ``SideChannelBenchmarkReport`` whose deploy manifest carries an
        empty artifact ``path``.

    Raises:
        SideChannelBenchmarkError: if ``protected_config`` has the wrong type,
            the inputs differ in length, fewer than two samples are given,
            the encoder raises ``ThermalSCEncodingError``, or the encoder
            returns a different number of records than samples. The
            normalisation helpers may raise as well (not visible here).
    """

    if not isinstance(protected_config, ThermalSCEncodingConfig):
        raise SideChannelBenchmarkError("protected_config must be a ThermalSCEncodingConfig")

    probability_values = _normalise_probabilities(probabilities)
    label_values = _normalise_labels(labels)
    sample_count = len(probability_values)
    if sample_count != len(label_values):
        raise SideChannelBenchmarkError("probabilities and labels must have equal length")
    if sample_count < 2:
        raise SideChannelBenchmarkError("at least two samples are required")

    # Protected arm: translate encoder failures into the benchmark's own error type.
    try:
        protected_batch = encode_activity_balanced_probabilities(
            probability_values,
            protected_config,
            labels=label_values,
        )
    except ThermalSCEncodingError as exc:
        raise SideChannelBenchmarkError(str(exc)) from exc
    encoded_records = protected_batch.records
    if len(encoded_records) != sample_count:
        raise SideChannelBenchmarkError(
            "protected encoder output length must match probability sample length"
        )

    # Baseline arm: each sample is a one-element tuple holding a single fixture stream.
    stream_length = protected_config.bitstream_length
    baseline_samples = tuple(
        [
            (_correlated_activity_fixture_stream(probability, stream_length),)
            for probability in probability_values
        ]
    )
    baseline_proxy = compute_class_activity_proxy(baseline_samples, label_values)
    summary = protected_batch.summary
    protected_proxy = summary.class_activity_proxy
    gap_reduction = baseline_proxy.max_class_mean_gap - protected_proxy.max_class_mean_gap

    total_dummy_streams = sum(entry.dummy_streams_inserted for entry in encoded_records)

    baseline_arm = SideChannelBenchmarkArm(
        name="correlated_activity_fixture",
        class_activity_proxy=baseline_proxy,
        dummy_stream_overhead_ratio=0.0,
        bitstream_count=len(baseline_samples),
    )
    overhead_ratio = summary.dummy_stream_overhead_ratio
    protected_arm = SideChannelBenchmarkArm(
        name="activity_balanced",
        class_activity_proxy=protected_proxy,
        dummy_stream_overhead_ratio=overhead_ratio,
        bitstream_count=len(encoded_records),
    )

    security_parameters = {
        "bitstream_length": stream_length,
        "dummy_streams_per_record": protected_config.dummy_streams_per_record,
        "max_dummy_overhead_ratio": protected_config.max_dummy_overhead_ratio,
        "rotation_stride": protected_config.rotation_stride,
        "seed": protected_config.seed,
    }
    overhead_measurements = {
        "dummy_stream_overhead_ratio": overhead_ratio,
        "protected_bitstream_count": len(encoded_records),
        "total_dummy_streams_inserted": total_dummy_streams,
    }
    manifest = SideChannelDeployManifest(
        schema_version=SIDE_CHANNEL_DEPLOY_MANIFEST_SCHEMA_VERSION,
        evidence_class=_EVIDENCE_CLASS,
        # The artifact path is left empty here; this function writes no file.
        benchmark_artifact={"path": ""},
        security_parameters=security_parameters,
        overhead_measurements=overhead_measurements,
        boundary_notes=_BOUNDARY_NOTES,
    )

    # strict=True turns any residual length mismatch into an error instead of truncating.
    per_sample_records = tuple(
        SideChannelBenchmarkRecord(
            label=sample_label,
            probability=sample_probability,
            protected_realised_probability=encoded.realised_probability,
            protected_dummy_streams_inserted=encoded.dummy_streams_inserted,
        )
        for sample_label, sample_probability, encoded in zip(
            label_values, probability_values, encoded_records, strict=True
        )
    )

    return SideChannelBenchmarkReport(
        schema_version=SIDE_CHANNEL_BENCHMARK_SCHEMA_VERSION,
        evidence_boundary=_EVIDENCE_BOUNDARY,
        threat_model=_THREAT_MODEL,
        baseline=baseline_arm,
        protected=protected_arm,
        max_class_mean_gap_reduction=gap_reduction,
        deploy_manifest=manifest,
        boundary_notes=_BOUNDARY_NOTES,
        records=per_sample_records,
    )

write_side_channel_benchmark_report(output_path, *, probabilities, labels, protected_config)

Run the analytic benchmark and write a canonical JSON artifact.

Source code in src/sc_neurocore/security/side_channel_benchmark.py
Python
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def write_side_channel_benchmark_report(
    output_path: str | Path,
    *,
    probabilities: Sequence[float],
    labels: Sequence[int | float],
    protected_config: ThermalSCEncodingConfig,
) -> SideChannelBenchmarkReport:
    """Run the analytic benchmark and persist it as a JSON file.

    The JSON is written with two-space indentation, sorted keys, a trailing
    newline and UTF-8 encoding. Missing parent directories are created.

    Args:
        output_path: Destination file, as a string or ``Path``.
        probabilities: Forwarded to ``run_side_channel_leakage_benchmark``.
        labels: Forwarded to ``run_side_channel_leakage_benchmark``.
        protected_config: Forwarded to ``run_side_channel_leakage_benchmark``.

    Returns:
        The report as returned by ``_with_artifact_path`` (presumably with the
        manifest's artifact path filled in; the helper is not visible here).

    Raises:
        SideChannelBenchmarkError: if ``output_path`` is neither ``str`` nor
            ``Path``, is a blank string, has no usable file name, or names an
            existing directory; also whatever the benchmark run itself raises.
    """

    if isinstance(output_path, str):
        if not output_path.strip():
            raise SideChannelBenchmarkError("output_path must be a non-empty path")
    elif not isinstance(output_path, Path):
        raise SideChannelBenchmarkError("output_path must be a string or Path")

    # The benchmark runs before the remaining path checks, so input errors surface first.
    result = run_side_channel_leakage_benchmark(
        probabilities=probabilities,
        labels=labels,
        protected_config=protected_config,
    )

    destination = Path(output_path)
    if destination.name in ("", ".", ".."):
        raise SideChannelBenchmarkError("output_path must resolve to a file path")
    if destination.exists() and destination.is_dir():
        raise SideChannelBenchmarkError(
            "output_path must point to a file, not an existing directory"
        )

    result = _with_artifact_path(result, destination)
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialised = json.dumps(_report_payload(result), indent=2, sort_keys=True)
    destination.write_text(serialised + "\n", encoding="utf-8")
    return result

Zero-Knowledge Proofs

sc_neurocore.security.zkp

ZKPVerifier

Zero-Knowledge Proof for Neuromorphic Spike Validity. Proves that a spike sequence matches a committed input without revealing the input.

Source code in src/sc_neurocore/security/zkp.py
Python
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class ZKPVerifier:
    """
    Commit / challenge / reveal helper for neuromorphic spike bitstreams.

    NOTE: this is a plain hash-commitment demonstration, not a true
    zero-knowledge proof. ``verify`` can only check data that is actually
    revealed to it; a real scheme would commit to a Merkle root and verify
    a per-index inclusion proof instead.
    """

    @staticmethod
    def commit(bitstream: np.ndarray[Any, Any]) -> str:
        """
        Creates a cryptographic commitment (hash) of the bitstream.

        Returns the SHA-256 hex digest of ``bitstream.tobytes()``. The digest
        covers the raw bytes, so the same values stored with a different
        dtype give a different commitment.
        """
        b_bytes = bitstream.tobytes()
        return hashlib.sha256(b_bytes).hexdigest()

    @staticmethod
    def generate_challenge(commitment: str) -> int:
        """
        Simulates a random index challenge.

        The index is derived deterministically from the first 8 hex digits
        of the commitment and always lies in ``0..9``.

        Raises ``ValueError`` if those leading characters are not valid hex.
        """
        # Deterministic challenge based on commitment
        return int(commitment[:8], 16) % 10  # Example: an index among the first 10 bits

    @staticmethod
    def verify(
        commitment: str,
        challenge_idx: int,
        revealed_bit: int,
        bitstream_slice: np.ndarray[Any, Any],
    ) -> bool:
        """
        Verifies that the revealed bit and slice match the original commitment.
        In a real ZKP, this would use Merkle Proofs.

        Because the commitment is a flat hash, ``bitstream_slice`` must be
        the exact array (same values, same dtype) that was passed to
        ``commit``; a partial slice cannot be authenticated and is rejected.

        Returns True only if all of the following hold:
          * ``bitstream_slice`` re-hashes to ``commitment``;
          * ``challenge_idx`` is an integer index inside the revealed array;
          * the element at ``challenge_idx`` equals ``revealed_bit``.
        Malformed arguments yield False rather than an exception.
        """
        if not isinstance(commitment, str) or not isinstance(bitstream_slice, np.ndarray):
            return False
        # bool is an int subclass; treating True/False as an index would hide caller bugs.
        if isinstance(challenge_idx, bool) or not isinstance(challenge_idx, (int, np.integer)):
            return False

        # 'Reveal' step: recompute the commitment from the disclosed data.
        recomputed = hashlib.sha256(bitstream_slice.tobytes()).hexdigest()
        if recomputed != commitment:
            return False

        # Flatten in C order, the same order tobytes() used for the hash.
        revealed = bitstream_slice.ravel()
        # Reject negatives too: numpy would silently wrap them around.
        if not 0 <= challenge_idx < revealed.size:
            return False
        return bool(revealed[challenge_idx] == revealed_bit)

commit(bitstream) staticmethod

Creates a cryptographic commitment (hash) of the bitstream.

Source code in src/sc_neurocore/security/zkp.py
Python
20
21
22
23
24
25
26
@staticmethod
def commit(bitstream: np.ndarray[Any, Any]) -> str:
    """
    Creates a cryptographic commitment (hash) of the bitstream.
    """
    b_bytes = bitstream.tobytes()
    return hashlib.sha256(b_bytes).hexdigest()

generate_challenge(commitment) staticmethod

Simulates a random index challenge.

Source code in src/sc_neurocore/security/zkp.py
Python
28
29
30
31
32
33
34
@staticmethod
def generate_challenge(commitment: str) -> int:
    """
    Simulates a random index challenge.
    """
    # Deterministic challenge based on commitment
    return int(commitment[:8], 16) % 10  # Example: check 10th bit

verify(commitment, challenge_idx, revealed_bit, bitstream_slice) staticmethod

Verifies that the revealed bit and slice match the original commitment. In a real ZKP, this would use Merkle Proofs.

Source code in src/sc_neurocore/security/zkp.py
Python
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@staticmethod
def verify(
    commitment: str,
    challenge_idx: int,
    revealed_bit: int,
    bitstream_slice: np.ndarray[Any, Any],
) -> bool:
    """
    Verifies that the revealed bit and slice match the original commitment.
    In a real ZKP, this would use Merkle Proofs.
    """
    # For simplicity: we re-hash and check
    # This is a 'Reveal' step, not fully ZK without the Merkle tree,
    # but demonstrates the protocol.
    return True  # Simplified for demonstration