Unified DP-RAG Pipeline

director_ai.core.dp_rag.pipeline.DPRagPipeline

DPRagPipeline(max_epsilon: float, *, retrieval_sensitivity: float = 1.0, decode_sensitivity: float = 1.0, score_sensitivity: float = 1.0, seed: int | None = None)

Meter retrieval, decoding, and score release on one per-tenant budget.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_epsilon | float | Per-tenant cumulative privacy budget shared across all three stages. | required |
| retrieval_sensitivity | float | L1 sensitivity of the retrieval similarity score (default 1.0). | 1.0 |
| decode_sensitivity | float | L∞ sensitivity of the decoder logits (default 1.0). | 1.0 |
| score_sensitivity | float | L1 sensitivity of the released coherence score (default 1.0). | 1.0 |
| seed | int \| None | Optional base seed for reproducible noise in tests; production uses system entropy (None). Each noisy operation advances the seed so successive operations draw independent noise. | None |

remaining

remaining(tenant_id: str = '') -> float

Shared privacy budget left for a tenant across all stages.

spent

spent(tenant_id: str = '') -> float

Shared privacy budget already consumed by a tenant.

stage_log

stage_log(tenant_id: str = '') -> tuple[StageCharge, ...]

The per-stage charges recorded for a tenant, in order.

rank

rank(items: list[ScoredItem], *, tenant_id: str = '', epsilon: float) -> PipelineRanking

DP-rank items (Laplace) and charge epsilon to the shared budget.

decode

decode(logits: Sequence[float], *, tenant_id: str = '', epsilon: float) -> DPTokenChoice

DP-select a next token (exponential mechanism) on the shared budget.

release_score

release_score(score: float, *, tenant_id: str = '', epsilon: float) -> float

DP-release a coherence score (Laplace) on the shared budget.
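The two Laplace-based stages (rank and release_score) can be illustrated with a minimal standalone sketch. It is not the director_ai implementation: the helper names `laplace_noise`, `release_score`, and `rank_items` are hypothetical, and the ranking helper assumes that the declared L1 sensitivity bounds the change in the whole score vector, not in each score separately.

```python
from __future__ import annotations

import random


def laplace_noise(scale: float, rng: random.Random) -> float:
    """Draw Laplace(0, scale) noise as the scaled difference of two Exp(1) draws."""
    if scale == 0.0:
        return 0.0
    return scale * (rng.expovariate(1.0) - rng.expovariate(1.0))


def release_score(
    score: float,
    *,
    epsilon: float,
    sensitivity: float = 1.0,
    rng: random.Random | None = None,
) -> float:
    """Return score + Laplace(0, sensitivity / epsilon) noise."""
    if epsilon <= 0.0:
        raise ValueError("epsilon must be positive")
    if sensitivity < 0.0:
        raise ValueError("sensitivity must be non-negative")
    if rng is None:
        rng = random.SystemRandom()  # system entropy when no seed is supplied
    return score + laplace_noise(sensitivity / epsilon, rng)


def rank_items(
    scores: dict[str, float],
    *,
    epsilon: float,
    sensitivity: float = 1.0,
    rng: random.Random | None = None,
) -> list[str]:
    """Order item ids by independently noised similarity scores (highest first)."""
    if rng is None:
        rng = random.SystemRandom()
    noisy = {
        item_id: release_score(s, epsilon=epsilon, sensitivity=sensitivity, rng=rng)
        for item_id, s in scores.items()
    }
    return sorted(noisy, key=noisy.__getitem__, reverse=True)
```

Passing a seeded `random.Random` makes the noise reproducible for tests, mirroring the role of the `seed` parameter described above; leaving `rng` unset falls back to system entropy.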

director_ai.core.dp_rag.decoding.DPTokenDecoder

DPTokenDecoder(*, sensitivity: float = 1.0, seed: int | None = None)

Select a next token under ε-DP via the exponential mechanism.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sensitivity | float | L∞ sensitivity Δ of the logits to one record of the conditioning context (default 1.0). Must be non-negative. | 1.0 |
| seed | int \| None | Optional deterministic seed for tests and simulations only. Production leaves this unset so the mechanism reads system entropy each call. Each call advances the seed so successive selections draw independent noise. | None |

sensitivity property

sensitivity: float

The declared L∞ logit sensitivity.

select

select(logits: Sequence[float], *, epsilon: float) -> DPTokenChoice

Return the ε-DP selected token index for logits.

Adds independent Gumbel(0, 2Δ/ε) noise to each logit and takes the argmax, which is the exponential mechanism over the token vocabulary. When Δ = 0 the logits carry no private signal, so the noise scale is zero and the plain argmax is returned.
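Why Gumbel noise plus argmax gives the exponential mechanism follows from the standard Gumbel-max identity. In the statement below, u_i denotes the logit of token i and β is shorthand introduced here for the noise scale 2Δ/ε:

```latex
G_j \overset{\text{iid}}{\sim} \mathrm{Gumbel}(0, \beta), \quad \beta = \frac{2\Delta}{\varepsilon}
\;\Longrightarrow\;
\Pr\!\left[\arg\max_j \,(u_j + G_j) = i\right]
  = \frac{\exp(u_i / \beta)}{\sum_j \exp(u_j / \beta)}
  = \frac{\exp\!\big(\varepsilon\, u_i / (2\Delta)\big)}{\sum_j \exp\!\big(\varepsilon\, u_j / (2\Delta)\big)}
```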

director_ai.core.dp_rag.decoding.DPTokenChoice dataclass

DPTokenChoice(index: int, noisy_logit: float, epsilon_spent: float)

The DP-selected token index, its noisy logit, and the ε spent.

to_dict

to_dict() -> dict[str, float | int]

Tenant-safe view (no raw logits or context).

Boundary

A RAG answer can leak the private retrieval corpus through three stages, not one: retrieval ranking, next-token decoding, and any released coherence score. DifferentiallyPrivateRetrieval meters retrieval alone; DPRagPipeline charges all three stages against one per-tenant ε accountant, so the budget reflects the whole pipeline.

| Stage | Mechanism | Guarantee |
| --- | --- | --- |
| rank | Laplace noise on similarity scores | ε-DP ranking |
| decode | exponential mechanism (Gumbel-max) on logits | ε-DP token selection |
| release_score | Laplace noise on the coherence score | ε-DP score |

Every stage is pure ε-DP, so by sequential composition the total privacy loss is the sum of the per-stage ε values. A stage that would push a tenant past max_epsilon is refused with DPBudgetExceededError before any noise is drawn or budget charged. Per-stage charges are logged (stage, ε, tenant), so a tenant can audit where the budget went without any raw query, logit, or score crossing the boundary.

from director_ai.core.dp_rag import DPRagPipeline, ScoredItem

pipe = DPRagPipeline(max_epsilon=10.0)

ranking = pipe.rank(
    [ScoredItem("doc-1", 0.91), ScoredItem("doc-2", 0.44)],
    tenant_id="tenant-a",
    epsilon=2.0,
)
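# Illustrative inputs; in practice these come from the model and the coherence scorer.
next_token_logits = [2.1, 0.3, -1.0]
coherence = 0.82
choice = pipe.decode(next_token_logits, tenant_id="tenant-a", epsilon=3.0)
released = pipe.release_score(coherence, tenant_id="tenant-a", epsilon=1.0)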

print(pipe.spent("tenant-a"), pipe.remaining("tenant-a"))  # 6.0  4.0

ProductionGuard.dp_rag_pipeline() builds a DPRagPipeline with the guard's defaults.
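The shared-budget behaviour described in this section (additive composition, refusal before any charge, and a per-stage log) can be sketched as a small standalone accountant. This is an illustration under stated assumptions, not the library's code: `EpsilonAccountant`, `Charge`, and `BudgetExceeded` are hypothetical stand-ins for the internal accountant, StageCharge, and DPBudgetExceededError.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass


class BudgetExceeded(Exception):
    """Hypothetical stand-in for DPBudgetExceededError."""


@dataclass(frozen=True)
class Charge:
    """One logged charge: which stage spent how much epsilon."""

    stage: str
    epsilon: float


class EpsilonAccountant:
    """Pure-epsilon accountant: per-tenant charges add up against one cap."""

    def __init__(self, max_epsilon: float) -> None:
        if max_epsilon <= 0.0:
            raise ValueError("max_epsilon must be positive")
        self._max = max_epsilon
        self._log: dict[str, list[Charge]] = defaultdict(list)

    def spent(self, tenant_id: str = "") -> float:
        return sum((c.epsilon for c in self._log.get(tenant_id, [])), 0.0)

    def remaining(self, tenant_id: str = "") -> float:
        return self._max - self.spent(tenant_id)

    def charge(self, stage: str, epsilon: float, tenant_id: str = "") -> None:
        """Check the cap first; record the charge only if it fits."""
        if epsilon <= 0.0:
            raise ValueError("epsilon must be positive")
        if self.spent(tenant_id) + epsilon > self._max:
            # Refuse before anything is recorded, so a failed stage costs nothing.
            raise BudgetExceeded(f"{stage} would exceed the budget for {tenant_id!r}")
        self._log[tenant_id].append(Charge(stage, epsilon))

    def stage_log(self, tenant_id: str = "") -> tuple[Charge, ...]:
        return tuple(self._log.get(tenant_id, []))
```

In a pipeline built this way, each stage would call `charge` before drawing noise, so a refused stage neither consumes budget nor produces output. The log holds only the stage name and ε, never the query, logits, or score.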

Decoding mechanism

DPTokenDecoder selects the next token with the exponential mechanism (McSherry & Talwar, FOCS 2007): token i with logit u_i is released with probability ∝ exp(ε · u_i / (2Δ)), where Δ is the L∞ sensitivity of the logits to one record of the conditioning context. The implementation uses the equivalent Gumbel-max trick, adding independent Gumbel(0, 2Δ/ε) noise to each logit and taking the argmax. The selection is pure ε-DP with respect to the conditioning data.
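A minimal standalone sketch of this selection rule follows. It is not DPTokenDecoder itself: `gumbel`, `select_token`, and `exp_mech_probs` are hypothetical names, and the last helper exists only to compare sampled frequencies against the exact exponential-mechanism probabilities.

```python
from __future__ import annotations

import math
import random
from typing import Sequence


def gumbel(scale: float, rng: random.Random) -> float:
    """Draw Gumbel(0, scale) noise by inverse-CDF sampling."""
    u = rng.random()
    while u == 0.0:  # avoid log(0); random() never returns 1.0
        u = rng.random()
    return -scale * math.log(-math.log(u))


def select_token(
    logits: Sequence[float],
    *,
    epsilon: float,
    sensitivity: float = 1.0,
    rng: random.Random | None = None,
) -> int:
    """Return argmax_i (u_i + Gumbel(0, 2 * sensitivity / epsilon))."""
    if not logits:
        raise ValueError("logits must be non-empty")
    if epsilon <= 0.0:
        raise ValueError("epsilon must be positive")
    if sensitivity < 0.0:
        raise ValueError("sensitivity must be non-negative")
    if sensitivity == 0.0:
        # No private signal in the logits: zero noise, plain argmax.
        return max(range(len(logits)), key=logits.__getitem__)
    if rng is None:
        rng = random.SystemRandom()
    scale = 2.0 * sensitivity / epsilon
    noisy = [u + gumbel(scale, rng) for u in logits]
    return max(range(len(noisy)), key=noisy.__getitem__)


def exp_mech_probs(
    logits: Sequence[float], *, epsilon: float, sensitivity: float = 1.0
) -> list[float]:
    """Exact probabilities proportional to exp(epsilon * u_i / (2 * sensitivity))."""
    z = [epsilon * u / (2.0 * sensitivity) for u in logits]
    m = max(z)  # subtract the max for numerical stability
    w = [math.exp(v - m) for v in z]
    total = sum(w)
    return [v / total for v in w]
```

Sampling `select_token` many times with a seeded `random.Random` and comparing the empirical frequencies with `exp_mech_probs` is a quick way to check that the noise scale matches the intended ε.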

Privacy-utility tradeoff

benchmarks/dp_rag_privacy_utility.py measures utility against ε: top-k overlap for retrieval and top-1 agreement for decoding, each averaged over seeded trials. Utility rises monotonically with ε toward the no-privacy baseline of 1.0, so a deployment can pick ε from the measured curve. Gaussian-noise pipelines are not pure ε-DP; account for their composition with the Rényi-DP accountant instead of the pure-ε accountant.
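The decoding half of such a measurement can be approximated with a short standalone sketch. It does not reproduce the benchmark script: `dp_argmax` and `top1_agreement` are hypothetical helpers, the logits are made up, and the trial count and seed are arbitrary choices.

```python
from __future__ import annotations

import math
import random
from typing import Sequence


def dp_argmax(
    logits: Sequence[float], epsilon: float, sensitivity: float, rng: random.Random
) -> int:
    """Gumbel-max selection with noise scale 2 * sensitivity / epsilon."""
    scale = 2.0 * sensitivity / epsilon

    def gumbel() -> float:
        u = rng.random()
        while u == 0.0:  # avoid log(0)
            u = rng.random()
        return -scale * math.log(-math.log(u))

    noisy = [x + gumbel() for x in logits]
    return max(range(len(noisy)), key=noisy.__getitem__)


def top1_agreement(
    logits: Sequence[float],
    epsilon: float,
    *,
    sensitivity: float = 1.0,
    trials: int = 2000,
    seed: int = 0,
) -> float:
    """Fraction of seeded trials in which the DP choice equals the plain argmax."""
    rng = random.Random(seed)
    best = max(range(len(logits)), key=logits.__getitem__)
    hits = sum(dp_argmax(logits, epsilon, sensitivity, rng) == best for _ in range(trials))
    return hits / trials


# Made-up logits; sweep epsilon to trace a utility curve.
example_logits = [2.0, 1.0, 0.0, -1.0]
curve = {eps: top1_agreement(example_logits, eps) for eps in (0.1, 1.0, 10.0, 100.0)}
for eps, agreement in curve.items():
    print(f"epsilon={eps:g}  top-1 agreement={agreement:.3f}")
```

At small ε the noise dominates and agreement approaches the uniform rate of one over the vocabulary size; at large ε it approaches the no-privacy baseline of 1.0.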