StreamingKernel

Token-by-token streaming oversight. Monitors coherence on every token (or every N-th token) and halts generation when coherence degrades. Three independent halt mechanisms operate simultaneously.

Usage

```python
from director_ai import StreamingKernel

kernel = StreamingKernel(
    hard_limit=0.4,
    window_size=10,
    window_threshold=0.55,
    trend_window=5,
    trend_threshold=0.15,
)

session = kernel.stream_tokens(token_generator, coherence_callback)

if session.halted:
    print(f"Halted at token {session.halt_index}: {session.halt_reason}")
    print(f"Safe output: {session.output}")
else:
    print(f"Approved: {session.output}")
```

Halt Mechanisms

| Mechanism | Trigger | Parameters |
| --- | --- | --- |
| Hard limit | Any single score < threshold | `hard_limit` |
| Sliding window | Rolling average drops below threshold | `window_size`, `window_threshold` |
| Downward trend | Coherence drops by more than delta over N tokens | `trend_window`, `trend_threshold` |
```mermaid
graph LR
    T[Token Stream] --> K[StreamingKernel]
    K --> H{Score Check}
    H -->|Hard limit| HALT[HALT]
    H -->|Window avg| HALT
    H -->|Trend drop| HALT
    H -->|Pass| O[Output Token]
```
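
The three mechanisms above can be sketched as a single stateful checker. This is a standalone illustration of the logic, not the library's implementation — the reason strings and the check order (hard limit first, then window average, then trend) are assumptions:

```python
from collections import deque

def make_halt_checker(hard_limit=0.5, window_size=10, window_threshold=0.55,
                      trend_window=5, trend_threshold=0.15):
    """Return a stateful check(score) -> halt-reason-or-None closure."""
    window = deque(maxlen=window_size)   # rolling scores for the average check
    trend = deque(maxlen=trend_window)   # recent scores for the drop check

    def check(score):
        window.append(score)
        trend.append(score)
        if score < hard_limit:
            return "hard_limit"
        if len(window) == window_size and sum(window) / window_size < window_threshold:
            return "window_average"
        if len(trend) == trend_window and trend[0] - trend[-1] > trend_threshold:
            return "downward_trend"
        return None

    return check
```

Note how the mechanisms are complementary: a stream of 0.52 scores never trips the hard limit or the trend check, but drags the window average below 0.55; a clean but steadily falling stream trips the trend check long before the hard limit.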

Constructor Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `hard_limit` | `float` | `0.5` | Absolute coherence floor |
| `window_size` | `int` | `10` | Sliding window width |
| `window_threshold` | `float` | `0.55` | Window average threshold |
| `trend_window` | `int` | `5` | Trend detection window |
| `trend_threshold` | `float` | `0.15` | Max allowed coherence drop |
| `soft_limit` | `float` | `0.6` | Warning zone upper bound (scores between the halt threshold and `soft_limit` emit warnings) |
| `on_halt` | `callable \| None` | `None` | Callback invoked on halt |
| `halt_mode` | `str` | `"hard"` | `"hard"` = stop, `"soft"` = warn + continue |
| `score_every_n` | `int` | `1` | Score every N-th token (latency tradeoff) |
| `streaming_debug` | `bool` | `False` | Emit per-token diagnostic snapshots |
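
`score_every_n` trades oversight granularity for latency: the coherence callback fires only on every N-th token, so long outputs cost fewer scoring calls. A minimal standalone sketch of that cadence (illustrative only, not the library's code — halting is reduced to the hard limit, and whether the offending token is excluded from the safe output is an assumption):

```python
def stream_with_cadence(tokens, score_fn, score_every_n=1, hard_limit=0.5):
    """Score the ACCUMULATED output on every N-th token; halt below hard_limit."""
    out = []
    for i, tok in enumerate(tokens):
        out.append(tok)
        if i % score_every_n == 0 and score_fn("".join(out)) < hard_limit:
            # Return the output up to (excluding) the offending token.
            return "".join(out[:-1]), True
    return "".join(out), False
```

With `score_every_n=2`, a degradation introduced by an unscored token is only caught at the next scored position — the latency saving is paid for with up to N-1 extra tokens of exposure.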

Methods

stream_tokens()

```python
session = kernel.stream_tokens(
    token_generator,        # Iterable[str] — token source
    coherence_callback,     # Callable[[str], float] — returns coherence
    scorer=None,            # Optional CoherenceScorer for structured evidence
    top_k=3,                # Evidence chunks to include on halt
) -> StreamSession
```
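
The two required arguments can be as simple as a generator and a plain scoring function. A toy pairing (the decay-based scorer is purely illustrative; note that the callback receives the accumulated output so far, not the individual token):

```python
def token_generator():
    # Any iterable of strings works as the token source.
    yield from ["The", " sky", " is", " blue", "."]

def coherence_callback(accumulated_text: str) -> float:
    # Toy scorer standing in for a real coherence model:
    # confidence decays slowly with output length, clamped to [0, 1].
    return max(0.0, 1.0 - 0.01 * len(accumulated_text))

# With a real install, this pair plugs straight in:
# session = kernel.stream_tokens(token_generator(), coherence_callback)
```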

on_halt Callback

```python
def handle_halt(session):
    print(f"Halted at token {session.halt_index}")
    send_alert(session.halt_reason)

kernel = StreamingKernel(on_halt=handle_halt, hard_limit=0.4)
```

Async Streaming

```python
from director_ai import AsyncStreamingKernel

kernel = AsyncStreamingKernel(hard_limit=0.4, soft_limit=0.6)
session = await kernel.stream_to_session(async_token_gen, coherence_fn)
```
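
The async variant follows the same pattern as the sync kernel, consumed with `async for` / `await`. A standalone asyncio sketch of that consumption loop (not the library's code — scoring and halting are reduced to a hard limit, and the names here are illustrative):

```python
import asyncio

async def fake_token_stream():
    # Stand-in for a model or WebSocket token source.
    for tok in ["alpha ", "beta ", "gamma"]:
        await asyncio.sleep(0)  # yield control, as a network read would
        yield tok

async def watch(stream, score_fn, hard_limit=0.5):
    """Score accumulated output per token; stop consuming on a breach."""
    out = []
    async for tok in stream:
        out.append(tok)
        if score_fn("".join(out)) < hard_limit:
            out.pop()  # drop the offending token from the safe output
            break
    return "".join(out)
```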

StreamSession

Tracks the complete state of a streaming oversight session.

| Property | Type | Description |
| --- | --- | --- |
| `output` | `str` | Safe partial output (up to halt point) |
| `halted` | `bool` | Whether generation was stopped |
| `soft_halted` | `bool` | Whether a soft halt was triggered |
| `halt_index` | `int` | Token index where halt occurred (`-1` if none) |
| `halt_reason` | `str` | Which mechanism triggered the halt |
| `halt_evidence_structured` | `HaltEvidence \| None` | Structured evidence with chunks |
| `token_count` | `int` | Total tokens processed |
| `warning_count` | `int` | Tokens in soft warning zone |
| `events` | `list[TokenEvent]` | Full event trace |
| `debug_log` | `list[dict]` | Debug snapshots (when `streaming_debug=True`) |

TokenEvent

A single token event in the stream.

| Field | Type | Description |
| --- | --- | --- |
| `token` | `str` | The token text |
| `index` | `int` | Position in stream |
| `coherence` | `float` | Coherence score at this position |
| `timestamp` | `float` | Unix timestamp |
| `halted` | `bool` | Whether this token triggered a halt |
| `warning` | `bool` | Whether this token is in the warning zone |
| `halt_evidence` | `HaltEvidence \| None` | Evidence if a halt was triggered |

Full API

director_ai.core.runtime.streaming.StreamingKernel

```python
StreamingKernel(hard_limit: float = 0.5, window_size: int = 10, window_threshold: float = 0.55, trend_window: int = 5, trend_threshold: float = 0.15, on_halt=None, soft_limit: float = 0.6, streaming_debug: bool = False, halt_mode: str = 'hard', score_every_n: int = 1, adaptive: bool = False, max_cadence: int = 8)
```

Bases: HaltMonitor

Streaming token-by-token safety kernel with sliding window oversight.

Extends HaltMonitor with token-level monitoring and a sliding window coherence check that can catch gradual degradation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `hard_limit` | `float` | Absolute coherence floor (halt if below). | `0.5` |
| `window_size` | `int` | Number of tokens in the sliding coherence window. | `10` |
| `window_threshold` | `float` | Halt if the sliding window average drops below this. | `0.55` |
| `trend_window` | `int` | Tokens to check for a downward trend. | `5` |
| `trend_threshold` | `float` | Halt if coherence drops this much over the trend window. | `0.15` |

check_halt

```python
check_halt(score: float) -> bool
```

Evaluate halt conditions for a single score update.

Maintains internal sliding window and trend history. Returns True if any halt condition is met.

reset_state

```python
reset_state() -> None
```

Clear internal window/trend state and re-arm kernel for a new stream.

stream_tokens

```python
stream_tokens(token_generator, coherence_callback, evidence_callback=None, scorer: CoherenceScorer | None = None, top_k: int = 3, prompt: str = '') -> StreamSession
```

Process tokens one by one with sliding window oversight.

Parameters:

Name Type Description Default
token_generator iterable of str — token source.
required
coherence_callback callable(str) -> float — receives the

accumulated output so far (not the individual token) and returns a coherence score in [0, 1]. Called every score_every_n tokens; cadence adapts when adaptive=True.

required
evidence_callback callable(str) -> str | None — optional, returns

human-readable evidence snippet explaining the coherence score. Called only on halt events to avoid overhead on every token.

None
scorer CoherenceScorer | None — when provided, halt events

include structured HaltEvidence with top-K contradicting chunks.

None
top_k int — number of evidence chunks to include (default 3).
3
prompt str — original user prompt, passed to scorer.review() for

KB/RAG context in halt evidence.

''

Returns:

| Type | Description |
| --- | --- |
| `StreamSession` | Session with full oversight trace. |
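
The reason `evidence_callback` fires only on halt is cost: an explanation or retrieval step can be far slower than scoring. A standalone sketch of that lazy-evidence pattern (hard-limit check only; the names and return shape are illustrative, not the library's):

```python
def run_stream(tokens, score_fn, evidence_fn=None, hard_limit=0.5):
    # The (possibly expensive) evidence_fn is invoked only when a halt
    # fires, so it adds no overhead to the per-token scoring path.
    out = []
    for tok in tokens:
        out.append(tok)
        text = "".join(out)
        if score_fn(text) < hard_limit:
            evidence = evidence_fn(text) if evidence_fn else None
            return text, "hard_limit", evidence
    return "".join(out), "", None
```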

stream_output

```python
stream_output(token_generator, coherence_callback) -> str
```

Backward-compatible wrapper: returns the output string, or an interrupt message if generation was halted.

director_ai.core.runtime.streaming.StreamSession dataclass

```python
StreamSession(tokens: list[str] = list(), events: list[TokenEvent] = list(), coherence_history: list[float] = list(), halted: bool = False, soft_halted: bool = False, halt_index: int = -1, halt_reason: str = '', halt_evidence: str | None = None, halt_evidence_structured: HaltEvidence | None = None, start_time: float = 0.0, end_time: float = 0.0, warning_count: int = 0, debug_log: list[dict] = list())
```

Tracks state of a streaming oversight session.

director_ai.core.runtime.streaming.TokenEvent dataclass

```python
TokenEvent(token: str, index: int, coherence: float, timestamp: float, halted: bool = False, warning: bool = False, evidence: str | None = None, halt_evidence: HaltEvidence | None = None, debug_info: dict | None = None)
```

A single token event in the stream.

director_ai.core.runtime.async_streaming.AsyncStreamingKernel

```python
AsyncStreamingKernel(hard_limit: float = 0.5, window_size: int = 10, window_threshold: float = 0.55, trend_window: int = 5, trend_threshold: float = 0.15, on_halt=None, soft_limit: float = 0.6, token_timeout: float = 0.0, total_timeout: float = 0.0, halt_mode: str = 'hard', score_every_n: int = 1, adaptive: bool = False, max_cadence: int = 8)
```

Bases: HaltMonitor

Async streaming token-by-token safety kernel for WebSocket use.

Mirrors StreamingKernel but uses async for / await.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `hard_limit` | `float` | Absolute coherence floor (halt if below). | `0.5` |
| `window_size` | `int` | Number of tokens in the sliding coherence window. | `10` |
| `window_threshold` | `float` | Halt if the sliding window average drops below this. | `0.55` |
| `trend_window` | `int` | Tokens to check for a downward trend. | `5` |
| `trend_threshold` | `float` | Halt if coherence drops this much over the trend window. | `0.15` |

stream_tokens async

```python
stream_tokens(token_source, coherence_callback: CoherenceCallback) -> AsyncIterator[TokenEvent]
```

Async generator yielding TokenEvents with oversight checks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `token_source` | async iterable of `str` | Token source. | required |
| `coherence_callback` | `(str) -> float` or `async (str) -> Awaitable[float]` | Receives the accumulated output so far (not the individual token). Called every `score_every_n` tokens. | required |

Yields:

| Type | Description |
| --- | --- |
| `TokenEvent` | One event per token, with `halted=True` on the final event if generation was halted. |

stream_to_session async

```python
stream_to_session(token_source, coherence_callback: CoherenceCallback) -> StreamSession
```

Collect all events into a StreamSession (convenience wrapper).