# Streaming Halt

## Overview
StreamingKernel monitors coherence by re-scoring the accumulated text
after each token (or every N-th token with score_every_n). Scoring is
sentence-level re-evaluation of the full accumulated output, not
independent per-token analysis. When coherence degrades, generation is
halted. Three independent halt mechanisms:
- Hard limit — any single token below threshold
- Sliding window — rolling average drops below window threshold
- Downward trend — coherence drops by more than a delta over N tokens
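The three conditions can be sketched as a standalone check function. This is an illustrative re-implementation of the logic described above, not the library's code; the parameter names and defaults mirror the StreamingKernel constructor shown later on this page.

```python
from collections import deque

def check_halt(score, scores, window, hard_limit=0.4,
               window_threshold=0.55, trend_window=5, trend_threshold=0.15):
    """Return the name of the first halt condition that fires, else None.

    `scores` is the full score history; `window` is a deque of the most
    recent scores with maxlen == window_size.
    """
    # 1. Hard limit: any single score below the floor halts immediately.
    if score < hard_limit:
        return "hard_limit"
    # 2. Sliding window: halt when the rolling average of the last
    #    window_size scores sinks below the window threshold.
    if len(window) == window.maxlen and sum(window) / len(window) < window_threshold:
        return "window"
    # 3. Downward trend: halt when coherence fell by more than the
    #    trend threshold over the last trend_window tokens.
    if len(scores) > trend_window and scores[-trend_window - 1] - score > trend_threshold:
        return "trend"
    return None
```

Note that the hard limit is checked first, so a single catastrophic token halts even if the window average still looks healthy.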
## Architecture

```mermaid
flowchart TD
    LLM["LLM Token Stream"] --> SK["StreamingKernel"]
    SK --> ACC["Accumulate token into partial response"]
    ACC --> CB["coherence_callback(partial_text) → score"]
    CB --> CHECK{"Check 3 halt conditions"}
    CHECK --> H1{"score < hard_limit?"}
    H1 -->|Yes| HALT["HALT<br/>(emergency stop)"]
    H1 -->|No| H2{"window avg < window_threshold?"}
    H2 -->|Yes| HALT
    H2 -->|No| H3{"trend drop > trend_threshold?"}
    H3 -->|Yes| HALT
    H3 -->|No| NEXT["Accept token → next"]
    NEXT --> LLM
    HALT --> SOFT{"halt_mode?"}
    SOFT -->|"hard"| STOP["Immediate stop"]
    SOFT -->|"soft"| FINISH["Finish sentence<br/>(50-token cap)"]
    STOP --> SESSION["StreamSession<br/>(halted=True, halt_index, scores[])"]
    FINISH --> SESSION
    NEXT -->|"all tokens consumed"| DONE["StreamSession<br/>(halted=False, scores[])"]
    style HALT fill:#c62828,color:#fff
    style NEXT fill:#2e7d32,color:#fff
    style STOP fill:#c62828,color:#fff
    style FINISH fill:#ff8f00,color:#fff
    style DONE fill:#2e7d32,color:#fff
```
The caller supplies the coherence_callback — typically a closure that accumulates tokens, calls scorer.review(), and returns the score. The kernel only decides when to halt; the caller decides how to score.
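One common shape for that callback is a small factory closing over the scorer and the prompt. This sketch assumes a scorer exposing review(prompt, text) that returns an object with a .score attribute, matching the scorer usage elsewhere on this page; the helper name itself is illustrative.

```python
def make_coherence_callback(scorer, prompt):
    """Build the callback stream_tokens() expects: partial text in, score out."""
    def coherence_callback(partial_text: str) -> float:
        # Re-score the full accumulated output against the prompt.
        return scorer.review(prompt, partial_text).score
    return coherence_callback
```

Keeping the callback this thin makes it easy to swap scoring backends without touching the kernel configuration.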
## How NLI Premise/Hypothesis Are Constructed

When use_nli=True on the scorer:

- Premise = the prompt text, or the top-k retrieval results from GroundTruthStore/VectorGroundTruthStore if a KB is configured.
- Hypothesis = the accumulated response text so far (all tokens joined).
- The NLI model (DeBERTa) scores the entailment probability P(premise entails hypothesis).
- The divergence score is 1.0 - entailment_probability — higher means more hallucination risk.
With chunked NLI (scorer.score_chunked()):
- Both premise and hypothesis are split into sentence-level chunks.
- All (premise_chunk, hypothesis_chunk) pairs are scored.
- The final score uses max aggregation across pairs — the worst contradiction in any pair drives the score.
- This catches localized hallucinations that would be diluted in a full-text comparison.
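The chunked scoring described above can be sketched in a few lines. Here nli_divergence(p, h) stands in for the DeBERTa model and should return 1.0 - P(p entails h); the sentence splitter and the function itself are illustrative, not the library's exact implementation.

```python
import re

def score_chunked(premise, hypothesis, nli_divergence):
    """Score every (premise_chunk, hypothesis_chunk) pair and keep the max."""
    def sentences(text):
        # Naive sentence split on terminal punctuation followed by whitespace.
        return [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]

    pairs = [(p, h) for p in sentences(premise) for h in sentences(hypothesis)]
    if not pairs:
        return 0.0
    # Max aggregation: one badly contradicted sentence drives the score.
    return max(nli_divergence(p, h) for p, h in pairs)
```

Because the aggregation is a max, a single fabricated sentence cannot hide behind several well-grounded ones.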
## Basic Usage

```python
from director_ai import StreamingKernel

kernel = StreamingKernel(
    hard_limit=0.4,
    window_size=10,
    window_threshold=0.55,
    trend_window=5,
    trend_threshold=0.15,
    soft_limit=0.6,
)

session = kernel.stream_tokens(token_generator, coherence_callback)
if session.halted:
    print(f"Halted: {session.halt_reason}")
    print(f"Safe output: {session.output}")
else:
    print(f"Approved: {session.output}")
```
## on_halt Callback

```python
def handle_halt(session):
    print(f"Halted at token {session.halt_index}")
    # Log, alert, switch to fallback

kernel = StreamingKernel(on_halt=handle_halt)
```
## Async Streaming

```python
from director_ai import AsyncStreamingKernel

kernel = AsyncStreamingKernel(
    hard_limit=0.4,
    on_halt=handle_halt,
    soft_limit=0.6,
)

session = await kernel.stream_to_session(async_token_gen, coherence_fn)
```
## Pre-Sampling Inference Server Hooks
InferenceServerHook is the adapter boundary for vLLM, TGI, and
llama.cpp deployments that want a decision before a candidate token is
accepted. The hook is stdlib-only; server packages call check() from
their own logits or token-filter callback and then apply the returned
payload.
```python
from director_ai import InferenceHookRequest, build_inference_server_hook

def coherence_fn(text: str) -> float:
    return scorer.review(prompt, text).score

hook = build_inference_server_hook(
    "vllm",
    coherence_fn,
    hard_limit=0.4,
)

decision = hook.check(
    InferenceHookRequest(
        server="vllm",
        accumulated_text="The measured value is ",
        candidate_token="grounded",
        token_id=42,
        request_id="req-123",
    ),
    logits=[0.0] * 100,
)

if not decision.allow:
    emit(decision.safety_event.to_dict())
    apply_server_payload(decision.server_payload)
```
On a halt, the hook returns masked logits when logits are supplied, the
server-specific action payload, blocked token ids when known, and a
tenant-safe SafetyEvent with hook_scope="inference_server".
## Threshold Tuning by Domain
| Domain | hard_limit | window_threshold | trend_threshold | window_size | Rationale |
|---|---|---|---|---|---|
| General | 0.4 | 0.50 | 0.15 | 10 | Balanced — catches obvious hallucinations without over-halting |
| Medical | 0.5 | 0.60 | 0.10 | 8 | Strict — medical misinformation is high-risk |
| Finance | 0.5 | 0.55 | 0.12 | 8 | Strict — numerical claims must be grounded |
| Legal | 0.45 | 0.55 | 0.12 | 10 | Moderate-strict — citations and precedent matter |
| Creative | 0.3 | 0.40 | 0.20 | 15 | Loose — creative writing tolerates divergence |
See the Threshold Tuning Guide for detailed tuning methodology.
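The table rows can be kept as plain keyword-argument presets and splatted into the constructor. The DOMAIN_PRESETS dictionary below is a hypothetical convenience built from the table, not a library API.

```python
# Hypothetical presets mirroring the tuning table above; not part of director_ai.
DOMAIN_PRESETS = {
    "general":  {"hard_limit": 0.4,  "window_threshold": 0.50, "trend_threshold": 0.15, "window_size": 10},
    "medical":  {"hard_limit": 0.5,  "window_threshold": 0.60, "trend_threshold": 0.10, "window_size": 8},
    "finance":  {"hard_limit": 0.5,  "window_threshold": 0.55, "trend_threshold": 0.12, "window_size": 8},
    "legal":    {"hard_limit": 0.45, "window_threshold": 0.55, "trend_threshold": 0.12, "window_size": 10},
    "creative": {"hard_limit": 0.3,  "window_threshold": 0.40, "trend_threshold": 0.20, "window_size": 15},
}

# Usage: kernel = StreamingKernel(**DOMAIN_PRESETS["medical"])
```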
## Debug Mode

Enable streaming_debug=True to get per-token diagnostic snapshots:

```python
kernel = StreamingKernel(
    hard_limit=0.4,
    window_size=10,
    streaming_debug=True,
)

session = kernel.stream_tokens(token_gen, coherence_cb)
for snap in session.debug_log:
    print(
        f"token {snap['index']}: "
        f"coherence={snap['coherence']:.3f} "
        f"window_avg={snap['window_avg']:.3f} "
        f"trend_drop={snap['trend_drop']:.3f} "
        f"accumulated={snap['accumulated_tokens']}"
    )
```
Each TokenEvent also gets a debug_info dict with the same fields. Use this to diagnose why a halt triggered or to tune thresholds on your data.
Fields in each debug snapshot:

| Field | Type | Description |
|---|---|---|
| index | int | Token position in stream |
| coherence | float | Raw coherence score for this token |
| window_avg | float | Current sliding window average |
| trend_drop | float | Coherence delta over trend window |
| accumulated_tokens | int | Total tokens processed so far |
## False-Halt Rate
Measured with benchmarks/streaming_false_halt_bench.py on 135 factually correct passages (heuristic mode, use_nli=False):
| Metric | Value |
|---|---|
| Passages tested | 135 |
| False halts | 6 |
| False-halt rate | 4.4% |
| Avg coherence | 0.459 |
All 6 false halts are trend-triggered on borderline score trajectories (trend drop barely exceeding 0.30 threshold). NLI mode produces higher coherence scores and is expected to have a lower false-halt rate.
The regression suite (benchmarks/regression_suite.py) asserts
false_halt_rate < 5% on every CI run.
To reproduce, run benchmarks/streaming_false_halt_bench.py. To measure NLI mode instead of the heuristic default, install the NLI extra first (pip install director-ai[nli]).
## Structured Halt Evidence
When a scorer is passed to stream_tokens(), halt events include a HaltEvidence object with the top-K contradicting chunks and NLI scores:
```python
session = kernel.stream_tokens(
    token_generator,
    coherence_callback,
    scorer=scorer,
    top_k=3,
)

if session.halt_evidence_structured:
    ev = session.halt_evidence_structured
    print(f"Reason: {ev.reason}")
    print(f"Score: {ev.last_score:.3f}")
    print(f"Action: {ev.suggested_action}")
    if ev.trace_attribution:
        trace = ev.trace_attribution
        print(f"Token: {trace.token_offset}")
        print(f"Path: {trace.scorer_path}")
    if ev.counterfactual_diagnostic and ev.counterfactual_diagnostic.best_change:
        change = ev.counterfactual_diagnostic.best_change
        print(f"Fact source: {change.fact_source}")
        print(f"Required delta: {change.required_score_delta:.3f}")
    for chunk in ev.evidence_chunks:
        print(f"  - {chunk.text[:80]} (distance={chunk.distance:.3f})")
```
Each TokenEvent on halt also carries a halt_evidence field with the same HaltEvidence object. Its trace_attribution field records the fact source, retrieval path, scorer path, token offset, threshold, and halt margin. Its counterfactual_diagnostic field answers which single retrieved fact branch would have prevented the halt.
When OpenTelemetry is enabled, the same halt fields are exported as
stream.halt_cause.* and stream.counterfactual.* attributes on
streaming spans. Raw token text and raw fact text are not written to
spans.
## Trace Explorer
The [ui] extra includes a Trace Explorer tab in the Gradio wizard.
Paste JSON from a StreamSession, agent trace, swarm trace, or halt
evidence payload to render a timeline with scope, event type, halt
state, score, hook, reason, attribution, and counterfactual detail.
```python
import json

from director_ai.ui.config_wizard import build_trace_explorer

summary, rows, detail = build_trace_explorer(
    json.dumps(
        {
            "halted": True,
            "halt_reason": "hard_limit",
            "events": [
                {"index": 0, "token": "The", "coherence": 0.92},
                {
                    "index": 1,
                    "token": " claim",
                    "coherence": 0.31,
                    "halted": True,
                    "halt_reason": "hard_limit",
                },
            ],
        },
    ),
)
```
The pure helper returns Markdown summary text, table rows for the UI, and a JSON detail object. This keeps trace rendering testable without launching Gradio.
## StreamSession Properties

| Property | Type | Description |
|---|---|---|
| output | str | Safe partial output (up to halt point) |
| halted | bool | Whether generation was stopped |
| halt_index | int | Token index where halt occurred |
| halt_reason | str | Which mechanism triggered |
| avg_coherence | float | Mean coherence across all tokens |
| min_coherence | float | Lowest coherence observed |
| warning_count | int | Tokens in soft warning zone |
| duration_ms | float | Total processing time |
| debug_log | list[dict] | Debug snapshots (only when streaming_debug=True) |
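The aggregate fields can be derived from a per-token score history. This is illustrative only (the library computes them internally); it assumes the soft warning zone is hard_limit <= score < soft_limit, consistent with the soft_limit parameter shown earlier.

```python
def summarize(scores, hard_limit=0.4, soft_limit=0.6):
    """Derive avg/min coherence and the warning count from raw scores."""
    return {
        "avg_coherence": sum(scores) / len(scores),
        "min_coherence": min(scores),
        # Tokens that passed the hard limit but fell inside the soft zone.
        "warning_count": sum(1 for s in scores if hard_limit <= s < soft_limit),
    }
```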
## Limitations
- Accumulated-text scoring, not per-token: Each coherence score reflects the full accumulated output re-evaluated against the premise. Individual tokens don't have independent scores.
- No retraction: Halting stops future tokens from being generated, but tokens already delivered to the client cannot be retracted. Applications that need full rollback should buffer output until a sentence boundary passes scoring.
- Scoring granularity depends on the NLI backend: Heuristic mode uses word overlap (~0.1 ms/call); NLI mode runs the full DeBERTa pipeline (~15-50 ms/call). Use score_every_n > 1 or adaptive=True for production latency budgets.
- Hosted API constraints: When using OpenAI/Anthropic providers, the scorer re-runs the full NLI pipeline on the accumulated text. There is no server-side per-token scoring.
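The sentence-boundary buffering mentioned under the no-retraction limitation can be sketched as a thin wrapper around any token source. Sentence detection, the threshold, and the function name are illustrative; this is a workaround pattern, not a library feature.

```python
def buffered_stream(tokens, score_fn, threshold=0.5):
    """Release output only at sentence boundaries that pass scoring.

    Tokens are held in a buffer until a naive sentence boundary, the
    sentence is scored, and it is forwarded downstream only if it
    clears the threshold, so nothing unsafe is ever delivered.
    """
    buffer, released = [], []
    for tok in tokens:
        buffer.append(tok)
        text = "".join(buffer)
        if text.rstrip().endswith((".", "!", "?")):
            if score_fn(text) < threshold:
                break  # drop the failing sentence; the client never saw it
            released.append(text)
            buffer = []
    return "".join(released)
```

The trade-off is latency: the client sees output in sentence-sized bursts instead of token by token.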