Skip to content

Streaming Server

Real-time SNN inference server for streaming spike events.

  • SpikeServer — Accept spike event inputs over HTTP, run one inference timestep on the loaded network, and return the output spikes in the JSON response.

Designed for closed-loop BCI, robotic control, and real-time neural decoding.

Python
from sc_neurocore.serve import SpikeServer

server = SpikeServer(network=my_snn, port=8001)
server.start()

sc_neurocore.serve

Real-time SNN inference server for streaming spike events.

SpikeServer

Streaming SNN inference server.

Parameters

network : SCNetwork or Network The SNN to run. host : str Bind address (default '127.0.0.1'). port : int Listen port (default 8001).

Source code in src/sc_neurocore/serve/server.py
Python
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
class SpikeServer:
    """Streaming SNN inference server.

    Wraps a spiking network in a minimal HTTP API:
    POST /step, POST /reset, GET /info, GET /health.

    Parameters
    ----------
    network : SCNetwork or Network
        The SNN to run.
    host : str
        Bind address (default '127.0.0.1').
    port : int
        Listen port (default 8001).
    """

    def __init__(self, network: Any, host: str = "127.0.0.1", port: int = 8001) -> None:
        self.network = network
        self.host = host
        self.port = port
        self._timestep = 0  # completed simulation steps since start/reset
        self._lock = threading.Lock()  # serializes access to network state
        self._server: HTTPServer | None = None

    def step(self, inputs: dict[str, list[float]]) -> dict[str, Any]:
        """Run one network timestep and return output spikes.

        Parameters
        ----------
        inputs : dict mapping input node name to value array

        Returns
        -------
        dict with 'outputs' (node->values) and 'timestep'

        Raises
        ------
        TypeError
            If the network exposes neither a ``step`` method nor a
            ``populations`` attribute.
        """
        with self._lock:
            inp = {k: np.array(v) for k, v in inputs.items()}

            # SCNetwork (from NIR bridge): delegate the whole timestep.
            if hasattr(self.network, "step"):
                out = self.network.step(inp)
                self._timestep += 1
                return {
                    "outputs": {
                        k: v.tolist() if hasattr(v, "tolist") else v for k, v in out.items()
                    },
                    "timestep": self._timestep,
                }

            # Population-Projection Network: step each population with its
            # matching input current (zeros when the caller provided none).
            if hasattr(self.network, "populations"):
                results = {}
                for pop in self.network.populations:
                    currents = inp.get(pop.label, np.zeros(pop.n))
                    if isinstance(currents, list):  # pragma: no cover
                        currents = np.array(currents)
                    spikes = pop.step_all(currents[: pop.n])
                    results[pop.label] = spikes.tolist()
                self._timestep += 1
                return {"outputs": results, "timestep": self._timestep}

            raise TypeError(f"Unsupported network type: {type(self.network).__name__}")

    def start(self, blocking: bool = True) -> None:
        """Start the HTTP server.

        Parameters
        ----------
        blocking : bool
            If True (default), blocks until server is shut down.
            If False, runs in a background thread.
        """
        server_ref = self

        class Handler(BaseHTTPRequestHandler):
            def do_POST(self) -> None:
                if self.path == "/step":
                    length = int(self.headers.get("Content-Length", 0))
                    body = self.rfile.read(length)
                    try:
                        data = json.loads(body)
                        inputs = data.get("inputs", {})
                        result = server_ref.step(inputs)
                        self._respond(200, result)
                    except Exception as e:
                        # Bad JSON, malformed inputs, or a network failure all
                        # surface to the client as a 400 with the message.
                        self._respond(400, {"error": str(e)})
                elif self.path == "/reset":
                    # Hold the lock so a reset cannot interleave with /step,
                    # which mutates the same state under the same lock.
                    with server_ref._lock:
                        server_ref._timestep = 0
                        if hasattr(server_ref.network, "reset"):
                            server_ref.network.reset()
                    self._respond(200, {"status": "reset", "timestep": 0})
                elif self.path == "/info":
                    self._respond(
                        200,
                        {
                            "timestep": server_ref._timestep,
                            "type": type(server_ref.network).__name__,
                        },
                    )
                else:
                    self._respond(404, {"error": "Not found. Use /step, /reset, /info"})

            def do_GET(self) -> None:
                if self.path == "/info":
                    self._respond(
                        200,
                        {
                            "timestep": server_ref._timestep,
                            "type": type(server_ref.network).__name__,
                        },
                    )
                elif self.path == "/health":
                    self._respond(200, {"status": "ok"})
                else:
                    self._respond(404, {"error": "Not found"})

            def _respond(self, code: int, data: dict[str, Any]) -> None:
                self.send_response(code)
                self.send_header("Content-Type", "application/json")
                self.end_headers()
                self.wfile.write(json.dumps(data).encode("utf-8"))

            def log_message(self, format: str, *args: Any) -> None:
                pass  # suppress default logging

        self._server = HTTPServer((self.host, self.port), Handler)

        if blocking:  # pragma: no cover
            print(f"SC-NeuroCore inference server on {self.host}:{self.port}")
            print("Endpoints: POST /step, POST /reset, GET /info, GET /health")
            self._server.serve_forever()
        else:
            thread = threading.Thread(target=self._server.serve_forever, daemon=True)
            thread.start()

    def stop(self) -> None:
        """Shut down the server and release its listening socket."""
        if self._server:
            self._server.shutdown()
            # shutdown() only stops the serve_forever loop; server_close()
            # actually closes the bound socket so the port is freed.
            self._server.server_close()

step(inputs)

Run one network timestep and return output spikes.

Parameters

inputs : dict mapping input node name to value array

Returns

dict with 'outputs' (node->values) and 'timestep'

Source code in src/sc_neurocore/serve/server.py
Python
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def step(self, inputs: dict[str, list[float]]) -> dict[str, Any]:
    """Advance the network by a single timestep.

    Parameters
    ----------
    inputs : dict mapping input node name to value array

    Returns
    -------
    dict with 'outputs' (node->values) and 'timestep'
    """
    with self._lock:
        arrays = {name: np.array(values) for name, values in inputs.items()}

        # NIR-bridge style network: it knows how to step itself.
        if hasattr(self.network, "step"):
            raw = self.network.step(arrays)
            serialized = {}
            for name, value in raw.items():
                serialized[name] = value.tolist() if hasattr(value, "tolist") else value
            self._timestep += 1
            return {"outputs": serialized, "timestep": self._timestep}

        # Population/projection style network: drive each population
        # with its matching input current (zeros when none was given).
        if hasattr(self.network, "populations"):
            spike_map = {}
            for pop in self.network.populations:
                drive = arrays.get(pop.label, np.zeros(pop.n))
                if isinstance(drive, list):  # pragma: no cover
                    drive = np.array(drive)
                spike_map[pop.label] = pop.step_all(drive[: pop.n]).tolist()
            self._timestep += 1
            return {"outputs": spike_map, "timestep": self._timestep}

        raise TypeError(f"Unsupported network type: {type(self.network).__name__}")

start(blocking=True)

Start the HTTP server.

Parameters

blocking : bool If True (default), blocks until server is shut down. If False, runs in a background thread.

Source code in src/sc_neurocore/serve/server.py
Python
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def start(self, blocking: bool = True) -> None:
    """Start the HTTP server.

    Parameters
    ----------
    blocking : bool
        If True (default), blocks until server is shut down.
        If False, runs in a background thread.
    """
    owner = self

    class _Handler(BaseHTTPRequestHandler):
        def do_POST(self) -> None:
            if self.path == "/step":
                size = int(self.headers.get("Content-Length", 0))
                payload = self.rfile.read(size)
                try:
                    parsed = json.loads(payload)
                    result = owner.step(parsed.get("inputs", {}))
                    self._send_json(200, result)
                except Exception as exc:
                    self._send_json(400, {"error": str(exc)})
            elif self.path == "/reset":
                owner._timestep = 0
                if hasattr(owner.network, "reset"):
                    owner.network.reset()
                self._send_json(200, {"status": "reset", "timestep": 0})
            elif self.path == "/info":
                self._send_json(200, self._info_payload())
            else:
                self._send_json(404, {"error": "Not found. Use /step, /reset, /info"})

        def do_GET(self) -> None:
            if self.path == "/info":
                self._send_json(200, self._info_payload())
            elif self.path == "/health":
                self._send_json(200, {"status": "ok"})
            else:
                self._send_json(404, {"error": "Not found"})

        def _info_payload(self) -> dict[str, Any]:
            # Shared body for the GET and POST variants of /info.
            return {
                "timestep": owner._timestep,
                "type": type(owner.network).__name__,
            }

        def _send_json(self, status: int, payload: dict[str, Any]) -> None:
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(json.dumps(payload).encode("utf-8"))

        def log_message(self, format: str, *args: Any) -> None:
            # Keep the per-request log quiet.
            pass

    self._server = HTTPServer((self.host, self.port), _Handler)

    if blocking:  # pragma: no cover
        print(f"SC-NeuroCore inference server on {self.host}:{self.port}")
        print("Endpoints: POST /step, POST /reset, GET /info, GET /health")
        self._server.serve_forever()
    else:
        worker = threading.Thread(target=self._server.serve_forever, daemon=True)
        worker.start()

stop()

Shut down the server.

Source code in src/sc_neurocore/serve/server.py
Python
168
169
170
171
def stop(self) -> None:
    """Shut down the server and release its listening socket.

    Safe to call when the server was never started (no-op).
    """
    if self._server:
        self._server.shutdown()
        # shutdown() only stops the serve_forever loop; server_close()
        # actually closes the bound socket so the port is freed.
        self._server.server_close()