Streaming Halt¶
Overview¶
StreamingKernel monitors coherence by re-scoring the accumulated text
after each token (or every N-th token with score_every_n). Scoring is
sentence-level re-evaluation of the full accumulated output, not
independent per-token analysis. When coherence degrades, generation is
halted. Three independent halt mechanisms:
- Hard limit — any single token below threshold
- Sliding window — rolling average drops below window threshold
- Downward trend — coherence drops by more than a delta over N tokens
Architecture¶
LLM token stream ──► StreamingKernel
│
├─ coherence_callback(token) → float
│ ├─ accumulate tokens into partial response
│ ├─ scorer.review(prompt, partial_response)
│ └─ return score.score
│
├─ 3 halt mechanisms:
│ ├─ hard_limit: score < threshold → emergency stop
│ ├─ window_avg: sliding window mean < threshold
│ └─ trend_drop: coherence[0] - coherence[-1] > delta
│
└─ StreamSession with full trace
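Together, the three mechanisms can be sketched as a pure function over the running score history (a sketch only — the parameter names mirror the constructor arguments documented below, but the internal implementation is an assumption):

```python
def check_halt(scores, hard_limit=0.4, window_size=10,
               window_threshold=0.55, trend_window=5, trend_threshold=0.15):
    """Return a halt reason string, or None to keep streaming.

    `scores` is the list of coherence scores seen so far, newest last.
    """
    latest = scores[-1]
    # 1. Hard limit: any single score below the floor is an emergency stop.
    if latest < hard_limit:
        return "hard_limit"
    # 2. Sliding window: rolling mean over the last `window_size` scores.
    if len(scores) >= window_size:
        window = scores[-window_size:]
        if sum(window) / len(window) < window_threshold:
            return "window_avg"
    # 3. Downward trend: drop across the last `trend_window` scores.
    if len(scores) >= trend_window:
        trend = scores[-trend_window:]
        if trend[0] - trend[-1] > trend_threshold:
            return "trend_drop"
    return None
```

Note that the checks run in order, so a score that violates both the hard limit and the window average reports as a `hard_limit` halt.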
The caller supplies the coherence_callback — typically a closure that accumulates tokens, calls scorer.review(), and returns the score. The kernel only decides when to halt; the caller decides how to score.
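A minimal sketch of such a closure (the shape follows the description above; `scorer.review()` returning an object with a `.score` attribute is taken from this page, while the factory function itself is illustrative):

```python
def make_coherence_callback(prompt, scorer):
    """Build a coherence_callback closure over a token accumulator."""
    tokens = []

    def coherence_callback(token):
        tokens.append(token)
        partial_response = "".join(tokens)
        # Re-score the full accumulated output, not the token in isolation.
        result = scorer.review(prompt, partial_response)
        return result.score

    return coherence_callback
```

The kernel calls this once per token (or every N-th token with `score_every_n`) and feeds the returned float into its halt checks.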
How NLI Premise/Hypothesis Are Constructed¶
When use_nli=True on the scorer:
- Premise = the prompt text, or the top-k retrieval results from GroundTruthStore/VectorGroundTruthStore if a KB is configured.
- Hypothesis = the accumulated response text so far (all tokens joined).
- The NLI model (DeBERTa) scores the entailment probability P(premise entails hypothesis).
- The divergence score is 1.0 - entailment_probability — higher means more hallucination risk.
With chunked NLI (scorer.score_chunked()):
- Both premise and hypothesis are split into sentence-level chunks.
- All (premise_chunk, hypothesis_chunk) pairs are scored.
- The final score uses max aggregation across pairs — the worst contradiction in any pair drives the score.
- This catches localized hallucinations that would be diluted in a full-text comparison.
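The chunking and max-aggregation steps can be sketched independently of the NLI backend (a sketch, assuming a naive regex sentence splitter; `nli_divergence` stands in for the DeBERTa call and is supplied by the caller here):

```python
import re

def split_sentences(text):
    """Naive sentence splitter; the real pipeline may use a proper tokenizer."""
    return [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]

def chunked_divergence(premise, hypothesis, nli_divergence):
    """Score all (premise_chunk, hypothesis_chunk) pairs; keep the worst."""
    pairs = [
        (p, h)
        for p in split_sentences(premise)
        for h in split_sentences(hypothesis)
    ]
    # Max aggregation: the single worst contradiction drives the final score.
    return max(nli_divergence(p, h) for p, h in pairs)
```

Because the maximum is taken over pairs, one hallucinated sentence keeps its full divergence score instead of being averaged away by the surrounding correct text.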
Basic Usage¶
from director_ai import StreamingKernel
kernel = StreamingKernel(
    hard_limit=0.4,
    window_size=10,
    window_threshold=0.55,
    trend_window=5,
    trend_threshold=0.15,
    soft_limit=0.6,
)
session = kernel.stream_tokens(token_generator, coherence_callback)
if session.halted:
    print(f"Halted: {session.halt_reason}")
    print(f"Safe output: {session.output}")
else:
    print(f"Approved: {session.output}")
on_halt Callback¶
def handle_halt(session):
    print(f"Halted at token {session.halt_index}")
    # Log, alert, switch to fallback

kernel = StreamingKernel(on_halt=handle_halt)
Async Streaming¶
from director_ai import AsyncStreamingKernel
kernel = AsyncStreamingKernel(
    hard_limit=0.4,
    on_halt=handle_halt,
    soft_limit=0.6,
)
session = await kernel.stream_to_session(async_token_gen, coherence_fn)
Threshold Tuning by Domain¶
| Domain | hard_limit | window_threshold | trend_threshold | window_size | Rationale |
|---|---|---|---|---|---|
| General | 0.4 | 0.50 | 0.15 | 10 | Balanced — catches obvious hallucinations without over-halting |
| Medical | 0.5 | 0.60 | 0.10 | 8 | Strict — medical misinformation is high-risk |
| Finance | 0.5 | 0.55 | 0.12 | 8 | Strict — numerical claims must be grounded |
| Legal | 0.45 | 0.55 | 0.12 | 10 | Moderate-strict — citations and precedent matter |
| Creative | 0.3 | 0.40 | 0.20 | 15 | Loose — creative writing tolerates divergence |
See the Threshold Tuning Guide for detailed tuning methodology.
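The table rows translate directly into constructor keyword arguments. One convenient way to keep them is a plain preset mapping (a sketch; `DOMAIN_PRESETS` is a hypothetical helper, not part of the library API):

```python
# Hypothetical presets mirroring the table above.
DOMAIN_PRESETS = {
    "general":  dict(hard_limit=0.4,  window_threshold=0.50, trend_threshold=0.15, window_size=10),
    "medical":  dict(hard_limit=0.5,  window_threshold=0.60, trend_threshold=0.10, window_size=8),
    "finance":  dict(hard_limit=0.5,  window_threshold=0.55, trend_threshold=0.12, window_size=8),
    "legal":    dict(hard_limit=0.45, window_threshold=0.55, trend_threshold=0.12, window_size=10),
    "creative": dict(hard_limit=0.3,  window_threshold=0.40, trend_threshold=0.20, window_size=15),
}

# Usage: kernel = StreamingKernel(**DOMAIN_PRESETS["medical"])
```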
Debug Mode¶
Enable streaming_debug=True to get per-token diagnostic snapshots:
kernel = StreamingKernel(
    hard_limit=0.4,
    window_size=10,
    streaming_debug=True,
)
session = kernel.stream_tokens(token_gen, coherence_cb)
for snap in session.debug_log:
    print(
        f"token {snap['index']}: "
        f"coherence={snap['coherence']:.3f} "
        f"window_avg={snap['window_avg']:.3f} "
        f"trend_drop={snap['trend_drop']:.3f} "
        f"accumulated={snap['accumulated_tokens']}"
    )
Each TokenEvent also gets a debug_info dict with the same fields. Use this to diagnose why a halt triggered or to tune thresholds on your data.
Fields in each debug snapshot:
| Field | Type | Description |
|---|---|---|
| index | int | Token position in stream |
| coherence | float | Raw coherence score for this token |
| window_avg | float | Current sliding window average |
| trend_drop | float | Coherence delta over trend window |
| accumulated_tokens | int | Total tokens processed so far |
False-Halt Rate¶
Measured with benchmarks/streaming_false_halt_bench.py on 138 factually correct passages (heuristic mode, use_nli=False):
| Metric | Value |
|---|---|
| Passages tested | 138 |
| False halts | 0 |
| False-halt rate | 0.0% |
| Avg coherence | 0.45+ |
The regression suite (benchmarks/regression_suite.py) asserts false_halt_rate == 0.0 on every CI run. If this assertion fails, the build breaks.
To reproduce, run benchmarks/streaming_false_halt_bench.py. To re-run with NLI enabled, first install the NLI extra (pip install director-ai[nli]).
Structured Halt Evidence¶
When a scorer is passed to stream_tokens(), halt events include a HaltEvidence object with the top-K contradicting chunks and NLI scores:
session = kernel.stream_tokens(
    token_generator,
    coherence_callback,
    scorer=scorer,
    top_k=3,
)
if session.halt_evidence_structured:
    ev = session.halt_evidence_structured
    print(f"Reason: {ev.reason}")
    print(f"Score: {ev.last_score:.3f}")
    print(f"Action: {ev.suggested_action}")
    for chunk in ev.evidence_chunks:
        print(f"  - {chunk.text[:80]} (distance={chunk.distance:.3f})")
Each TokenEvent on halt also carries a halt_evidence field with the same HaltEvidence object.
StreamSession Properties¶
| Property | Type | Description |
|---|---|---|
| output | str | Safe partial output (up to halt point) |
| halted | bool | Whether generation was stopped |
| halt_index | int | Token index where halt occurred |
| halt_reason | str | Which mechanism triggered |
| avg_coherence | float | Mean coherence across all tokens |
| min_coherence | float | Lowest coherence observed |
| warning_count | int | Tokens in soft warning zone |
| duration_ms | float | Total processing time |
| debug_log | list[dict] | Debug snapshots (only when streaming_debug=True) |
Limitations¶
- Accumulated-text scoring, not per-token: Each coherence score reflects the full accumulated output re-evaluated against the premise. Individual tokens don't have independent scores.
- No retraction: Halting stops future tokens from being generated, but tokens already delivered to the client cannot be retracted. Applications that need full rollback should buffer output until a sentence boundary passes scoring.
- Scoring granularity depends on the NLI backend: Heuristic mode uses word overlap (~0.1 ms/call); NLI mode runs the full DeBERTa pipeline (~15-50 ms/call). Use score_every_n > 1 or adaptive=True for production latency budgets.
- Hosted API constraints: When using OpenAI/Anthropic providers, the scorer re-runs the full NLI pipeline on the accumulated text. There is no server-side per-token scoring.
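The sentence-boundary buffering suggested under "No retraction" could look like this (a sketch; `buffered_stream`, `emit`, and the boundary regex are illustrative, not library API — the point is that unscored tokens never leave the buffer):

```python
import re

def buffered_stream(tokens, score_fn, threshold=0.5, emit=print):
    """Hold tokens until a sentence boundary; release only scored-safe text.

    `score_fn(text)` scores the full accumulated output. Tokens still in the
    buffer are never delivered, so a halt retracts the unfinished sentence.
    """
    delivered, buffer = [], []
    for token in tokens:
        buffer.append(token)
        text = "".join(delivered + buffer)
        if re.search(r"[.!?]\s*$", token):  # sentence boundary reached
            if score_fn(text) < threshold:
                break  # halt: the buffered sentence is dropped, not emitted
            emit("".join(buffer))
            delivered.extend(buffer)
            buffer = []
    return "".join(delivered)
```

This trades latency (output arrives sentence by sentence) for the ability to fully suppress a failing sentence.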