Skip to content

Streaming Server

Real-time SNN inference server for streaming spike events.

  • SpikeServer — Accepts spike events as JSON over HTTP, runs inference on a loaded model, and returns the output spikes in the response. Supports batched and single-event modes.

Designed for closed-loop BCI, robotic control, and real-time neural decoding.

from sc_neurocore.serve import SpikeServer

# start() blocks until the server is shut down; pass blocking=False to serve
# from a background thread instead.
server = SpikeServer(network=my_snn, port=8080)
server.start()

sc_neurocore.serve

Real-time SNN inference server for streaming spike events.

SpikeServer

Streaming SNN inference server.

Parameters

network : SCNetwork or Network — The SNN to run. host : str — Bind address (default '127.0.0.1'). port : int — Listen port (default 8001).

Source code in src/sc_neurocore/serve/server.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class SpikeServer:
    """Streaming SNN inference server exposing a small JSON-over-HTTP API.

    Endpoints available once :meth:`start` has been called:

    * ``POST /step``  -- body ``{"inputs": {name: [values, ...]}}``; runs one
      timestep and returns ``{"outputs": ..., "timestep": ...}``.
    * ``POST /reset`` -- zeroes the timestep counter and calls the network's
      ``reset()`` if it has one.
    * ``GET /info`` (also ``POST /info``) -- current timestep and network
      type name.
    * ``GET /health`` -- liveness probe.

    Parameters
    ----------
    network : SCNetwork or Network
        The SNN to run.  It must expose either a ``step(inputs)`` method or
        a ``populations`` iterable (see :meth:`step`).
    host : str
        Bind address (default '127.0.0.1').
    port : int
        Listen port (default 8001).
    """

    def __init__(self, network, host: str = "127.0.0.1", port: int = 8001):
        self.network = network
        self.host = host
        self.port = port
        self._timestep = 0
        # Serialises every mutation of the network and the timestep counter.
        self._lock = threading.Lock()
        self._server = None

    def step(self, inputs: dict[str, list[float]]) -> dict:
        """Run one network timestep and return output spikes.

        Parameters
        ----------
        inputs : dict mapping input node name to value array

        Returns
        -------
        dict with 'outputs' (node->values) and 'timestep'

        Raises
        ------
        TypeError
            If the network has neither a ``step`` method nor a
            ``populations`` attribute.
        """
        with self._lock:
            inp = {k: np.array(v) for k, v in inputs.items()}

            # Networks with their own step() (e.g. SCNetwork from the NIR bridge).
            if hasattr(self.network, "step"):
                out = self.network.step(inp)
                self._timestep += 1
                return {
                    "outputs": {
                        # Convert array-likes to plain lists so the result is
                        # JSON-serialisable; leave everything else untouched.
                        k: v.tolist() if hasattr(v, "tolist") else v
                        for k, v in out.items()
                    },
                    "timestep": self._timestep,
                }

            # Population-Projection Network: drive each population directly.
            if hasattr(self.network, "populations"):
                results = {}
                for pop in self.network.populations:
                    # Populations without a supplied input receive zero current.
                    currents = inp.get(pop.label, np.zeros(pop.n))
                    # Extra trailing values beyond the population size are ignored.
                    spikes = pop.step_all(currents[: pop.n])
                    results[pop.label] = spikes.tolist()
                self._timestep += 1
                return {"outputs": results, "timestep": self._timestep}

            raise TypeError(f"Unsupported network type: {type(self.network).__name__}")

    def reset(self) -> dict:
        """Reset the timestep counter and, if supported, the network state.

        Takes the same lock as :meth:`step` so a reset can never interleave
        with a timestep that is in progress.

        Returns
        -------
        dict ``{"status": "reset", "timestep": 0}``
        """
        with self._lock:
            self._timestep = 0
            if hasattr(self.network, "reset"):
                self.network.reset()
        return {"status": "reset", "timestep": 0}

    def _info(self) -> dict:
        """Return the payload served by the ``/info`` endpoint."""
        return {
            "timestep": self._timestep,
            "type": type(self.network).__name__,
        }

    def start(self, blocking: bool = True):
        """Start the HTTP server.

        Parameters
        ----------
        blocking : bool
            If True (default), blocks until server is shut down.
            If False, runs in a background thread.
        """
        server_ref = self

        class Handler(BaseHTTPRequestHandler):
            def do_POST(self):
                if self.path == "/step":
                    try:
                        # Header parsing lives inside the try so a malformed
                        # Content-Length yields a 400 instead of killing the
                        # handler without any response.
                        length = int(self.headers.get("Content-Length", 0))
                        if length < 0:
                            # read(-1) would block until the client hangs up.
                            raise ValueError("Content-Length must be non-negative")
                        data = json.loads(self.rfile.read(length))
                        inputs = data.get("inputs", {})
                        result = server_ref.step(inputs)
                        self._respond(200, result)
                    except Exception as e:
                        # Any failure while handling the request is reported
                        # to the client as a 400 with the error text.
                        self._respond(400, {"error": str(e)})
                elif self.path == "/reset":
                    self._respond(200, server_ref.reset())
                elif self.path == "/info":
                    self._respond(200, server_ref._info())
                else:
                    self._respond(404, {"error": "Not found. Use /step, /reset, /info"})

            def do_GET(self):
                if self.path == "/info":
                    self._respond(200, server_ref._info())
                elif self.path == "/health":
                    self._respond(200, {"status": "ok"})
                else:
                    self._respond(404, {"error": "Not found"})

            def _respond(self, code, data):
                # Serialise before emitting any header: if the payload cannot
                # be encoded, nothing has been sent yet and the caller can
                # still produce a clean error response.
                payload = json.dumps(data).encode("utf-8")
                self.send_response(code)
                self.send_header("Content-Type", "application/json")
                self.send_header("Content-Length", str(len(payload)))
                self.end_headers()
                self.wfile.write(payload)

            def log_message(self, format, *args):
                pass  # suppress default logging

        self._server = HTTPServer((self.host, self.port), Handler)

        if blocking:  # pragma: no cover
            print(f"SC-NeuroCore inference server on {self.host}:{self.port}")
            print("Endpoints: POST /step, POST /reset, GET /info, GET /health")
            self._server.serve_forever()
        else:
            # Daemon thread: it will not keep the interpreter alive on exit.
            thread = threading.Thread(target=self._server.serve_forever, daemon=True)
            thread.start()

    def stop(self):
        """Shut down the server and release its listening socket.

        Does nothing when no server has been started.  Per the
        ``socketserver`` documentation, ``shutdown()`` must be called from a
        different thread than the one running ``serve_forever()``.
        """
        server = self._server
        if not server:
            return
        server.shutdown()
        # shutdown() only stops the serve loop; the socket must be closed
        # explicitly or the port stays bound.
        server.server_close()
        self._server = None

step(inputs)

Run one network timestep and return output spikes.

Parameters

inputs : dict mapping input node name to value array

Returns

dict with 'outputs' (node->values) and 'timestep'

Source code in src/sc_neurocore/serve/server.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def step(self, inputs: dict[str, list[float]]) -> dict:
    """Advance the wrapped network by a single timestep.

    Parameters
    ----------
    inputs : dict mapping input node name to value array

    Returns
    -------
    dict with 'outputs' (node->values) and 'timestep'

    Raises
    ------
    TypeError
        If the network exposes neither ``step`` nor ``populations``.
    """
    with self._lock:
        arrays = {name: np.array(values) for name, values in inputs.items()}

        # Networks that provide their own step() are driven directly.
        if hasattr(self.network, "step"):
            raw = self.network.step(arrays)
            self._timestep += 1
            outputs = {}
            for name, value in raw.items():
                # Array-likes become plain lists; anything else passes through.
                outputs[name] = value.tolist() if hasattr(value, "tolist") else value
            return {"outputs": outputs, "timestep": self._timestep}

        if not hasattr(self.network, "populations"):
            raise TypeError(f"Unsupported network type: {type(self.network).__name__}")

        # Population-based network: feed each population its own currents.
        outputs = {}
        for pop in self.network.populations:
            label = pop.label
            size = pop.n
            # Populations with no entry in the request get zero current.
            drive = arrays[label] if label in arrays else np.zeros(size)
            fired = pop.step_all(drive[:size])
            outputs[label] = fired.tolist()
        self._timestep += 1
        return {"outputs": outputs, "timestep": self._timestep}

start(blocking=True)

Start the HTTP server.

Parameters

blocking : bool If True (default), blocks until server is shut down. If False, runs in a background thread.

Source code in src/sc_neurocore/serve/server.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def start(self, blocking: bool = True):
    """Start the HTTP server.

    Serves ``POST /step``, ``POST /reset``, ``GET|POST /info`` and
    ``GET /health``, all with JSON response bodies.

    Parameters
    ----------
    blocking : bool
        If True (default), blocks until server is shut down.
        If False, runs in a background thread.
    """
    server_ref = self

    def info_payload():
        # Shared by the GET and POST variants of /info.
        return {
            "timestep": server_ref._timestep,
            "type": type(server_ref.network).__name__,
        }

    class Handler(BaseHTTPRequestHandler):
        def do_POST(self):
            if self.path == "/step":
                try:
                    # Header parsing lives inside the try so a malformed
                    # Content-Length yields a 400 instead of killing the
                    # handler without any response.
                    length = int(self.headers.get("Content-Length", 0))
                    if length < 0:
                        # read(-1) would block until the client hangs up.
                        raise ValueError("Content-Length must be non-negative")
                    data = json.loads(self.rfile.read(length))
                    inputs = data.get("inputs", {})
                    result = server_ref.step(inputs)
                    self._respond(200, result)
                except Exception as e:
                    # Any failure while handling the request is reported to
                    # the client as a 400 with the error text.
                    self._respond(400, {"error": str(e)})
            elif self.path == "/reset":
                # Hold the lock used by step() so a reset cannot interleave
                # with a timestep that is in progress.
                with server_ref._lock:
                    server_ref._timestep = 0
                    if hasattr(server_ref.network, "reset"):
                        server_ref.network.reset()
                self._respond(200, {"status": "reset", "timestep": 0})
            elif self.path == "/info":
                self._respond(200, info_payload())
            else:
                self._respond(404, {"error": "Not found. Use /step, /reset, /info"})

        def do_GET(self):
            if self.path == "/info":
                self._respond(200, info_payload())
            elif self.path == "/health":
                self._respond(200, {"status": "ok"})
            else:
                self._respond(404, {"error": "Not found"})

        def _respond(self, code, data):
            # Serialise before emitting any header: if the payload cannot be
            # encoded, nothing has been sent yet and the caller can still
            # produce a clean error response.
            payload = json.dumps(data).encode("utf-8")
            self.send_response(code)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)

        def log_message(self, format, *args):
            pass  # suppress default logging

    self._server = HTTPServer((self.host, self.port), Handler)

    if blocking:  # pragma: no cover
        print(f"SC-NeuroCore inference server on {self.host}:{self.port}")
        print("Endpoints: POST /step, POST /reset, GET /info, GET /health")
        self._server.serve_forever()
    else:
        # Daemon thread: it will not keep the interpreter alive on exit.
        thread = threading.Thread(target=self._server.serve_forever, daemon=True)
        thread.start()

stop()

Shut down the server.

Source code in src/sc_neurocore/serve/server.py
166
167
168
169
def stop(self):
    """Shut down the server and release its listening socket.

    Does nothing when no server has been started.  Per the ``socketserver``
    documentation, ``shutdown()`` must be called from a different thread
    than the one running ``serve_forever()``.
    """
    server = self._server
    if not server:
        return
    server.shutdown()
    # shutdown() only stops the serve loop; the socket must be closed
    # explicitly or the port stays bound.
    server.server_close()
    self._server = None