Skip to content

Pipeline

Data ingestion and training orchestration for SNN workflows.

  • DataIngestor — Multimodal dataset preparation: spike encoding, batching, augmentation
  • SCTrainingLoop — Standard and RL training orchestration with logging, checkpointing, and early stopping
Python
from sc_neurocore.pipeline import DataIngestor, SCTrainingLoop

sc_neurocore.pipeline

sc_neurocore.pipeline — Tier: research (experimental).

DataIngestor

Ingests and normalizes multimodal datasets for SC training.

Source code in src/sc_neurocore/pipeline/ingestion.py
Python
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class DataIngestor:
    """
    Ingests and normalizes multimodal datasets for SC training.
    """

    def prepare_dataset(self, raw_data: Dict[str, Any]) -> MultimodalDataset:
        """
        Normalize raw multimodal data and package it as a MultimodalDataset.

        Each modality is min-max scaled to [0, 1]; a constant (zero-range)
        modality maps to all zeros.  Labels are a placeholder zero vector
        sized to the first modality's leading axis.

        Args:
            raw_data: Mapping of modality name to array-like sample data
                (e.g. {'vision': [...], 'audio': [...]}).

        Returns:
            MultimodalDataset with normalized per-modality arrays and
            zero labels.
        """
        processed_data = {}
        for k, v in raw_data.items():
            arr = np.array(v)
            # Guard: np.min/np.max raise ValueError on an empty array.
            if arr.size == 0:
                processed_data[k] = arr.astype(float)
                continue
            # Normalize to [0, 1]
            arr_min = np.min(arr)
            arr_max = np.max(arr)
            if arr_max > arr_min:
                processed_data[k] = (arr - arr_min) / (arr_max - arr_min)
            else:
                # Constant modality: use float zeros so the dtype matches
                # the normalized (float) branch.
                processed_data[k] = np.zeros_like(arr, dtype=float)

        # BUG FIX: the original indexed list(processed_data.values())[0],
        # which raised IndexError when raw_data was empty.
        if not processed_data:
            return MultimodalDataset(data={}, labels=np.zeros(0))
        n_samples = len(next(iter(processed_data.values())))
        return MultimodalDataset(data=processed_data, labels=np.zeros(n_samples))

prepare_dataset(raw_data)

Normalizes and packages raw multimodal data.

Source code in src/sc_neurocore/pipeline/ingestion.py
Python
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def prepare_dataset(self, raw_data: Dict[str, Any]) -> MultimodalDataset:
    """
    Normalize raw multimodal data and package it as a MultimodalDataset.

    Each modality is min-max scaled to [0, 1]; a constant (zero-range)
    modality maps to all zeros.  Labels are a placeholder zero vector
    sized to the first modality's leading axis.

    Args:
        raw_data: Mapping of modality name to array-like sample data
            (e.g. {'vision': [...], 'audio': [...]}).

    Returns:
        MultimodalDataset with normalized per-modality arrays and
        zero labels.
    """
    processed_data = {}
    for k, v in raw_data.items():
        arr = np.array(v)
        # Guard: np.min/np.max raise ValueError on an empty array.
        if arr.size == 0:
            processed_data[k] = arr.astype(float)
            continue
        # Normalize to [0, 1]
        arr_min = np.min(arr)
        arr_max = np.max(arr)
        if arr_max > arr_min:
            processed_data[k] = (arr - arr_min) / (arr_max - arr_min)
        else:
            # Constant modality: use float zeros so the dtype matches
            # the normalized (float) branch.
            processed_data[k] = np.zeros_like(arr, dtype=float)

    # BUG FIX: the original indexed list(processed_data.values())[0],
    # which raised IndexError when raw_data was empty.
    if not processed_data:
        return MultimodalDataset(data={}, labels=np.zeros(0))
    n_samples = len(next(iter(processed_data.values())))
    return MultimodalDataset(data=processed_data, labels=np.zeros(n_samples))

MultimodalDataset dataclass

A container for multimodal training data.

Source code in src/sc_neurocore/pipeline/ingestion.py
Python
18
19
20
21
22
23
24
25
26
27
28
@dataclass
class MultimodalDataset:
    """Container pairing per-modality sample arrays with a label vector.

    ``data`` maps a modality name to an array whose leading axis indexes
    samples; ``labels`` is aligned with that leading axis.
    """

    data: Dict[str, np.ndarray[Any, Any]]  # {'vision': [...], 'audio': [...]}
    labels: np.ndarray[Any, Any]

    def get_sample(self, idx: int) -> Dict[str, np.ndarray[Any, Any]]:
        """Return the idx-th sample from every modality, keyed by modality."""
        sample: Dict[str, np.ndarray[Any, Any]] = {}
        for modality, values in self.data.items():
            sample[modality] = values[idx]
        return sample

SCTrainingLoop

Standard and Reinforcement Learning loops for SC Networks.

Source code in src/sc_neurocore/pipeline/training.py
Python
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class SCTrainingLoop:
    """
    Standard and Reinforcement Learning loops for SC Networks.
    """

    @staticmethod
    def run_rl_epoch(
        agent: SCLearningLayer,
        env_step_func: Callable[[np.ndarray], float],
        input_data: np.ndarray,
        generations: int = 10,
    ) -> None:
        """
        Runs a reinforcement learning epoch.
        Uses RewardModulatedSTDPSynapse logic.
        """
        for generation in range(generations):
            # Forward pass through the agent, then score the resulting
            # spike output against the environment.
            spike_output = agent.run_epoch(input_data)  # type: ignore[arg-type]
            reward = env_step_func(spike_output)

            # Broadcast the scalar reward to every reward-modulated synapse.
            for neuron_idx in range(agent.n_neurons):
                for input_idx in range(agent.n_inputs):
                    synapse = agent.synapses[neuron_idx][input_idx]
                    if isinstance(synapse, RewardModulatedSTDPSynapse):
                        synapse.apply_reward(reward)

            logger.info("RL Epoch %d: Reward = %.4f", generation, reward)

    @staticmethod
    def train_multimodal_fusion(fusion_layer: Any, dataset: Any, epochs: int = 5) -> None:
        """Train weights in a multimodal fusion layer via per-sample updates.

        Iterates over the dataset for ``epochs`` rounds, calling
        ``fusion_layer.train_step(sample)`` on each sample returned by
        ``dataset.get_sample(i)``.  The fusion layer is responsible for
        its own weight update rule (Hebbian, LMS, etc.).
        """
        # Prefer an explicit n_samples attribute; otherwise fall back to
        # the length of the label vector (empty if neither exists).
        label_fallback = getattr(dataset, "labels", [])
        n_samples = getattr(dataset, "n_samples", len(label_fallback))
        for epoch in range(epochs):
            total_loss = 0.0
            for sample_idx in range(n_samples):
                output = fusion_layer.train_step(dataset.get_sample(sample_idx))
                # train_step may return None (no loss signal) — skip those.
                if output is not None:
                    total_loss += float(np.sum(np.abs(output)))
            # max(..., 1) guards against division by zero on empty datasets.
            avg_loss = total_loss / max(n_samples, 1)
            logger.info("Fusion Epoch %d/%d: avg_loss=%.4f", epoch + 1, epochs, avg_loss)

run_rl_epoch(agent, env_step_func, input_data, generations=10) staticmethod

Runs a reinforcement learning epoch. Uses RewardModulatedSTDPSynapse logic.

Source code in src/sc_neurocore/pipeline/training.py
Python
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@staticmethod
def run_rl_epoch(
    agent: SCLearningLayer,
    env_step_func: Callable[[np.ndarray], float],
    input_data: np.ndarray,
    generations: int = 10,
) -> None:
    """
    Runs a reinforcement learning epoch.
    Uses RewardModulatedSTDPSynapse logic.
    """
    for gen in range(generations):
        # Forward pass, then query the environment for a scalar reward.
        spikes = agent.run_epoch(input_data)  # type: ignore[arg-type]
        reward = env_step_func(spikes)

        # Deliver the reward to each reward-modulated synapse in the
        # (n_neurons x n_inputs) synapse grid; other synapse types are
        # left untouched.
        for row in range(agent.n_neurons):
            for col in range(agent.n_inputs):
                candidate = agent.synapses[row][col]
                if isinstance(candidate, RewardModulatedSTDPSynapse):
                    candidate.apply_reward(reward)

        logger.info("RL Epoch %d: Reward = %.4f", gen, reward)

train_multimodal_fusion(fusion_layer, dataset, epochs=5) staticmethod

Train weights in a multimodal fusion layer via per-sample updates.

Iterates over the dataset for epochs rounds, calling fusion_layer.train_step(sample) on each sample returned by dataset.get_sample(i). The fusion layer is responsible for its own weight update rule (Hebbian, LMS, etc.).

Source code in src/sc_neurocore/pipeline/training.py
Python
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@staticmethod
def train_multimodal_fusion(fusion_layer: Any, dataset: Any, epochs: int = 5) -> None:
    """Train weights in a multimodal fusion layer via per-sample updates.

    Iterates over the dataset for ``epochs`` rounds, calling
    ``fusion_layer.train_step(sample)`` on each sample returned by
    ``dataset.get_sample(i)``.  The fusion layer is responsible for
    its own weight update rule (Hebbian, LMS, etc.).
    """
    # Prefer an explicit n_samples attribute; otherwise size by the
    # label vector (empty when neither is present).
    fallback_labels = getattr(dataset, "labels", [])
    n_samples = getattr(dataset, "n_samples", len(fallback_labels))
    for epoch in range(epochs):
        running_loss = 0.0
        for idx in range(n_samples):
            result = fusion_layer.train_step(dataset.get_sample(idx))
            # A None result carries no loss signal and is skipped.
            if result is not None:
                running_loss += float(np.sum(np.abs(result)))
        # max(..., 1) avoids ZeroDivisionError on an empty dataset.
        avg_loss = running_loss / max(n_samples, 1)
        logger.info("Fusion Epoch %d/%d: avg_loss=%.4f", epoch + 1, epochs, avg_loss)