StreamingKernel¶
Token-by-token streaming oversight. Monitors coherence on every token (or every N-th token) and halts generation when coherence degrades. Three independent halt mechanisms operate simultaneously.
Usage¶
```python
from director_ai import StreamingKernel

kernel = StreamingKernel(
    hard_limit=0.4,
    window_size=10,
    window_threshold=0.55,
    trend_window=5,
    trend_threshold=0.15,
)

session = kernel.stream_tokens(token_generator, coherence_callback)
if session.halted:
    print(f"Halted at token {session.halt_index}: {session.halt_reason}")
    print(f"Safe output: {session.output}")
else:
    print(f"Approved: {session.output}")
```
Halt Mechanisms¶
| Mechanism | Trigger | Parameter |
|---|---|---|
| Hard limit | Any single score < threshold | hard_limit |
| Sliding window | Rolling average drops below threshold | window_size, window_threshold |
| Downward trend | Coherence drops by > delta over N tokens | trend_window, trend_threshold |
```mermaid
graph LR
    T[Token Stream] --> K[StreamingKernel]
    K --> H{Score Check}
    H -->|Hard limit| HALT[HALT]
    H -->|Window avg| HALT
    H -->|Trend drop| HALT
    H -->|Pass| O[Output Token]
```
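A minimal, self-contained sketch of how the three mechanisms above can be combined (illustrative only, using the documented defaults; the actual `StreamingKernel` internals may differ):

```python
from collections import deque

def make_halt_checker(hard_limit=0.5, window_size=10, window_threshold=0.55,
                      trend_window=5, trend_threshold=0.15):
    """Return a closure that applies the three halt checks to each new score."""
    window = deque(maxlen=window_size)   # rolling scores for the average check
    trend = deque(maxlen=trend_window)   # recent scores for the drop check

    def check(score):
        window.append(score)
        trend.append(score)
        if score < hard_limit:                                   # 1. hard limit
            return "hard_limit"
        if len(window) == window_size and sum(window) / window_size < window_threshold:
            return "window_avg"                                  # 2. sliding window
        if len(trend) == trend_window and trend[0] - trend[-1] > trend_threshold:
            return "trend_drop"                                  # 3. downward trend
        return None

    return check

check = make_halt_checker()
reasons = [check(s) for s in (0.9, 0.85, 0.8, 0.72, 0.6)]
# the fifth score completes a 0.30 drop over the 5-token trend window
```

Note how the trend check fires on a gradual slide that never crosses the hard limit, which is exactly the failure mode a single absolute threshold misses.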
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| hard_limit | float | 0.5 | Absolute coherence floor |
| window_size | int | 10 | Sliding window width |
| window_threshold | float | 0.55 | Window average threshold |
| trend_window | int | 5 | Trend detection window |
| trend_threshold | float | 0.15 | Max allowed coherence drop |
| soft_limit | float | 0.6 | Warning zone upper bound (scores between threshold and soft_limit emit warnings) |
| on_halt | callable \| None | None | Callback invoked on halt |
| halt_mode | str | "hard" | "hard" = stop, "soft" = warn + continue |
| score_every_n | int | 1 | Score every N-th token (latency tradeoff) |
| streaming_debug | bool | False | Emit per-token diagnostic snapshots |
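Scoring every token costs one scorer call per token, so `score_every_n` trades detection latency for throughput. A sketch of the cadence logic (illustrative, not the library's implementation):

```python
def stream_with_cadence(tokens, score_fn, score_every_n=1):
    """Score the accumulated output only on every N-th token."""
    out, calls = [], 0
    for i, tok in enumerate(tokens):
        out.append(tok)
        if i % score_every_n == 0:   # score on tokens 0, N, 2N, ...
            score_fn("".join(out))
            calls += 1
    return "".join(out), calls

# 8 tokens, cadence 4: the scorer runs twice instead of eight times
text, calls = stream_with_cadence(list("abcdefgh"), lambda s: 1.0, score_every_n=4)
```

The tradeoff: with `score_every_n=4`, up to three unscored tokens can be emitted after coherence first degrades before a halt can fire.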
Methods¶
stream_tokens()¶
```python
session = kernel.stream_tokens(
    token_generator,       # Iterable[str] — token source
    coherence_callback,    # Callable[[str], float] — returns coherence
    scorer=None,           # optional CoherenceScorer for structured evidence
    top_k=3,               # evidence chunks to include on halt
)  # -> StreamSession
```
on_halt Callback¶
```python
def handle_halt(session):
    print(f"Halted at token {session.halt_index}")
    send_alert(session.halt_reason)

kernel = StreamingKernel(on_halt=handle_halt, hard_limit=0.4)
```
Async Streaming¶
```python
from director_ai import AsyncStreamingKernel

kernel = AsyncStreamingKernel(hard_limit=0.4, soft_limit=0.6)
session = await kernel.stream_to_session(async_token_gen, coherence_fn)
```
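The async kernel consumes any async iterable. A self-contained sketch of the consumption pattern, with stand-in token and scorer functions (the hard-limit check shown here is a simplified stand-in for what the kernel does internally):

```python
import asyncio

async def token_gen():
    """Stand-in async token source (e.g. a model's streaming API)."""
    for tok in ["The ", "sky ", "is ", "green ", "cheese."]:
        yield tok

def coherence_fn(text):
    """Stand-in scorer: penalise the accumulated output once it drifts."""
    return 0.3 if "green" in text else 0.9

async def consume(hard_limit=0.5):
    out, halted = [], False
    async for tok in token_gen():
        out.append(tok)
        if coherence_fn("".join(out)) < hard_limit:   # hard-limit check
            out.pop()   # drop the offending token from the safe output
            halted = True
            break
    return "".join(out), halted

safe, halted = asyncio.run(consume())
```

Note that the scorer receives the accumulated output, not the individual token, so the drift is caught in context.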
StreamSession¶
Tracks the complete state of a streaming oversight session.
| Property | Type | Description |
|---|---|---|
| output | str | Safe partial output (up to halt point) |
| halted | bool | Whether generation was stopped |
| soft_halted | bool | Whether soft halt was triggered |
| halt_index | int | Token index where halt occurred (-1 if none) |
| halt_reason | str | Which mechanism triggered halt |
| halt_evidence_structured | HaltEvidence \| None | Structured evidence with chunks |
| token_count | int | Total tokens processed |
| warning_count | int | Tokens in soft warning zone |
| events | list[TokenEvent] | Full event trace |
| debug_log | list[dict] | Debug snapshots (when streaming_debug=True) |
TokenEvent¶
A single token event in the stream.
| Field | Type | Description |
|---|---|---|
| token | str | The token text |
| index | int | Position in stream |
| coherence | float | Coherence score at this position |
| timestamp | float | Unix timestamp |
| halted | bool | Whether this token triggered halt |
| warning | bool | Whether this token is in warning zone |
| halt_evidence | HaltEvidence \| None | Evidence if halt triggered |
Full API¶
director_ai.core.runtime.streaming.StreamingKernel¶

```python
StreamingKernel(
    hard_limit: float = 0.5,
    window_size: int = 10,
    window_threshold: float = 0.55,
    trend_window: int = 5,
    trend_threshold: float = 0.15,
    on_halt=None,
    soft_limit: float = 0.6,
    streaming_debug: bool = False,
    halt_mode: str = 'hard',
    score_every_n: int = 1,
    adaptive: bool = False,
    max_cadence: int = 8,
)
```
Bases: HaltMonitor
Streaming token-by-token safety kernel with sliding window oversight.
Extends HaltMonitor with token-level monitoring and a sliding
window coherence check that can catch gradual degradation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| hard_limit | float | Absolute coherence floor (halt if below). | 0.5 |
| window_size | int | Number of tokens in sliding coherence window. | 10 |
| window_threshold | float | Halt if sliding window average drops below this. | 0.55 |
| trend_window | int | Tokens to check for downward trend. | 5 |
| trend_threshold | float | Halt if coherence drops this much over trend window. | 0.15 |
check_halt¶
Evaluate halt conditions for a single score update.
Maintains internal sliding window and trend history. Returns True if any halt condition is met.
reset_state¶
Clear internal window/trend state and re-arm kernel for a new stream.
stream_tokens¶

```python
stream_tokens(
    token_generator,
    coherence_callback,
    evidence_callback=None,
    scorer: CoherenceScorer | None = None,
    top_k: int = 3,
    prompt: str = '',
) -> StreamSession
```
Process tokens one by one with sliding window oversight.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token_generator | iterable of str | Token source. | required |
| coherence_callback | callable(str) -> float | Receives the accumulated output so far (not the individual token) and returns a coherence score in [0, 1]. Called every score_every_n tokens. | required |
| evidence_callback | callable(str) -> str \| None | Optional; returns a human-readable evidence snippet explaining the coherence score. Called only on halt events to avoid overhead on every token. | None |
| scorer | CoherenceScorer \| None | When provided, halt events include structured HaltEvidence with top-K contradicting chunks. | None |
| top_k | int | Number of evidence chunks to include. | 3 |
| prompt | str | Original user prompt, passed to scorer.review() for KB/RAG context in halt evidence. | '' |
Returns:
| Type | Description |
|---|---|
| StreamSession | Session with full oversight trace. |
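The claim that `evidence_callback` runs only on halt events is worth seeing concretely. A self-contained sketch of that call pattern (the stream loop and scorer are stand-ins, not library code):

```python
def stream(tokens, score_fn, evidence_fn, hard_limit=0.5):
    """Run a hard-limit check; call evidence_fn only when a halt fires."""
    out, evidence = [], None
    for tok in tokens:
        out.append(tok)
        if score_fn("".join(out)) < hard_limit:
            evidence = evidence_fn("".join(out))   # invoked only on halt
            break
    return "".join(out), evidence

calls = []
def evidence_fn(text):
    calls.append(text)
    return f"low coherence near: {text[-12:]!r}"

output, evidence = stream(
    ["a", "b", "c", "d"],
    score_fn=lambda s: 0.9 if len(s) < 3 else 0.2,   # degrade after 2 tokens
    evidence_fn=evidence_fn,
)
```

Even over a long stream, the (potentially expensive) evidence generation happens at most once, at the halt point.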
stream_output¶
Backward-compatible: returns string output or interrupt message.
director_ai.core.runtime.streaming.StreamSession (dataclass)¶

```python
StreamSession(
    tokens: list[str] = list(),
    events: list[TokenEvent] = list(),
    coherence_history: list[float] = list(),
    halted: bool = False,
    soft_halted: bool = False,
    halt_index: int = -1,
    halt_reason: str = '',
    halt_evidence: str | None = None,
    halt_evidence_structured: HaltEvidence | None = None,
    start_time: float = 0.0,
    end_time: float = 0.0,
    warning_count: int = 0,
    debug_log: list[dict] = list(),
)
```
Tracks state of a streaming oversight session.
director_ai.core.runtime.streaming.TokenEvent (dataclass)¶

```python
TokenEvent(
    token: str,
    index: int,
    coherence: float,
    timestamp: float,
    halted: bool = False,
    warning: bool = False,
    evidence: str | None = None,
    halt_evidence: HaltEvidence | None = None,
    debug_info: dict | None = None,
)
```
A single token event in the stream.
director_ai.core.runtime.async_streaming.AsyncStreamingKernel¶

```python
AsyncStreamingKernel(
    hard_limit: float = 0.5,
    window_size: int = 10,
    window_threshold: float = 0.55,
    trend_window: int = 5,
    trend_threshold: float = 0.15,
    on_halt=None,
    soft_limit: float = 0.6,
    token_timeout: float = 0.0,
    total_timeout: float = 0.0,
    halt_mode: str = 'hard',
    score_every_n: int = 1,
    adaptive: bool = False,
    max_cadence: int = 8,
)
```
Bases: HaltMonitor
Async streaming token-by-token safety kernel for WebSocket use.
Mirrors StreamingKernel but uses async for / await.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| hard_limit | float | Absolute coherence floor (halt if below). | 0.5 |
| window_size | int | Number of tokens in sliding coherence window. | 10 |
| window_threshold | float | Halt if sliding window average drops below. | 0.55 |
| trend_window | int | Tokens to check for downward trend. | 5 |
| trend_threshold | float | Halt if coherence drops this much. | 0.15 |
stream_tokens (async)¶
Async generator yielding TokenEvents with oversight checks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token_source | async iterable of str | Token source. | required |
| coherence_callback | (str) -> float or async (str) -> Awaitable[float] | Receives the accumulated output so far (not the individual token). Called every score_every_n tokens. | required |
Yields:
| Type | Description |
|---|---|
| TokenEvent | One event per token, with halted=True on the final event if halted. |
stream_to_session (async)¶
Collect all events into a StreamSession (convenience wrapper).