Voice AI Integration

Real-time hallucination prevention for text-to-speech pipelines. VoiceGuard sits between your LLM and TTS engine, scoring tokens as they arrive and halting the stream before hallucinated content reaches the speaker.

Once words are spoken, you can't unsay them. VoiceGuard ensures they don't get spoken in the first place.

Quick Start

from director_ai import VoiceGuard

guard = VoiceGuard(
    facts={"refund": "30-day refund policy", "hours": "9am-5pm EST"},
    prompt="What is the refund policy?",
)

for token in llm.stream("What is the refund policy?"):
    result = guard.feed(token)
    if result.halted:
        tts.stop()
        tts.speak(result.recovery_text)
        break
    tts.speak_chunk(result.token)

How It Works

LLM tokens ──→ VoiceGuard.feed() ──→ TTS engine ──→ speaker
                    ├─ approved=True  → token forwarded to TTS
                    └─ halted=True    → stream stopped, recovery text spoken

VoiceGuard accumulates tokens and periodically scores the growing text against your knowledge base. Three halt mechanisms run simultaneously:

Mechanism       Trigger                      Behavior
Hard halt       Single score < hard_limit    Immediate stop, token rejected
Window average  Rolling average < threshold  Soft halt (finish sentence) or hard halt
Already halted  Previous halt fired          All subsequent tokens rejected

Soft halt mode (default) waits for a sentence boundary (., !, ?) before signaling halt, so the speaker finishes a complete sentence instead of cutting off mid-word.
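The interaction of the three mechanisms and the soft-halt delay can be sketched as a small self-contained decision function. Everything below (class name, method, fields) is illustrative, not VoiceGuard's internals:

```python
from collections import deque

SENTENCE_END = (".", "!", "?")

class HaltTracker:
    """Toy model of the three halt mechanisms (illustration only)."""

    def __init__(self, threshold=0.3, hard_limit=0.25, window_size=8, soft_halt=True):
        self.threshold = threshold
        self.hard_limit = hard_limit
        self.soft_halt = soft_halt
        self.scores = deque(maxlen=window_size)
        self.halted = False
        self.pending_soft = False  # window tripped; waiting for sentence end

    def check(self, token, score):
        """Return (approved, halt_reason) for one scored token."""
        if self.halted:                     # 3. already halted
            return False, "already_halted"
        self.scores.append(score)
        if score < self.hard_limit:         # 1. hard halt
            self.halted = True
            return False, "hard_limit"
        avg = sum(self.scores) / len(self.scores)
        if avg < self.threshold:            # 2. window average
            if not self.soft_halt:
                self.halted = True
                return False, "window_avg"
            self.pending_soft = True
        if self.pending_soft and token.rstrip().endswith(SENTENCE_END):
            self.halted = True              # soft halt: the sentence is complete
            return True, "soft_halt_sentence_end"
        return True, ""
```

Note that a soft halt still approves the sentence-ending token, so the listener hears a finished sentence; everything after it is rejected.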

Parameters

Parameter    Type              Default                Description
facts        dict[str, str]    None                   Key-value fact pairs for grounding
store        GroundTruthStore  None                   Pre-built store (overrides facts)
prompt       str               ""                     User prompt (used as NLI premise)
threshold    float             0.3                    Window average coherence floor
hard_limit   float             0.25                   Immediate halt threshold
score_every  int               4                      Score every N-th token
window_size  int               8                      Sliding window for average
soft_halt    bool              True                   Wait for sentence end before halting
recovery     str               "I need to verify..."  Text spoken on halt
use_nli      bool              True                   Enable NLI model scoring

VoiceToken

feed() returns a VoiceToken for every token:

Field          Type   Description
token          str    The input token
index          int    Position in stream
approved       bool   Whether to forward to TTS
coherence      float  Current coherence score
halted         bool   Whether halt was triggered
halt_reason    str    hard_limit, window_avg, soft_halt_sentence_end, or already_halted
recovery_text  str    Text to speak instead (set on halt)

Multi-Turn Conversations

Call reset() between conversation turns and set_prompt() for each new user message:

guard = VoiceGuard(store=my_knowledge_base)

while True:
    user_text = asr.listen()
    guard.reset()
    guard.set_prompt(user_text)

    for token in llm.stream(user_text):
        result = guard.feed(token)
        if result.halted:
            tts.speak(result.recovery_text)
            break
        tts.speak_chunk(result.token)

ElevenLabs Example

from elevenlabs import stream as el_stream
from elevenlabs.client import ElevenLabs
from openai import OpenAI
from director_ai import VoiceGuard

client = OpenAI()
tts = ElevenLabs()
guard = VoiceGuard(
    facts={"product": "Widget Pro costs $49, ships in 2 days"},
    prompt="Tell me about Widget Pro",
)

# Collect approved text, halt if needed
approved_text = []
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Tell me about Widget Pro"}],
    stream=True,
)

for chunk in response:
    token = chunk.choices[0].delta.content or ""
    if not token:
        continue
    result = guard.feed(token)
    if result.halted:
        approved_text.append(f" {result.recovery_text}")
        break
    approved_text.append(token)

# Send approved text to TTS
audio = tts.text_to_speech.convert(
    text="".join(approved_text),
    voice_id="JBFqnCBsd6RMkjVDRZzb",
    model_id="eleven_turbo_v2_5",
)
el_stream(audio)

Async Voice Pipeline (v3.12+)

For async voice pipelines, AsyncVoiceGuard and voice_pipeline provide true streaming audio: approved tokens are sent to TTS as soon as they are approved, rather than collected first and spoken afterwards.

5-Line Integration

from director_ai.voice import voice_pipeline, ElevenLabsAdapter

tts = ElevenLabsAdapter(voice_id="JBFqnCBsd6RMkjVDRZzb")

async for audio_chunk in voice_pipeline(llm_token_stream, tts, facts=my_facts, prompt=question):
    await websocket.send_bytes(audio_chunk)

The pipeline buffers approved tokens into sentence-sized chunks for natural TTS prosody, sends each sentence to the adapter as it completes, and yields audio bytes as they arrive. On halt, the pipeline flushes remaining text, synthesises the recovery message, and stops.
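The sentence-buffering step can be sketched as a standalone generator (an illustration of the technique, not the pipeline's actual code):

```python
SENTENCE_END = (".", "!", "?")

def buffer_sentences(tokens):
    """Group a token stream into complete sentences for TTS."""
    buf = []
    for tok in tokens:
        buf.append(tok)
        if tok.rstrip().endswith(SENTENCE_END):
            yield "".join(buf).strip()
            buf = []
    if buf:  # flush the tail (e.g. text cut short by a halt)
        yield "".join(buf).strip()
```

Sentence-sized chunks give the TTS engine enough context for natural intonation, at the cost of one sentence of extra latency.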

Architecture

LLM tokens ──→ AsyncVoiceGuard ──→ sentence buffer ──→ TTSAdapter ──→ audio bytes
                    │                                       │
                    ├─ approved ──→ buffer token            ├─ ElevenLabsAdapter
                    └─ halted   ──→ flush + recovery + stop ├─ OpenAITTSAdapter
                                                            └─ DeepgramAdapter

AsyncVoiceGuard (Low-Level)

Same scoring as sync VoiceGuard, but async-native. Use this directly when you don't need TTS integration:

from director_ai.voice import AsyncVoiceGuard

guard = AsyncVoiceGuard(facts=my_facts, prompt=question)

async for result in guard.feed_stream(llm_token_stream):
    if result.halted:
        await handle_halt(result)
        break
    await process_approved_token(result.token)

feed_stream() accepts both sync and async iterables. feed() scores a single token.
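Accepting both sync and async iterables is typically done by normalising the source up front. A generic sketch of that technique (the helper names here are assumptions, not library API):

```python
import asyncio
from typing import AsyncIterator, Iterable, Union

async def as_async_iter(tokens: Union[AsyncIterator[str], Iterable[str]]) -> AsyncIterator[str]:
    """Wrap a sync or async token source into one uniform async iterator."""
    if hasattr(tokens, "__aiter__"):
        async for tok in tokens:   # already async: delegate
            yield tok
    else:
        for tok in tokens:         # plain iterable: re-yield
            yield tok

async def collect(tokens) -> list:
    """Drain any token source through the normaliser."""
    return [t async for t in as_async_iter(tokens)]

async def async_tokens():
    for t in ("a", "b"):
        yield t

# Both source kinds look identical to the consumer:
print(asyncio.run(collect(["a", "b"])))      # sync source → ['a', 'b']
print(asyncio.run(collect(async_tokens())))  # async source → ['a', 'b']
```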

TTS Adapters

Install the TTS SDK you need:

pip install elevenlabs      # ElevenLabsAdapter
pip install openai           # OpenAITTSAdapter
pip install deepgram-sdk     # DeepgramAdapter

Each adapter implements the TTSAdapter ABC:

class TTSAdapter(ABC):
    async def synthesise(self, text: str) -> AsyncIterator[bytes]: ...
    async def flush(self) -> AsyncIterator[bytes]: ...
    async def close(self) -> None: ...

Write your own adapter for any TTS engine by subclassing TTSAdapter.

Pipeline Parameters

voice_pipeline() accepts all AsyncVoiceGuard parameters plus:

Parameter        Type      Default  Description
on_halt          Callable  None     Sync or async callback, receives halting VoiceToken
sentence_buffer  bool      True     Buffer tokens into sentences before TTS (better prosody)

Halt Callback

async def on_halt(vtoken):
    log.warning(f"Halt: {vtoken.halt_reason} at coherence {vtoken.coherence:.3f}")
    await notify_supervisor(vtoken)

async for audio in voice_pipeline(tokens, tts, facts=facts, on_halt=on_halt):
    play(audio)

Latency Budget

Voice AI requires end-to-end latency under 500ms for natural conversation. VoiceGuard with score_every=4 adds negligible overhead:

Component           Latency               Notes
LLM generation      50-200ms first token  Depends on provider
VoiceGuard scoring  <1ms per check        0.5ms on L40S; heuristic <0.1ms
TTS synthesis       50-150ms              Depends on provider
Audio streaming     ~100ms buffer         Network + playback

With score_every=4 and ~5 tokens/second, scoring runs once per ~800ms — well within budget. Increase score_every for lower overhead at the cost of later halt detection.
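The arithmetic behind that budget, as a quick check (rates assumed from the table above):

```python
def scoring_interval_ms(score_every: int, tokens_per_second: float) -> float:
    """Milliseconds between coherence checks for a given token rate."""
    return score_every / tokens_per_second * 1000

print(scoring_interval_ms(4, 5))  # 800.0: one check per ~800ms
print(scoring_interval_ms(8, 5))  # 1600.0: half the overhead, later halt detection
```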

Tuning for Voice

Voice conversations differ from text:

  • Lower hard_limit (0.20-0.25): Voice responses are shorter and more conversational. Transient dips are common — don't halt on brief fluctuations.
  • soft_halt=True (default): Cutting mid-word sounds unnatural. Always finish the sentence.
  • Short recovery text: "Let me check on that" is better than a paragraph. The user is listening, not reading.
  • Higher score_every (4-8): Voice generates ~5 tokens/second. Scoring every token wastes cycles on partial words.
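Putting those recommendations together, a voice-tuned parameter set might look like the following. The values are illustrative starting points drawn from the bullets above, not shipped defaults:

```python
# Illustrative starting points for voice; tune per deployment.
voice_tuned = {
    "hard_limit": 0.22,   # tolerate transient dips in short spoken replies
    "soft_halt": True,    # finish the sentence rather than cutting mid-word
    "recovery": "Let me check on that.",  # short, since the user is listening
    "score_every": 6,     # at ~5 tokens/s this is one check per ~1.2s
}

# Hypothetical usage: guard = VoiceGuard(facts=my_facts, **voice_tuned)
```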

Full API

director_ai.integrations.voice.VoiceGuard

VoiceGuard(facts=None, store=None, threshold=0.3, score_every=4, hard_limit=0.25, window_size=8, soft_halt=True, recovery='I need to verify that information. One moment.', use_nli=True, prompt='')

Real-time token filter for voice AI pipelines.

Accumulates tokens, scores the growing text against a knowledge base (or prompt-only NLI), and signals halt before hallucinated content reaches the TTS engine.

Parameters:

facts (dict or GroundTruthStore, default None): grounding knowledge
threshold (float, default 0.3): coherence floor
score_every (int, default 4): score every N-th token; at ~5 tokens/s, one check per ~800ms
hard_limit (float, default 0.25): immediate halt below this score
window_size (int, default 8): sliding window for trend detection
soft_halt (bool, default True): if True, finish the current sentence before halting
recovery (str, default 'I need to verify that information. One moment.'): text spoken when halt fires
use_nli (bool, default True): enable NLI model scoring

set_prompt

set_prompt(prompt: str) -> None

Set the user prompt for the current utterance.

reset

reset() -> None

Reset state for a new utterance (new turn in conversation).

feed

feed(token: str) -> VoiceToken

Feed one token from the LLM stream. Returns scoring result.

Call this for every token the LLM generates. Forward the returned token to TTS only when result.approved is True.

feed_all

feed_all(tokens) -> list[VoiceToken]

Feed multiple tokens, return results. Stops after halt.

director_ai.integrations.voice.VoiceToken dataclass

VoiceToken(token: str, index: int, approved: bool, coherence: float, halted: bool = False, halt_reason: str = '', recovery_text: str = '')

Result of feeding one token to the voice guard.

director_ai.voice.guard.AsyncVoiceGuard

AsyncVoiceGuard(facts: dict[str, str] | None = None, store: GroundTruthStore | None = None, threshold: float = 0.3, score_every: int = 4, hard_limit: float = 0.25, window_size: int = 8, soft_halt: bool = True, recovery: str = 'I need to verify that information. One moment.', use_nli: bool = True, prompt: str = '')

Async real-time token filter for voice AI pipelines.

Accumulates tokens, scores the growing text against a knowledge base (or prompt-only NLI), and signals halt before hallucinated content reaches the TTS engine. Runs on a single event loop — no locks.

Parameters:

facts (dict or GroundTruthStore, default None): grounding knowledge
threshold (float, default 0.3): coherence floor
score_every (int, default 4): score every N-th token
hard_limit (float, default 0.25): immediate halt below this score
window_size (int, default 8): sliding window for trend detection
soft_halt (bool, default True): if True, finish the current sentence before halting
recovery (str, default 'I need to verify that information. One moment.'): text spoken when halt fires
use_nli (bool, default True): enable NLI model scoring

set_prompt

set_prompt(prompt: str) -> None

Set the user prompt for the current utterance.

reset

reset() -> None

Reset state for a new utterance (new turn in conversation).

feed async

feed(token: str) -> VoiceToken

Feed one token. Returns scoring result.

Scoring calls CoherenceScorer.review() which is synchronous. When NLI is enabled the scorer does CPU-heavy inference, so we offload to a thread to avoid blocking the event loop.
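That offloading pattern is standard asyncio; a generic sketch (the scorer function here is a stand-in, not the library's):

```python
import asyncio
import time

def cpu_heavy_score(text: str) -> float:
    """Stand-in for a synchronous, CPU-bound NLI scoring call."""
    time.sleep(0.01)  # simulate model inference
    return 0.9

async def score_without_blocking(text: str) -> float:
    # Run the sync scorer in a worker thread so the event loop
    # can keep feeding audio while inference runs.
    return await asyncio.to_thread(cpu_heavy_score, text)

print(asyncio.run(score_without_blocking("The refund window is 30 days.")))  # 0.9
```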

feed_stream async

feed_stream(tokens: AsyncIterator[str] | Iterator[str]) -> AsyncIterator[VoiceToken]

Feed a token stream. Yields VoiceToken per token. Stops after halt.

director_ai.voice.pipeline.voice_pipeline async

voice_pipeline(token_source: AsyncIterator[str] | Iterator[str], tts: TTSAdapter, *, facts: dict[str, str] | None = None, store: GroundTruthStore | None = None, prompt: str = '', threshold: float = 0.3, hard_limit: float = 0.25, score_every: int = 4, soft_halt: bool = True, recovery: str = 'I need to verify that information. One moment.', use_nli: bool = True, on_halt: Callable[[VoiceToken], Awaitable[None] | None] | None = None, sentence_buffer: bool = True) -> AsyncIterator[bytes]

Stream guardrailed audio from an LLM token stream.

Feeds tokens through AsyncVoiceGuard, buffers approved text into sentence-sized chunks, synthesises via the given TTSAdapter, and yields audio bytes as they arrive.

On halt the pipeline flushes any buffered text, synthesises the recovery message, calls on_halt, then stops.

Parameters:

token_source (async or sync iterable of str, required): LLM token stream
tts (TTSAdapter, required): audio synthesis backend
facts (dict[str, str] | None, default None): forwarded to AsyncVoiceGuard
store (GroundTruthStore | None, default None): forwarded to AsyncVoiceGuard
prompt (str, default ''): forwarded to AsyncVoiceGuard
threshold (float, default 0.3): forwarded to AsyncVoiceGuard
hard_limit (float, default 0.25): forwarded to AsyncVoiceGuard
score_every (int, default 4): forwarded to AsyncVoiceGuard
soft_halt (bool, default True): forwarded to AsyncVoiceGuard
recovery (str, default 'I need to verify that information. One moment.'): forwarded to AsyncVoiceGuard
use_nli (bool, default True): forwarded to AsyncVoiceGuard
on_halt (Callable | None, default None): optional callback receiving the halting VoiceToken
sentence_buffer (bool, default True): if True, batch tokens into sentences before sending to TTS for natural prosody; if False, send each approved token immediately (lower latency, choppier audio)

director_ai.voice.adapters.TTSAdapter

Bases: ABC

Converts approved text into streaming audio bytes.

synthesise abstractmethod async

synthesise(text: str) -> AsyncIterator[bytes]

Stream audio bytes for a text fragment.

flush async

flush() -> AsyncIterator[bytes]

Flush any buffered audio. Override if your TTS buffers internally.

close async

close() -> None

Release resources. Override in subclasses that hold connections.