
BatchProcessor

Concurrent batch processing for CoherenceScorer and CoherenceAgent. Processes multiple prompts in parallel with configurable concurrency and progress tracking.

Usage

from director_ai import CoherenceAgent
from director_ai.core.runtime.batch import BatchProcessor

agent = CoherenceAgent(use_nli=True)
processor = BatchProcessor(agent, max_concurrency=8)

result = processor.process_batch([
    "What is the capital of France?",
    "What is the speed of light?",
    "When was Python released?",
])

print(f"Succeeded: {result.succeeded}/{result.total}")
print(f"Duration: {result.duration_seconds:.1f}s")
for r in result.results:
    print(f"  output={r.output}, halted={r.halted}")

Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| backend | CoherenceAgent \| CoherenceScorer | required | Backend instance for processing |
| max_concurrency | int | 4 | Maximum parallel workers |
| item_timeout | float | 60.0 | Per-item timeout in seconds |
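The two tuning knobs can be pictured with the following sketch, which assumes a thread-pool design; it is not BatchProcessor's actual implementation, and run_batch and slow_echo are hypothetical names. max_concurrency caps the worker count, and item_timeout bounds how long the caller waits for each item. Note that Future.result(timeout=...) only stops waiting; it cannot kill a thread that is still running.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from typing import Any, Callable


def run_batch(
    fn: Callable[[str], Any],
    prompts: list[str],
    max_concurrency: int = 4,
    item_timeout: float = 60.0,
) -> tuple[list[Any], list[tuple[int, str]]]:
    """Hypothetical helper: apply fn to each prompt with at most max_concurrency workers."""
    results: list[Any] = []
    errors: list[tuple[int, str]] = []
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        futures = [pool.submit(fn, p) for p in prompts]
        for idx, fut in enumerate(futures):
            try:
                # The timeout is measured from when we start waiting on this future.
                results.append(fut.result(timeout=item_timeout))
            except FutureTimeout:
                errors.append((idx, f"timeout after {item_timeout}s"))
            except Exception as exc:  # record the failure instead of aborting the batch
                errors.append((idx, repr(exc)))
    # Leaving the `with` block still waits for any straggling threads to finish.
    return results, errors


def slow_echo(prompt: str) -> str:
    """Hypothetical backend call: hangs on 'hang', fails on 'boom'."""
    if prompt == "hang":
        time.sleep(0.5)
    if prompt == "boom":
        raise ValueError("bad prompt")
    return prompt.upper()


results, errors = run_batch(
    slow_echo, ["a", "boom", "hang", "b"], max_concurrency=2, item_timeout=0.1
)
```

Collecting failures as (index, reason) pairs rather than raising keeps one bad prompt from discarding the rest of the batch, which is the same shape the errors field of BatchResult uses.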

Methods

process_batch()

processor.process_batch(
    prompts: list[str],
    tenant_id: str = "",
) -> BatchResult

process_batch_async()

await processor.process_batch_async(
    prompts: list[str],
    max_concurrency: int | None = None,
    tenant_id: str = "",
) -> BatchResult

BatchResult

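| Field | Type | Description |
|---|---|---|
| results | list[ReviewResult \| tuple[bool, CoherenceScore]] | Per-item results: ReviewResult objects from a CoherenceAgent backend, (approved, CoherenceScore) tuples from a CoherenceScorer backend |
| errors | list[tuple[int, str]] | Failed items as (index, reason) pairs |
| total | int | Total items submitted |
| succeeded | int | Items that completed successfully |
| failed | int | Items that failed |
| duration_seconds | float | Wall-clock time for the whole batch, in seconds |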
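One would expect the counters to satisfy total == succeeded + failed, with one errors entry per failed item, and the index in each errors entry lets you recover which prompts failed, for example to retry them. The sketch below uses a hypothetical dataclass, BatchResultSketch, that mirrors the documented fields (it is not the library's class); the helper failed_prompts and the demo values are likewise illustrative.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class BatchResultSketch:
    """Hypothetical stand-in mirroring the BatchResult fields documented above."""

    results: list[Any] = field(default_factory=list)
    errors: list[tuple[int, str]] = field(default_factory=list)
    total: int = 0
    succeeded: int = 0
    failed: int = 0
    duration_seconds: float = 0.0

    def check_counts(self) -> bool:
        # Assumed bookkeeping: every submitted item either succeeded or failed.
        return (
            self.total == self.succeeded + self.failed
            and self.failed == len(self.errors)
        )


def failed_prompts(result: BatchResultSketch, prompts: list[str]) -> list[tuple[str, str]]:
    """Pair each error index with the prompt submitted at that position."""
    return [(prompts[idx], reason) for idx, reason in result.errors]


prompts = ["q1", "q2", "q3"]
demo = BatchResultSketch(
    results=["a1", "a3"],
    errors=[(1, "simulated failure")],
    total=3,
    succeeded=2,
    failed=1,
    duration_seconds=1.7,
)
retry = failed_prompts(demo, prompts)
```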

Scorer-Level Batching

For scoring prompt/response pairs without the agent orchestration layer, use CoherenceScorer.review_batch():

from director_ai import CoherenceScorer

scorer = CoherenceScorer(threshold=0.3, use_nli=True)

items = [
    ("What is 2+2?", "The answer is 4."),
    ("Capital of France?", "Paris is in Germany."),
]
results = scorer.review_batch(items)
for approved, cs in results:
    print(f"approved={approved}, score={cs.score:.3f}")

When NLI is available, review_batch() coalesces the logical and factual NLI checks for all items into score_batch() calls: 2 batched GPU forward passes for the whole batch rather than 2 per item. Dialogue items fall back to sequential review(). Without NLI, every item is scored sequentially with heuristics.
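The saving comes from coalescing: instead of one NLI call per check per item (2·N calls for N items), all logical checks share one batched call and all factual checks share another. The toy sketch below only counts calls. CountingNLI and its score_batch method are hypothetical stand-ins, not the director_ai API, the inputs to each check are simplified, and the returned scores are dummies.

```python
class CountingNLI:
    """Hypothetical NLI stand-in that only counts forward passes."""

    def __init__(self) -> None:
        self.forward_passes = 0

    def score_batch(self, pairs: list[tuple[str, str]]) -> list[float]:
        self.forward_passes += 1  # one batched pass, whatever the batch size
        return [0.0 for _ in pairs]  # dummy scores


def sequential(nli: CountingNLI, items: list[tuple[str, str]]) -> None:
    for prompt, response in items:
        nli.score_batch([(prompt, response)])  # "logical" check for this item
        nli.score_batch([(prompt, response)])  # "factual" check for this item


def coalesced(nli: CountingNLI, items: list[tuple[str, str]]) -> None:
    nli.score_batch(items)  # all "logical" checks in one pass
    nli.score_batch(items)  # all "factual" checks in one pass


items = [
    ("What is 2+2?", "The answer is 4."),
    ("Capital of France?", "Paris is in Germany."),
    ("When was Python released?", "1991."),
]
seq_nli, co_nli = CountingNLI(), CountingNLI()
sequential(seq_nli, items)
coalesced(co_nli, items)
```

Each batched pass does more work than a single-item pass, so the gain typically comes from fewer host-to-GPU round trips and better device utilization rather than less arithmetic.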

Full API

director_ai.core.runtime.batch.BatchProcessor

BatchProcessor(backend: Any, max_concurrency: int = 4, item_timeout: float = 60.0)

Batch processing wrapper for CoherenceAgent or CoherenceScorer.

Parameters:

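| Name | Type | Description | Default |
|---|---|---|---|
| backend | CoherenceAgent or CoherenceScorer | Backend instance. | required |
| max_concurrency | int | Maximum parallel workers. | 4 |
| item_timeout | float | Per-item timeout in seconds. | 60.0 |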

process_batch

process_batch(prompts: list[str], tenant_id: str = '') -> BatchResult

Process a batch of prompts with concurrent execution.

If the backend is a CoherenceAgent, each prompt is passed to backend.process(prompt).

review_batch

review_batch(items: list[tuple[str, str]], tenant_id: str = '') -> BatchResult

Batch-review (prompt, response) pairs.

When the backend has a review_batch method (as CoherenceScorer does), this delegates to it for coalesced NLI inference: 2 batched forward passes in total instead of 2*N for N items. Otherwise it falls back to reviewing items individually in a ThreadPoolExecutor.
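The delegate-or-fan-out dispatch can be sketched with duck typing. This is an illustration, not the library's code: review_many, SingleScorer and BatchScorer are hypothetical, and the per-item method name review is an assumption borrowed from the CoherenceScorer.review() mentioned in the scorer-level batching section.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any


def review_many(backend: Any, items: list[tuple[str, str]], max_workers: int = 4) -> list[Any]:
    # Prefer a coalesced batch method when the backend offers one.
    batch_fn = getattr(backend, "review_batch", None)
    if callable(batch_fn):
        return batch_fn(items)
    # Fallback: review each (prompt, response) pair in a thread pool.
    # pool.map preserves input order in its results.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda pair: backend.review(*pair), items))


class SingleScorer:
    """Hypothetical backend exposing only a per-item review()."""

    def review(self, prompt: str, response: str) -> tuple[bool, float]:
        approved = bool(response.strip())
        return approved, 1.0 if approved else 0.0


class BatchScorer(SingleScorer):
    """Hypothetical backend that also exposes a coalesced review_batch()."""

    def __init__(self) -> None:
        self.batch_calls = 0

    def review_batch(self, items: list[tuple[str, str]]) -> list[tuple[bool, float]]:
        self.batch_calls += 1
        return [self.review(p, r) for p, r in items]


pairs = [("What is 2+2?", "The answer is 4."), ("Empty?", "")]
fallback_out = review_many(SingleScorer(), pairs)
batched = BatchScorer()
batched_out = review_many(batched, pairs)
```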

process_batch_async (async)

process_batch_async(prompts: list[str], max_concurrency: int | None = None, tenant_id: str = '') -> BatchResult

Async version of process_batch using asyncio concurrency.

review_batch_async (async)

review_batch_async(items: list[tuple[str, str]], max_concurrency: int | None = None, tenant_id: str = '') -> BatchResult

Async version of review_batch.

When the backend provides review_batch(), the coalesced call is offloaded to the thread pool so it does not block the event loop; otherwise this falls back to per-item asyncio concurrency.
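Offloading a blocking, coalesced call from async code usually means handing it to a worker thread so the event loop keeps running other tasks. A minimal sketch using the standard library's asyncio.to_thread (Python 3.9+); the source does not say whether BatchProcessor uses this exact call or an explicit executor, and blocking_review_batch is a hypothetical stand-in for the scorer. The heartbeat task only exists to show that the loop stays responsive.

```python
import asyncio
import time


def blocking_review_batch(items: list[tuple[str, str]]) -> list[bool]:
    """Hypothetical stand-in for a synchronous, coalesced scorer.review_batch()."""
    time.sleep(0.05)  # simulate one batched GPU call
    return [bool(response) for _, response in items]


async def review_batch_offloaded(items: list[tuple[str, str]]) -> tuple[list[bool], int]:
    ticks = 0

    async def heartbeat() -> None:
        # Counts how often the event loop runs other work while the batch is in flight.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.005)

    hb = asyncio.create_task(heartbeat())
    # The blocking call runs in the default thread pool; the loop keeps serving tasks.
    results = await asyncio.to_thread(blocking_review_batch, items)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return results, ticks


results, ticks = asyncio.run(review_batch_offloaded([("q1", "a1"), ("q2", "")]))
```

Calling blocking_review_batch directly inside the coroutine would freeze the heartbeat (and every other task) for the full duration of the call.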