Skip to content

API reference

The public API is the synapse_channel package surface. The reference below is generated from the source docstrings.

synapse_channel

SYNAPSE CHANNEL — local-first multi-agent coordination bus.

A small WebSocket fabric that lets several agents share presence, claim and release units of work, chat, and advertise resources through one authoritative hub. The pieces compose: :class:~synapse_channel.hub.SynapseHub routes, :class:~synapse_channel.client.SynapseAgent connects, and :class:~synapse_channel.llm_worker.SynapseLLMWorker answers on-channel through a pluggable :mod:~synapse_channel.chat_backends backend. The synapse console command (see :mod:synapse_channel.cli) drives all of it.

DEFAULT_HUB_URI = 'ws://localhost:8876' module-attribute

Default hub URI; matches the hub's default bind port.

PRIORITY_SENDERS = frozenset({'CEO'}) module-attribute

Senders whose message wakes a directed-only waiter even on a broadcast.

The CEO command session directs the fleet; a broadcast from it is never merely routine peer chatter, so it must reach a quiet waiter promptly.

TokenAuthenticator

Validates a shared-secret token, optionally bound to agent names.

Parameters:

Name Type Description Default
tokens Mapping[str, Iterable[str]] or Iterable[str]

Either a mapping of token -> permitted agent names (an empty name set permits any agent), or a plain iterable of tokens that each permit any agent. Empty-string tokens are dropped.

required
Notes

An authenticator constructed with no usable tokens denies every connection; pass None to :class:~synapse_channel.hub.SynapseHub to leave the hub open instead.

is_empty property

Whether no usable token is configured (so every connection is denied).

authenticate(token, agent)

Check a presented token for a connecting agent.

Parameters:

Name Type Description Default
token str

The secret the agent presented; an empty value is always refused.

required
agent str

The agent name the connection claims, checked against any name binding on the matched token.

required

Returns:

Type Description
tuple[bool, str]

(True, message) when the token is valid and permits the agent, otherwise (False, reason).

CapabilityCard dataclass

A small, A2A-shaped description an agent advertises about itself.

Attributes:

Name Type Description
agent str

Name of the advertising agent.

description str

Free-form summary of what the agent does.

skills tuple[str, ...]

Capability tags the agent claims (free-form).

task_classes tuple[str, ...]

Routing classes the agent can take (e.g. chat, rule, reason), used to pick a worker for a task.

model str

Optional model identifier backing the agent.

meta dict[str, Any]

Arbitrary descriptive metadata.

advertised_at float

Wall-clock time, in seconds, when the card was last refreshed.

as_dict()

Return a JSON-serialisable snapshot of this card.

CapabilityRegistry

One capability card per agent, exposed as a queryable manifest.

The registry is single-threaded and synchronous; the hub owns one instance. Cards are kept fresh by re-advertising and are dropped on disconnect or when they pass the soft TTL.

Parameters:

Name Type Description Default
ttl_seconds float

Liveness window after which an un-refreshed card is expired. Defaults to :data:DEFAULT_CARD_TTL_SECONDS.

DEFAULT_CARD_TTL_SECONDS

advertise(agent, *, description='', skills=(), task_classes=(), model='', meta=None, now=None)

Store or refresh an agent's capability card.

Parameters:

Name Type Description Default
agent str

Name of the advertising agent.

required
description str

Free-form summary.

''
skills Iterable[str]

Capability tags; stripped, de-duplicated, blanks dropped.

()
task_classes Iterable[str]

Routing classes; stripped, de-duplicated, blanks dropped.

()
model str

Backing model identifier.

''
meta dict[str, Any] or None

Descriptive metadata; None becomes an empty mapping.

None
now float or None

Override for the current wall-clock time, in seconds.

None

Returns:

Type Description
CapabilityCard

The stored card.

forget(agent)

Drop an agent's card, e.g. when it disconnects.

get(agent)

Return an agent's card, or None when it has none.

expire(now=None)

Drop every card not refreshed within the TTL of now.

manifest(now=None)

Return all live cards as dicts, sorted by agent name.

Parameters:

Name Type Description Default
now float or None

Override for the current wall-clock time used to expire stale cards.

None

Returns:

Type Description
list[dict[str, Any]]

One card mapping per live agent.

for_task_class(task_class, now=None)

Return the agents that advertise a given task class, sorted by name.

Parameters:

Name Type Description Default
task_class str

The routing class to match against each card's task_classes.

required
now float or None

Override for the current wall-clock time used to expire stale cards.

None

Returns:

Type Description
list[str]

Names of live agents that can take the task class.

ChatBackend

Bases: Protocol

Structural type for anything that generates a reply from two prompts.

generate(*, system_prompt, user_prompt)

Return a reply for the given system and user prompts.

OpenAIChatClient

Backend for any OpenAI-compatible /v1/chat/completions endpoint.

Parameters:

Name Type Description Default
api_key str

Bearer token. Local servers (e.g. Ollama) accept any non-empty value.

required
model str

Model identifier passed in the request body.

required
base_url str

Base URL of the OpenAI-compatible API; a trailing slash is stripped.

required
timeout_seconds float

Per-request timeout, clamped up to 3.0 seconds.

required

generate(*, system_prompt, user_prompt)

Request a completion and return the sanitised reply text.

Parameters:

Name Type Description Default
system_prompt str

System role content steering the model.

required
user_prompt str

User role content the model responds to.

required

Returns:

Type Description
str

The assistant message content, whitespace-collapsed and truncated.

Raises:

Type Description
RuntimeError

On an HTTP error status, a connection failure, or a response whose shape does not contain the expected completion content.

RuleBasedClient

Deterministic offline backend that acknowledges receipt.

The reply carries no sender prefix: the wire envelope already records the author, so the hub and every reader render the name once.

generate(*, system_prompt, user_prompt)

Return a fixed acknowledgement, ignoring both prompts.

Parameters:

Name Type Description Default
system_prompt str

Unused; present to satisfy :class:ChatBackend.

required
user_prompt str

Unused; present to satisfy :class:ChatBackend.

required

Returns:

Type Description
str

A constant on-channel acknowledgement.

SynapseAgent

An async client that maintains one connection to the Synapse hub.

Parameters:

Name Type Description Default
name str

Unique agent name presented to the hub.

required
on_message_callback MessageCallback or None

Coroutine called with every decoded inbound message. Self-originated chat echoes are filtered out before the callback runs.

None
uri str

Hub WebSocket URI. Defaults to :data:DEFAULT_HUB_URI.

DEFAULT_HUB_URI
heartbeat_interval float

Seconds between keepalive heartbeats, clamped up to :data:MINIMUM_HEARTBEAT_INTERVAL. Defaults to 20.0.

20.0
verbose bool

When True, connection lifecycle notes are printed. Defaults to True.

True
token str or None

Shared-secret token presented on the registration message when the hub requires authentication. None sends no token (the default for an open, loopback hub).

None

connect() async

Open the connection and run the inbound listener until it closes.

Sends the registration heartbeat, starts the keepalive loop, then dispatches each inbound message to the callback. Connection failures are reported (when verbose) and end the loop; the heartbeat task is always cancelled on exit.

wait_until_ready(timeout=5.0) async

Wait until the hub's welcome message has been received.

Parameters:

Name Type Description Default
timeout float

Maximum seconds to wait, floored at 0.1. Defaults to 5.0.

5.0

Returns:

Type Description
bool

True if the welcome arrived in time, False on timeout.

send_message(msg_type, *, target='all', payload='', **extra) async

Serialise and send one message envelope to the hub.

Parameters:

Name Type Description Default
msg_type str

One of the :class:~synapse_channel.protocol.MessageType constants.

required
target str

Recipient agent name, or "all". Defaults to "all".

'all'
payload str

Free-form text body.

''
**extra Any

Additional protocol fields merged into the envelope.

{}

chat(payload, *, target='all', priority=False) async

Send a chat message to the room or a single agent.

Parameters:

Name Type Description Default
payload str

Message text.

required
target str

Recipient agent name, or "all". Defaults to "all".

'all'
priority bool

Mark the message as priority so it wakes even directed-only waiters (use sparingly — for announcements that genuinely must reach everyone).

False

claim(task_id, note='', ttl_seconds=None, *, worktree='', paths=(), idem_key=None) async

Request a scoped lease on a task.

Parameters:

Name Type Description Default
task_id str

Task identifier; surrounding whitespace is stripped.

required
note str

Human-readable context stored with the claim.

''
ttl_seconds float or None

Requested lease duration; None lets the hub apply its default.

None
worktree str

Worktree label; claims in different worktrees never contend for files.

''
paths tuple[str, ...] or list[str]

Declared file/directory paths the claim intends to touch; empty claims the whole worktree.

()
idem_key str or None

Idempotency key; reuse the same key when retrying after a reconnect so the hub replays the original result instead of claiming twice.

None

release(task_id, *, epoch=None, idem_key=None) async

Release a task lease.

Parameters:

Name Type Description Default
task_id str

Task identifier; surrounding whitespace is stripped.

required
epoch int or None

Expected lease generation; when given, the hub refuses the release if the lease has since been superseded.

None
idem_key str or None

Idempotency key; reuse the same key when retrying after a reconnect so the hub replays the original result instead of releasing twice.

None

update_task(task_id, *, status=None, note=None, data_ref=None, epoch=None, expected_version=None, idem_key=None) async

Update an owned task's status, note, or artefact reference.

Parameters:

Name Type Description Default
task_id str

Task identifier; surrounding whitespace is stripped.

required
status str or None

New lifecycle status (see :mod:synapse_channel.lifecycle); the hub rejects an illegal transition.

None
note str or None

Replacement note.

None
data_ref str or None

Replacement artefact reference.

None
epoch int or None

Expected lease generation; a stale epoch is refused.

None
expected_version int or None

Expected field version for compare-and-swap; a mismatch is refused.

None
idem_key str or None

Idempotency key for safe retries after a reconnect.

None

handoff(task_id, to_agent, *, note=None, epoch=None, idem_key=None) async

Hand an owned task to another agent in one atomic step.

Transfers ownership directly, with no release/re-claim window, carrying the task's scope, status, and artefact reference. The recipient must be online; the hub records the move on the shared blackboard.

Parameters:

Name Type Description Default
task_id str

Identifier of the owned task; whitespace is stripped.

required
to_agent str

The agent to receive the task; whitespace is stripped.

required
note str or None

Replacement note for the moved claim; the existing note is kept when None.

None
epoch int or None

Expected lease generation; a stale epoch is refused.

None
idem_key str or None

Idempotency key for a safe retry after a reconnect.

None

save_checkpoint(task_id, checkpoint, *, epoch=None, idem_key=None) async

Save a resume checkpoint on an owned task.

The checkpoint is durable and survives lease expiry: if this agent's lease lapses, the next agent to claim the task inherits it (and receives it in the claim grant) instead of restarting.

Parameters:

Name Type Description Default
task_id str

Identifier of the owned task; whitespace is stripped.

required
checkpoint str

Opaque resume token to store.

required
epoch int or None

Expected lease generation; a stale epoch is refused.

None
idem_key str or None

Idempotency key for a safe retry after a reconnect.

None

request_resume(since=0) async

Ask the hub for every chat message after a cursor.

Use after a reconnect to catch up on exactly the messages missed.

Parameters:

Name Type Description Default
since int

The last chat msg_id already seen; the hub returns messages numbered above it. Defaults to 0 (the full history).

0

request_state() async

Ask the hub for a full state snapshot.

request_who() async

Ask the hub for the list of online agents.

request_history(limit=20) async

Ask the hub for recent chat history.

Parameters:

Name Type Description Default
limit int or None

Number of recent messages to fetch (floored at 1), or None for the full history. Defaults to 20.

20

request_wait(task_id) async

Register an advisory wait for a task another agent holds.

The hub refuses the wait if it would close a hold-and-wait deadlock cycle. The wait is advisory: retry the claim once the holder releases.

Parameters:

Name Type Description Default
task_id str

Identifier of the held task to wait for; whitespace is stripped.

required

post_task(task_id, title, *, description='', depends_on=(), suggested_owner='') async

Declare or re-declare a task on the shared plan (an upsert).

This is the planning surface, distinct from :meth:claim (the lease on doing the work). Re-posting the same id refines the declaration.

Parameters:

Name Type Description Default
task_id str

Stable identifier, shared with any claim taken on the task.

required
title str

Short human-readable name of the work.

required
description str

Longer description or acceptance notes.

''
depends_on tuple[str, ...] or list[str]

Prerequisite task ids; the hub refuses dependencies that form a cycle.

()
suggested_owner str

Advisory proposed owner.

''

update_ledger_task(task_id, *, status=None, suggested_owner=None) async

Change a plan task's planning status or suggested owner.

Parameters:

Name Type Description Default
task_id str

Identifier of the task to update.

required
status str or None

New planning status (open/in_progress/blocked/done/ cancelled); an unknown status is refused.

None
suggested_owner str or None

Replacement advisory owner ("" clears it).

None

post_progress(task_id, text, *, kind='note') async

Append a structured progress note to the progress ledger.

Parameters:

Name Type Description Default
task_id str

Task the note concerns; "" for a board-wide note.

required
text str

Body of the note.

required
kind str

One of note/blocked/assessment. Defaults to "note".

'note'

request_board() async

Ask the hub for a snapshot of the shared blackboard.

advertise(*, description='', skills=(), task_classes=(), model='', meta=None) async

Advertise this agent's capability card to the hub.

The card describes what the agent can do — its skills and the task classes it can take — so other agents can discover it and a router can pick it by task class. Re-advertising refreshes the card.

Parameters:

Name Type Description Default
description str

Free-form summary of what the agent does.

''
skills tuple[str, ...] or list[str]

Capability tags the agent claims.

()
task_classes tuple[str, ...] or list[str]

Routing classes the agent can take.

()
model str

Backing model identifier.

''
meta dict[str, Any] or None

Descriptive metadata.

None

request_manifest() async

Ask the hub for the capability manifest of all advertised agents.

start()

Run :meth:connect to completion on a fresh event loop.

Intended as a blocking entry point for scripts. Ctrl+C is caught and reported instead of raising.

SynapseHub

Routing core that maintains presence, history, and coordination state.

Parameters:

Name Type Description Default
default_ttl_seconds float

Lease TTL passed to the underlying :class:SynapseState. Defaults to 3600.0.

3600.0
hub_id str or None

Stable hub identifier stamped on outgoing system messages. When None a random "syn-XXXXXXXX" id is generated.

None
journal EventStore or None

When given, authoritative mutations are appended to this durable log and the hub's state is rebuilt from it on construction, so a restart resumes live leases and history instead of an empty registry. When None the hub is purely in-memory.

None
rate_limiter RateLimiter or None

When given, non-heartbeat messages from an agent over its limit are refused, so one runaway agent cannot swamp the single hub. None disables rate limiting.

None
max_history int

Maximum chat messages retained in memory; the oldest are dropped beyond this bound so history cannot grow without limit. The durable log (when a journal is attached) still records every message. Defaults to :data:DEFAULT_MAX_HISTORY.

DEFAULT_MAX_HISTORY
relay_log str or Path or None

When given, every broadcast message is also mirrored to this newline- delimited log in the compact lite format (see :func:~synapse_channel.relay.encode_lite), so a token-budgeted agent can observe the channel by tailing a file instead of holding a socket. None disables the mirror.

None
relay_max_lines int

Upper bound on the relay log: it is trimmed back to its last this-many lines once it grows that far past the bound, so the mirror cannot grow without limit. Defaults to :data:DEFAULT_RELAY_MAX_LINES.

DEFAULT_RELAY_MAX_LINES
max_progress int

Maximum progress notes retained on the shared blackboard; the oldest are dropped beyond this bound. The durable log (when attached) still records every note. Defaults to :data:~synapse_channel.ledger.DEFAULT_MAX_PROGRESS.

DEFAULT_MAX_PROGRESS
authenticator TokenAuthenticator or None

When given, a connecting agent must present a valid shared-secret token on its first message or the hub refuses and closes the socket. None leaves the hub open, which is the right default for a loopback bind.

None

online_agents()

Return the sorted names of currently registered agents.

handle_message(raw_message, websocket) async

Parse and route one inbound frame.

Parameters:

Name Type Description Default
raw_message str or bytes

The raw frame received from a client socket.

required
websocket Any

The socket the frame arrived on.

required

register(websocket) async

Record a new socket and send it the welcome message.

unregister(websocket) async

Drop a socket, releasing its agent name and broadcasting departure.

handler(websocket) async

Serve one client connection from registration to disconnect.

serve(host=DEFAULT_HOST, port=DEFAULT_PORT) async

Run the hub's WebSocket server until cancelled.

Parameters:

Name Type Description Default
host str

Bind address. Defaults to :data:DEFAULT_HOST.

DEFAULT_HOST
port int

Bind port. Defaults to :data:DEFAULT_PORT.

DEFAULT_PORT

Blackboard

The team's shared plan: a task ledger plus an append-only progress stream.

The board is single-threaded and synchronous; the hub owns one instance and mutates it from its event loop. Posting a task is an upsert — the same id re-declares the task and replaces its planning fields — so a planner can refine the plan idempotently.

Parameters:

Name Type Description Default
max_progress int

Maximum progress notes retained; the oldest are dropped beyond this bound so the stream cannot grow without limit. Clamped up to 1. Defaults to :data:DEFAULT_MAX_PROGRESS.

DEFAULT_MAX_PROGRESS

post_task(*, task_id, title, author, description='', depends_on=(), suggested_owner='', now=None)

Declare or re-declare a task on the plan (an upsert).

Parameters:

Name Type Description Default
task_id str

Identifier and short name; both are required (whitespace-stripped).

required
title str

Identifier and short name; both are required (whitespace-stripped).

required
author str

Agent declaring the task; recorded as created_by on first post.

required
description str

Longer description.

''
depends_on tuple[str, ...] or list[str]

Prerequisite task ids; self-references and duplicates are dropped.

()
suggested_owner str

Advisory proposed owner.

''
now float or None

Override for the current wall-clock time, in seconds.

None

Returns:

Type Description
tuple[bool, str]

(True, message) on success, (False, reason) when the id or title is missing or the dependencies would form a cycle.

update_task(task_id, *, status=None, suggested_owner=None, now=None)

Change a declared task's planning status or suggested owner.

Parameters:

Name Type Description Default
task_id str

Identifier of the task to update.

required
status str or None

New planning status; must be in :data:LEDGER_TASK_STATUSES.

None
suggested_owner str or None

Replacement advisory owner ("" clears it).

None
now float or None

Override for the current wall-clock time, in seconds.

None

Returns:

Type Description
tuple[bool, str]

(True, message) on success, (False, reason) when the task is unknown or the status is not a recognised planning status.

post_progress(*, task_id, author, text, kind='note', now=None)

Append a structured progress note, dropping the oldest past the bound.

Parameters:

Name Type Description Default
task_id str

Task the note concerns; "" for a board-wide note.

required
author str

Agent posting the note.

required
text str

Body of the note.

required
kind str

One of :data:PROGRESS_KINDS. Defaults to "note".

'note'
now float or None

Override for the current wall-clock time, in seconds.

None

Returns:

Type Description
tuple[bool, ProgressNote or str]

(True, note) on success, (False, reason) for an unknown kind.

note(*, task_id, author, text, now=None)

Append a plain note-kind progress entry, returning it directly.

A convenience over :meth:post_progress for callers that always use the note kind (so the kind cannot be rejected) and want the appended :class:ProgressNote without unpacking a result tuple.

Parameters:

Name Type Description Default
task_id str

Task the note concerns; "" for a board-wide note.

required
author str

Agent posting the note.

required
text str

Body of the note.

required
now float or None

Override for the current wall-clock time, in seconds.

None

Returns:

Type Description
ProgressNote

The appended note.

blocking_dependencies(task_id)

Return the unmet dependencies of a task, in declaration order.

A dependency is unmet when the prerequisite is absent from the board or has not reached a terminal status. Returns an empty list for an unknown task.

Parameters:

Name Type Description Default
task_id str

Identifier of the task to inspect.

required

Returns:

Type Description
list[str]

Task ids that still block this task.

ready_tasks()

Return open tasks whose every dependency has reached a terminal status.

Returns:

Type Description
list[LedgerTask]

Tasks with planning status open and no blocking dependency, sorted by task_id.

snapshot()

Return a consistent view of the plan and the recent progress stream.

Returns:

Type Description
dict[str, Any]

Mapping with tasks (sorted by id), ready (ready task ids), and progress (the retained notes in order).

LedgerTask dataclass

A declared unit of work on the shared plan.

Attributes:

Name Type Description
task_id str

Stable identifier, shared with any claim taken on the task.

title str

Short human-readable name of the work.

description str

Optional longer description or acceptance notes.

depends_on tuple[str, ...]

Task ids that must reach a terminal status before this task is ready.

status str

Coarse planning status from :data:LEDGER_TASK_STATUSES.

suggested_owner str

Optional agent name proposed to take the task; advisory only.

created_by str

Agent that first declared the task.

created_at float

Wall-clock seconds when the task was first declared.

updated_at float

Wall-clock seconds when the task was last changed.

as_dict()

Return a JSON-serialisable snapshot of this task.

ProgressNote dataclass

One structured entry in the append-only progress ledger.

Attributes:

Name Type Description
task_id str

Task the note concerns; "" for a board-wide note.

author str

Agent that posted the note.

kind str

One of :data:PROGRESS_KINDS.

text str

Free-form body of the note.

posted_at float

Wall-clock seconds when the note was posted.

as_dict()

Return a JSON-serialisable snapshot of this note.

TaskStatus

The legal status values for a claimed task.

CLAIMED is the entry state stamped when a lease is granted; DONE and FAILED are terminal. Values are the literal strings carried on the wire.

SynapseLLMWorker

A hub agent that answers addressed messages via a chat backend.

Parameters:

Name Type Description Default
name str

Agent name presented on the channel.

required
uri str

Hub URI. Defaults to :data:~synapse_channel.client.DEFAULT_HUB_URI.

DEFAULT_HUB_URI
provider str

Backend provider: ollama (default), openai, or rule.

'ollama'
model str

Model identifier for HTTP providers. Defaults to "llama3".

'llama3'
base_url str

OpenAI-compatible base URL. Defaults to the local Ollama endpoint.

DEFAULT_OLLAMA_BASE_URL
api_key_env str

Environment variable holding the API key. Defaults to "OPENAI_API_KEY".

'OPENAI_API_KEY'
max_context int

Number of recent messages retained for prompt context (floored at 2).

8
reply_target_mode str

"all" to answer the room or "sender" to answer privately.

'all'
min_reply_interval float

Minimum seconds between replies (floored at 0). Defaults to 0.7.

0.7
token str or None

Shared-secret token presented to a hub that requires authentication; None for an open hub.

None
task_classes tuple[str, ...] or list[str]

Routing classes this worker advertises on its capability card; defaults to ("chat",).

('chat',)
heavy_model str

Model used for the heavy tier when provider="tiered"; defaults to model when empty.

''

on_message(data) async

Filter an inbound message and queue it when a reply is warranted.

Parameters:

Name Type Description Default
data dict[str, Any]

A decoded inbound message envelope.

required

run() async

Connect, wait for the handshake, and run the worker loop.

The connection and worker tasks run concurrently; when either finishes the other is cancelled and any terminal error is reported.

EventStore

Append-only SQLite event log in WAL mode.

Parameters:

Name Type Description Default
path str or Path

Database file path. ":memory:" is accepted for ephemeral use, but only a file path survives a restart.

required

append(kind, payload, *, ts=None, durable=False)

Append one event to the log.

Parameters:

Name Type Description Default
kind str

Event kind tag.

required
payload dict[str, Any]

JSON-serialisable event body.

required
ts float or None

Event timestamp, in seconds; the system clock is used when None.

None
durable bool

When True the commit is synced at synchronous=FULL so it survives an OS crash; when False it commits at NORMAL (durable only against an application crash). Defaults to False.

False

read_all()

Return every event in insertion order.

Returns:

Type Description
list[StoredEvent]

All persisted events, ordered by ascending sequence number.

count()

Return the number of events currently stored.

close()

Close the underlying database connection.

__enter__()

Enter a context manager that closes the store on exit.

__exit__(exc_type, exc, tb)

Close the store when leaving the context.

MessageType

String constants for every Synapse message type.

The upper group is sent by agents to the hub; the lower group is emitted by the hub back to agents. Values are the literal strings that travel on the wire — never rename a value without migrating every peer.

TaskClass

The coarse routing classes a request can fall into.

TieredChatClient

A chat backend that routes :meth:generate to a per-class backend.

Parameters:

Name Type Description Default
backends Mapping[str, ChatBackend]

One backend per task class. The default_class must be present.

required
default_class str

Class used when the classifier picks one with no registered backend. Defaults to :attr:TaskClass.SLM.

SLM
classifier Callable[[str], str]

The prompt classifier; defaults to :func:classify. Injectable for tests.

classify

Raises:

Type Description
ValueError

If default_class has no backend in backends.

route(prompt)

Return the task class :func:classify assigns to prompt.

generate(*, system_prompt, user_prompt)

Classify user_prompt and delegate to the matching backend.

Parameters:

Name Type Description Default
system_prompt str

System prompt forwarded to the chosen backend.

required
user_prompt str

User prompt, both classified and forwarded.

required

Returns:

Type Description
str

The chosen backend's reply.

ResourceOffer dataclass

A capability that an agent advertises to the rest of the team.

Attributes:

Name Type Description
agent str

Name of the offering agent.

kind str

Category of the resource, e.g. llm, compute, fs, or memory.

name str

Concrete resource identifier, e.g. a model name or device handle.

capacity int

How many concurrent consumers the offer can serve (minimum 1).

meta dict[str, Any]

Arbitrary descriptive metadata about the offer.

offered_at float

Wall-clock time, in seconds, when the offer was last refreshed.

SynapseState

Authoritative registry of presence, claims, tasks, and resources.

The registry is single-threaded and synchronous; the hub owns one instance and mutates it from its event loop. Every mutating call refreshes the caller's heartbeat and lazily expires stale leases and offers, so liveness is maintained without a background timer.

Parameters:

Name Type Description Default
default_ttl_seconds float

Lease duration applied to a claim when the caller does not request an explicit TTL. Clamped up to :data:MINIMUM_TTL_SECONDS. Defaults to 3600.0.

3600.0

heartbeat(agent, now=None)

Record that agent is alive and expire anything now stale.

Parameters:

Name Type Description Default
agent str

Name of the agent reporting liveness.

required
now float or None

Override for the current wall-clock time, in seconds. When None the system clock is used. Primarily a testing seam.

None

claim(agent, task_id, note='', ttl_seconds=None, now=None, *, worktree=DEFAULT_WORKTREE, paths=())

Acquire or renew a scoped lease on a task.

An owner may freely renew its own live claim, and any agent may take over a task whose lease has expired. A live claim held by another agent blocks the request. Beyond the task id, a claim may declare a file scope (worktree + paths); the request is also refused when that scope contends with another agent's live claim, which is how the bus prevents two agents from editing the same files. Every successful claim or renewal is stamped with a fresh, strictly-increasing :attr:TaskClaim.epoch.

Parameters:

Name Type Description Default
agent str

Name of the agent attempting the claim.

required
task_id str

Identifier of the task; surrounding whitespace is stripped.

required
note str

Human-readable context stored with the claim.

''
ttl_seconds float or None

Requested lease duration, clamped up to :data:MINIMUM_TTL_SECONDS. None uses default_ttl_seconds.

None
now float or None

Override for the current wall-clock time, in seconds.

None
worktree str

Worktree label; claims in different worktrees never contend.

DEFAULT_WORKTREE
paths tuple[str, ...] or list[str]

Declared file/directory paths; empty claims the whole worktree.

()

Returns:

Type Description
tuple[bool, str]

(True, message) on success, (False, reason) when the task is missing an id, held by another agent, or its file scope overlaps another agent's live claim.

update_task(agent, task_id, *, status=None, note=None, data_ref=None, epoch=None, expected_version=None, now=None)

Update the status, note, or artefact reference of an owned task.

Only the claim owner may mutate it. Fields left as None are untouched; a non-empty status must be a legal lifecycle transition (see :func:synapse_channel.lifecycle.can_transition). When epoch is supplied it must match the claim's current epoch (lease guard), and when expected_version is supplied it must match the claim's current version (optimistic-concurrency guard against lost updates). A successful update bumps the version.

Parameters:

Name Type Description Default
agent str

Name of the agent issuing the update; must own the claim.

required
task_id str

Identifier of the task to update.

required
status str or None

New lifecycle status, applied only when truthy and the transition is legal.

None
note str or None

Replacement note; stripped before storage.

None
data_ref str or None

Replacement artefact reference; stripped before storage.

None
epoch int or None

Expected lease generation; when given and stale, the update is refused.

None
expected_version int or None

Expected field version; when given and mismatched, the update is refused so a stale writer cannot clobber a newer value.

None
now float or None

Override for the current wall-clock time, in seconds.

None

Returns:

Type Description
tuple[bool, str]

(True, message) on success, (False, reason) when the task is unknown, owned by a different agent, carries a stale epoch or version, or requests an illegal status transition.

save_checkpoint(agent, task_id, checkpoint, *, epoch=None, now=None)

Save a resume token on an owned task so it can continue after expiry.

Only the owner may save, and the checkpoint persists with the claim: if the lease later expires, a new claimant of the same task inherits it.

Parameters:

Name Type Description Default
agent str

The owner saving the checkpoint.

required
task_id str

Identifier of the owned task; whitespace is stripped.

required
checkpoint str

Opaque resume token to store.

required
epoch int or None

Expected lease generation; a stale epoch is refused.

None
now float or None

Override for the current wall-clock time, in seconds.

None

Returns:

Type Description
tuple[bool, str]

(True, message) on success, (False, reason) when the task is unknown, owned by another agent, or carries a stale epoch.

release(agent, task_id, now=None, *, epoch=None)

Release a task held by agent.

Parameters:

Name Type Description Default
agent str

Name of the agent releasing the claim; must be the owner.

required
task_id str

Identifier of the task; surrounding whitespace is stripped.

required
now float or None

Override for the current wall-clock time, in seconds.

None
epoch int or None

Expected lease generation; when given and stale, the release is refused so an agent cannot drop a lease that has since been superseded.

None

Returns:

Type Description
tuple[bool, str]

(True, message) on success, (False, reason) when the task id is empty, unclaimed, owned by another agent, or carries a stale epoch.

handoff(agent, task_id, to_agent, *, note=None, epoch=None, now=None)

Transfer an owned task to another agent in one atomic step.

Ownership moves directly from the holder to to_agent with no release/re-claim window in which a third agent could grab the task. The task keeps its file scope, status, and artefact reference (its working context) and is stamped with a fresh epoch and a full lease, so the previous owner's epoch becomes stale and cannot act on the moved task. The version counter resets for the new owner.

Parameters:

Name Type Description Default
agent str

The current owner requesting the handoff.

required
task_id str

Identifier of the task to hand off; whitespace is stripped.

required
to_agent str

The agent to receive the task; whitespace is stripped.

required
note str or None

Replacement note for the moved claim; the existing note is kept when None.

None
epoch int or None

Expected lease generation; a stale epoch is refused.

None
now float or None

Override for the current wall-clock time, in seconds.

None

Returns:

Type Description
tuple[bool, str]

(True, message) on success, (False, reason) when the task is missing an id, unclaimed, owned by another agent, handed to its own owner, given no target, or carries a stale epoch.

offer_resource(agent, *, kind, name, capacity=1, meta=None, now=None)

Advertise a resource the agent can provide, keyed by agent/kind/name.

Re-offering the same triple refreshes the offer's liveness timestamp.

Parameters:

Name Type Description Default
agent str

Name of the offering agent.

required
kind str

Resource category, e.g. llm or compute.

required
name str

Concrete resource identifier.

required
capacity int

Concurrent-consumer capacity, clamped up to 1.

1
meta dict[str, Any] or None

Descriptive metadata; None becomes an empty mapping.

None
now float or None

Override for the current wall-clock time, in seconds.

None

Returns:

Type Description
str

The registry key "{agent}:{kind}:{name}" of the stored offer.

query_resources(kind=None)

List currently offered resources, optionally filtered by kind.

Parameters:

Name Type Description Default
kind str or None

When given, only offers of this category are returned.

None

Returns:

Type Description
list[dict[str, Any]]

Offer mappings sorted by (agent, kind, name).

snapshot(now=None)

Return a consistent view of claims, agents, and resources.

Stale claims and offers are expired before the view is built, so the snapshot never reports leases that have already lapsed.

Parameters:

Name Type Description Default
now float or None

Override for the current wall-clock time, in seconds.

None

Returns:

Type Description
dict[str, Any]

Mapping with active_claims, agents, resources, and the generated_at timestamp.

TaskClaim dataclass

A lease held by one agent over a named unit of work.

The owner keeps the claim until it is explicitly released or its lease expires. While the lease is live, other agents are refused the same task.

Attributes:

Name Type Description
task_id str

Stable identifier of the claimed task.

owner str

Name of the agent currently holding the claim.

note str

Free-form human-readable context for the claim.

claimed_at float

Wall-clock time, in seconds, when the claim was last (re)acquired.

lease_expires_at float

Wall-clock time, in seconds, after which the claim auto-expires.

status str

Lifecycle marker: claimed, in_progress, blocked, or completed.

data_ref str

Optional pointer to produced artefacts (e.g. a memory key or file path).

worktree str

Worktree label the work happens in; claims in different worktrees never contend for files.

paths tuple[str, ...]

Declared file/directory paths the claim intends to touch; empty means the whole worktree.

epoch int

Strictly-increasing lease generation. A mutation carrying a stale epoch is rejected, so a paused/expired agent cannot act on a superseded claim.

version int

Optimistic-concurrency counter bumped on every field update, used for compare-and-swap so a stale update is rejected. Reset on (re)claim.

checkpoint str

Opaque resume token the owner saves so the work can continue from where it stopped. It survives lease expiry: a later claimant of the same task inherits the last checkpoint instead of restarting.

as_dict()

Return a JSON-serialisable snapshot of this claim.

Returns:

Type Description
dict[str, Any]

A mapping with the claim's public fields, safe to embed in a wire message or state snapshot.

Intervention dataclass

A single action the supervisor decided to take on a task.

Attributes:

Name Type Description
task_id str

The task the intervention concerns.

action str

What to do; currently always "reoffer".

reason str

Human-readable explanation, recorded with the re-offer.

SupervisorWorker

An on-channel agent that polls the board and re-offers stalled tasks.

Parameters:

Name Type Description Default
name str

Agent name presented on the channel. Defaults to "SUPERVISOR".

'SUPERVISOR'
uri str

Hub URI. Defaults to :data:~synapse_channel.client.DEFAULT_HUB_URI.

DEFAULT_HUB_URI
idle_seconds float

No-activity window passed to :func:detect_stalls.

DEFAULT_IDLE_SECONDS
interval float

Seconds between passes (floored at 1).

DEFAULT_INTERVAL_SECONDS
settle_seconds float

Pause after requesting the board to let the snapshot arrive.

DEFAULT_SETTLE_SECONDS
token str or None

Shared-secret token for a secured hub.

None
clock Callable[[], float]

Wall-clock source; injectable for deterministic tests.

time

on_message(data) async

Capture the latest board snapshot the hub sends back.

evaluate_and_apply() async

Run the stall policy on the latest board and apply each re-offer.

Returns:

Type Description
list[Intervention]

The interventions applied (empty when no board has arrived yet or nothing is stalled).

run() async

Connect, wait for the handshake, and supervise until the link ends.

sanitize_text(text, max_len=400)

Collapse runs of whitespace and truncate to max_len characters.

Parameters:

Name Type Description Default
text str

Raw text to clean. Non-string input is coerced via str.

required
max_len int

Maximum length of the returned string. Defaults to 400.

400

Returns:

Type Description
str

Single-spaced, length-bounded text.

would_create_cycle(waits, waiter, holder)

Return whether waiter waiting for holder would close a cycle.

Parameters:

Name Type Description Default
waits dict[str, str]

The current wait-for graph mapping each waiting agent to the agent it waits for.

required
waiter str

The agent that wants to start waiting.

required
holder str

The agent currently holding what waiter wants.

required

Returns:

Type Description
bool

True if adding the edge waiter -> holder would create a cycle (including the degenerate self-wait waiter == holder); False when the wait is safe to register.

plan_team(port, *, no_workers=False, fast_model=None, reason_model=None, prefix='', detect=detect_model)

Plan the child processes for a team without spawning anything.

Parameters:

Name Type Description Default
port int

Hub port; the worker URI is derived from it.

required
no_workers bool

When True only the hub is planned. Defaults to False.

False
fast_model str or None

Explicit model overrides; when None they are auto-detected.

None
reason_model str or None

Explicit model overrides; when None they are auto-detected.

None
prefix str

Namespace prepended to every worker name, so a team can run per project without clashing with another project's roster. Defaults to "".

''
detect ModelDetector

Model-detection callable, injectable for testing.

detect_model

Returns:

Type Description
list[ProcessSpec]

The hub spec followed by zero, one, or two worker specs. A second worker is added only when the reasoning model differs from the fast one.

run_team(port=8876, *, no_workers=False, fast_model=None, reason_model=None, prefix='', popen=subprocess.Popen, sleep=time.sleep, detect=detect_model)

Spawn a hub and workers, then monitor them until one exits.

Parameters:

Name Type Description Default
port int

Hub port. Defaults to 8876.

8876
no_workers bool

When True only the hub is started. Defaults to False.

False
fast_model str or None

Explicit model overrides; auto-detected when None.

None
reason_model str or None

Explicit model overrides; auto-detected when None.

None
prefix str

Namespace prepended to every worker name. Defaults to "".

''
popen Callable

subprocess.Popen-compatible spawner, injectable for testing.

Popen
sleep Callable

time.sleep-compatible delay, injectable for testing.

sleep
detect ModelDetector

Model-detection callable, injectable for testing.

detect_model

Returns:

Type Description
int

0 on a clean Ctrl+C shutdown, otherwise the exit code of the first child that terminated (or 1 when it exited without a code).

can_transition(current, target)

Return whether a task may move from current to target.

A move to an unknown status is never allowed. Re-affirming the same status is always allowed (it lets an owner update other fields without changing state). Otherwise the move must appear in the transition table.

Parameters:

Name Type Description Default
current str

The task's present status.

required
target str

The requested next status.

required

Returns:

Type Description
bool

True if the transition is legal.

is_service_message(sender, payload, msg_type='chat')

Return whether a message is system/sidecar noise the worker should skip.

Parameters:

Name Type Description Default
sender str

Name of the message sender.

required
payload str

Message text.

required
msg_type str

Message type. Defaults to "chat".

'chat'

Returns:

Type Description
bool

True for hub messages, *_LITE/*_CORE sidecars, system snapshot/notification types, and [ACK]/[ROUTE]/[MAIN] relay markers; False otherwise.

addresses_project(target, project)

Return whether a message to target reaches any agent in project.

Matches a broadcast, the project name itself, and any <project>/... identity or group glob — so a returning terminal catches up everything for its repo regardless of which instance id it now runs as.

Parameters:

Name Type Description Default
target str

The recipient field of a message.

required
project str

The project (repo) name, e.g. "quantum".

required

Returns:

Type Description
bool

True for a broadcast, target == project, or any project/... part.

build_envelope(sender, msg_type, *, target='all', payload='', now=None, **extra)

Build the agent-side message envelope sent to the hub.

Parameters:

Name Type Description Default
sender str

Name of the sending agent.

required
msg_type str

One of the :class:MessageType constants.

required
target str

Recipient agent name, or "all" for a broadcast. Defaults to "all".

'all'
payload str

Free-form text body of the message.

''
now float or None

Override timestamp, in seconds. None uses the system clock.

None
**extra Any

Additional protocol fields (e.g. task_id, limit) merged into the envelope after the base fields.

{}

Returns:

Type Description
dict[str, Any]

A JSON-serialisable envelope ready to hand to json.dumps.

is_directed(target, name)

Return whether target names name specifically rather than broadcasting.

Like :func:is_recipient but "all" (and an empty target) is not a match, so a reader can wake only on messages addressed to it or a group it is in and treat broadcasts as read-when-convenient.

Parameters:

Name Type Description Default
target str

The recipient field.

required
name str

The reader's own agent name.

required

Returns:

Type Description
bool

True only when target is a non-broadcast pattern that matches name.

is_recipient(target, name)

Return whether name is an addressee of a message sent to target.

The hub broadcasts every chat to every connected client and carries the intended recipient in target; a reader uses this predicate to keep only the messages meant for it.

Parameters:

Name Type Description Default
target str

The recipient field: the broadcast keyword "all" (or empty), a single name, a comma-separated list, or a glob such as "quantum/*" (every agent in the quantum project) or "quantum/claude-*".

required
name str

The reader's own agent name, e.g. "quantum/claude-7f3a".

required

Returns:

Type Description
bool

True for a broadcast or when name matches one of the target parts (each part is matched as a case-sensitive glob, so a plain name is exact).

system_message(payload, *, hub_id, msg_type=MessageType.SYSTEM, target='all', now=None, **extra)

Build a hub-originated system message.

Parameters:

Name Type Description Default
payload str

Human-readable body of the system message.

required
hub_id str

Identifier of the emitting hub, stamped into the envelope.

required
msg_type str

One of the hub-side :class:MessageType constants. Defaults to :attr:MessageType.SYSTEM.

SYSTEM
target str

Recipient agent name, or "all" for a broadcast. Defaults to "all".

'all'
now float or None

Override timestamp, in seconds. None uses the system clock.

None
**extra Any

Additional fields (e.g. task_id, online_agents, snapshot) merged into the envelope after the base fields.

{}

Returns:

Type Description
dict[str, Any]

A JSON-serialisable envelope with sender set to :data:SENDER_HUB.

wakes(target, name, *, directed_only, sender='', priority=False)

Return whether a chat to target should wake a waiter listening for name.

In the default mode any recipient match wakes (:func:is_recipient). In directed-only mode only a directed match wakes (:func:is_directed) — except a priority-flagged message and a message from a :data:PRIORITY_SENDERS sender always wake. So an "all" broadcast that genuinely matters (a CEO directive, a flagged announcement) still reaches a quiet waiter promptly, while routine peer broadcasts stay suppressed. Directed-only means "no routine broadcast wakes me", not "no broadcast ever wakes me".

Parameters:

Name Type Description Default
target str

The recipient field of the message.

required
name str

The waiter's own identity.

required
directed_only bool

When True, suppress routine broadcasts (wake only on a directed, priority, or priority-sender message).

required
sender str

The message's sender, matched against :data:PRIORITY_SENDERS.

''
priority bool

Whether the message carries an explicit priority flag.

False

Returns:

Type Description
bool

Whether the waiter should wake on this message.

decode_lite(lite)

Reconstruct a full message envelope from a compact relay event.

The inverse of :func:encode_lite. Because the lite format stores the timestamp as a millisecond integer, the reconstructed timestamp is precise only to the millisecond — sub-millisecond detail from the original envelope is not recoverable. Missing or malformed short keys fall back to the same defaults :func:encode_lite itself emits.

Parameters:

Name Type Description Default
lite dict[str, Any]

A short-key envelope as produced by :func:encode_lite.

required

Returns:

Type Description
dict[str, Any]

A full envelope with sender, target, type, payload, timestamp (seconds), msg_id, and hub_id.

encode_lite(message)

Pack a full Synapse message into a short-key relay envelope.

The encode half of the relay codec; :func:decode_lite is its inverse.

Parameters:

Name Type Description Default
message dict[str, Any]

A full message envelope as produced by :func:synapse_channel.protocol.build_envelope.

required

Returns:

Type Description
dict[str, Any]

A mapping with single/double-letter keys: v (version), i (msg id), ty (type), s (sender), to (target), p (payload), t (millisecond timestamp), h (hub id). Malformed timestamp/msg_id fields fall back to the current time and 0.

classify(prompt, *, rule_max_chars=DEFAULT_RULE_MAX_CHARS, heavy_min_chars=DEFAULT_HEAVY_MIN_CHARS)

Classify a prompt into a routing task class.

The policy is deterministic: a short prompt is rule; a long prompt or one containing a heavy keyword is heavy; everything else is slm.

Parameters:

Name Type Description Default
prompt str

The user prompt to classify; leading/trailing whitespace is ignored.

required
rule_max_chars int

Length at or below which a prompt is rule.

DEFAULT_RULE_MAX_CHARS
heavy_min_chars int

Length at or above which a prompt is heavy.

DEFAULT_HEAVY_MIN_CHARS

Returns:

Type Description
str

One of :attr:TaskClass.RULE, :attr:TaskClass.SLM, or :attr:TaskClass.HEAVY.

paths_overlap(a, b)

Return whether two declared paths cover any common file.

Two paths overlap when, after normalisation, they are equal or one is an ancestor directory of the other. The empty (root) path covers the whole tree and therefore overlaps everything.

Parameters:

Name Type Description Default
a str

Declared file or directory paths.

required
b str

Declared file or directory paths.

required

Returns:

Type Description
bool

True if the paths share at least one file.

scopes_conflict(worktree_a, paths_a, worktree_b, paths_b)

Return whether two claim scopes contend for the same files.

Scopes in different worktrees never conflict. Within the same worktree, an empty path set means the claim owns the whole tree (conflicts with any other claim there); otherwise the scopes conflict when any declared path of one overlaps any declared path of the other.

Parameters:

Name Type Description Default
worktree_a str

Worktree labels of the two claims.

required
worktree_b str

Worktree labels of the two claims.

required
paths_a Sequence[str]

Declared paths of the two claims (empty means the whole worktree).

required
paths_b Sequence[str]

Declared paths of the two claims (empty means the whole worktree).

required

Returns:

Type Description
bool

True if the two scopes contend for at least one file.

detect_stalls(board, *, now, idle_seconds=DEFAULT_IDLE_SECONDS)

Decide which tasks on a board snapshot are stalled and should be re-offered.

Parameters:

Name Type Description Default
board dict[str, Any]

A blackboard snapshot as returned by :meth:~synapse_channel.ledger.Blackboard.snapshot.

required
now float

Current wall-clock time, in seconds, used to age in-progress tasks.

required
idle_seconds float

No-activity window after which an in_progress task is stalled.

DEFAULT_IDLE_SECONDS

Returns:

Type Description
list[Intervention]

One re-offer per stalled task, sorted by task_id.