Operational Provenance Ledger

KnowledgeProvenanceLedger records the lifecycle of the knowledge base itself. Where the per-response provenance verifier proves the citation set of one answer, the ledger proves how the knowledge base reached its current state: every ingest, update, delete, and supersede is appended as a signed, ordered event. It answers the two questions an auditor asks, "where did this chunk come from?" and "what happened to this document?", and it detects after-the-fact tampering with either the stored content or the mutation history.

Two integrity layers compose:

  • Content commitment. Each event carries a Merkle root over the SHA-256 digests of the chunks it admitted or retired. Editing a stored chunk later changes the digest, so the inclusion proof returned by provenance_of no longer folds to the recorded root.
  • HMAC chain. Each event is folded into an HMAC-signed chain keyed on a digest of the event's full semantic payload. Reordering, deleting, or editing any field of any event breaks verify().
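
The content-commitment layer can be illustrated with a minimal standard-library sketch. It is not the library's implementation: the tree shape is an assumption (an unpaired node is carried up unchanged), and merkle_root is a hypothetical helper, not commit_root. It shows only that editing a chunk changes the root recomputed from the chunk digests.

```python
import hashlib


def sha256(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def merkle_root(leaves: list[bytes]) -> bytes:
    """Fold leaf digests pairwise into a single root.

    Assumption: an unpaired node at the end of a level is carried up
    unchanged. The real kernel may pad or domain-separate differently.
    """
    if not leaves:
        raise ValueError("at least one leaf is required")
    level = list(leaves)
    while len(level) > 1:
        level = [
            sha256(level[i] + level[i + 1]) if i + 1 < len(level) else level[i]
            for i in range(0, len(level), 2)
        ]
    return level[0]


chunks = [b"Refunds are available within 30 days.", b"Contact support to begin."]
recorded_root = merkle_root([sha256(chunk) for chunk in chunks])

# Editing a stored chunk afterwards changes its digest, so the recomputed
# root no longer matches the one recorded in the event.
edited = [b"Refunds are available within 90 days.", chunks[1]]
recomputed_root = merkle_root([sha256(chunk) for chunk in edited])
print(recomputed_root == recorded_root)  # False
```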

Events persist as one JSON object per line. The ledger reloads and verifies the file on construction, so a process restart resumes the exact chain and a tampered file is rejected before any new event is appended.

Wiring into ingestion

DocumentIngestionPipeline takes an optional ledger. When supplied, every mutation is recorded automatically; when omitted, ingestion behaves exactly as before.

from director_ai.core.ingestion import DocumentIngestionPipeline
from director_ai.core.provenance import KnowledgeProvenanceLedger
from director_ai.core.retrieval.vector_store import VectorGroundTruthStore

ledger = KnowledgeProvenanceLedger(secret=secret_key, path="kb-provenance.jsonl")
pipeline = DocumentIngestionPipeline(store=VectorGroundTruthStore(), ledger=ledger)

result = pipeline.ingest_text(
    "Refunds are available within 30 days.",
    doc_id="refunds",
    source="refunds.md",
)

provenance = ledger.provenance_of(result.chunk_ids[0])
assert provenance.verified            # inclusion proof folds to the event root
assert provenance.source == "refunds.md"
assert ledger.verify() == (True, None)  # chain intact

An update_text that changes content appends an update event that admits the new chunks and retires the previous revision's chunks; provenance_of returns None for a retired chunk. A delete appends a delete event bound to the exact chunk set it removed. An update_text with unchanged content appends no event.
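
The retire-and-admit rules above can be modelled with a small in-memory sketch. ActiveSetSketch, its put and delete methods, and the chunk ids are all hypothetical; the sketch ignores signing and persistence and only tracks which chunks still resolve.

```python
import hashlib


class ActiveSetSketch:
    """Tracks which chunks currently resolve; not the real ledger."""

    def __init__(self) -> None:
        self.events: list[dict] = []
        self.active: dict[str, int] = {}        # chunk_id -> admitting event index
        self.content_hash: dict[str, str] = {}  # doc_id -> hash of current revision
        self.doc_chunks: dict[str, list[str]] = {}

    def _append(self, event_type, doc_id, admitted, retired) -> int:
        index = len(self.events)
        self.events.append({
            "index": index, "event_type": event_type, "doc_id": doc_id,
            "chunk_ids": admitted, "removed_chunk_ids": retired,
        })
        for chunk_id in retired:      # retire the previous revision first
            self.active.pop(chunk_id, None)
        for chunk_id in admitted:     # then admit the new chunks
            self.active[chunk_id] = index
        return index

    def put(self, doc_id: str, text: str, chunk_ids: list[str]):
        digest = hashlib.sha256(text.encode()).hexdigest()
        if self.content_hash.get(doc_id) == digest:
            return None  # unchanged content appends no event
        event_type = "update" if doc_id in self.content_hash else "ingest"
        retired = self.doc_chunks.get(doc_id, [])
        self.content_hash[doc_id] = digest
        self.doc_chunks[doc_id] = list(chunk_ids)
        return self._append(event_type, doc_id, list(chunk_ids), list(retired))

    def delete(self, doc_id: str) -> int:
        retired = self.doc_chunks.pop(doc_id, [])
        self.content_hash.pop(doc_id, None)
        return self._append("delete", doc_id, [], list(retired))

    def provenance_of(self, chunk_id: str):
        index = self.active.get(chunk_id)
        return None if index is None else self.events[index]


sketch = ActiveSetSketch()
sketch.put("refunds", "Refunds within 30 days.", ["refunds:v1:0"])
sketch.put("refunds", "Refunds within 30 days.", ["refunds:v1:0"])  # no event
sketch.put("refunds", "Refunds within 60 days.", ["refunds:v2:0"])
print([event["event_type"] for event in sketch.events])  # ['ingest', 'update']
print(sketch.provenance_of("refunds:v1:0"))               # None
```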

Querying provenance

# Full per-document history in chain order.
for event in ledger.history_for("refunds"):
    print(event.index, event.event_type, event.source, event.timestamp)

# Origin of one chunk, with a self-contained inclusion proof.
prov = ledger.provenance_of("refunds:chunk:0")
if prov is not None and prov.verified:
    print(prov.doc_id, prov.event_type, prov.proof.root.hex())

Tamper detection

verify() re-derives the chain over the persisted events and returns (ok, first_bad_index). Construction raises LedgerTamperError when the persisted file fails this check — whether a field was edited, the events were reordered, or the file was signed with a different secret.

ok, first_bad = ledger.verify()
if not ok:
    raise RuntimeError(f"provenance ledger compromised at event {first_bad}")
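
How a chain replay can report the first bad index is sketched below with the standard library only. The field layout, the all-zero genesis parent, and the helper names (payload_digest, sign, append) are assumptions made for illustration; the ledger's actual wire format is not described here.

```python
import hashlib
import hmac
import json

GENESIS = "0" * 64  # assumed parent hash for the first event


def payload_digest(event: dict) -> bytes:
    """Digest every field except parent_hash and tag."""
    semantic = {k: v for k, v in event.items() if k not in ("parent_hash", "tag")}
    return hashlib.sha256(json.dumps(semantic, sort_keys=True).encode()).digest()


def sign(secret: bytes, parent_hash: str, event: dict) -> str:
    message = parent_hash.encode() + payload_digest(event)
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def append(secret: bytes, chain: list, **fields) -> None:
    parent = chain[-1]["tag"] if chain else GENESIS
    event = {"index": len(chain), **fields, "parent_hash": parent}
    event["tag"] = sign(secret, parent, event)
    chain.append(event)


def verify(secret: bytes, chain: list):
    """Replay the chain; return (ok, first_bad_index)."""
    parent = GENESIS
    for position, event in enumerate(chain):
        expected = sign(secret, parent, event)
        if (
            event["index"] != position
            or event["parent_hash"] != parent
            or not hmac.compare_digest(event["tag"], expected)
        ):
            return False, position
        parent = event["tag"]
    return True, None


secret = b"x" * 32
chain: list = []
append(secret, chain, event_type="ingest", doc_id="refunds", source="refunds.md")
append(secret, chain, event_type="update", doc_id="refunds", source="refunds.md")
append(secret, chain, event_type="delete", doc_id="refunds", source="")
print(verify(secret, chain))  # (True, None)

chain[1]["source"] = "evil.md"  # edit one field of one event
print(verify(secret, chain))  # (False, 1)
```

Because each tag covers the previous tag, reordering events or signing with a different secret fails at the first position where the replay diverges.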

Self-updating supersession

A self-updating knowledge base recognises when a new document replaces older material. KnowledgeSupersessionPolicy turns three signals — an explicit supersedes hint, a same-source revision, or a caller-supplied per-document contradiction score — into a reviewable SupersessionDecision. The policy is side-effect free; every non-empty decision is gated on human approval by default, and auto-promotion is opt-in and only fires when every candidate clears a high score bar.

from director_ai.core.provenance import KnowledgeSupersessionPolicy

policy = KnowledgeSupersessionPolicy()
decision = policy.evaluate(
    incoming_doc_id="refunds_v2",
    incoming_source="refunds.md",
    tenant_id="acme",
    existing=pipeline.registry.list_for_tenant("acme"),
    contradiction_scores={"refunds_v1": 0.88},  # from an NLI/similarity verifier
)
# decision.action == "recommend"; decision.requires_human_approval is True
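
The decision rule can be approximated in a few lines. This is a sketch under stated assumptions, not the policy's code: existing is simplified to (doc_id, source) pairs, an explicit hint is given an assumed score of 1.0, and the function returns a plain tuple instead of a SupersessionDecision. The thresholds mirror the documented defaults.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Candidate:
    superseded_doc_id: str
    reason: str
    score: float


def evaluate(
    incoming_doc_id,
    incoming_source,
    existing,  # iterable of (doc_id, source) pairs: a hypothetical shape
    explicit_supersedes=(),
    contradiction_scores=None,
    *,
    min_contradiction_score=0.65,
    auto_promote=False,
    auto_promote_threshold=0.95,
    same_source_score=0.9,
):
    scores = contradiction_scores or {}
    best: dict = {}

    def offer(doc_id, reason, value):
        # Keep only the strongest signal per existing document.
        if doc_id not in best or value > best[doc_id].score:
            best[doc_id] = Candidate(doc_id, reason, value)

    for doc_id, source in existing:
        if doc_id == incoming_doc_id:
            continue  # the incoming document is never its own candidate
        if doc_id in explicit_supersedes:
            offer(doc_id, "explicit", 1.0)  # assumed score for an explicit hint
        if source == incoming_source:
            offer(doc_id, "same_source", same_source_score)
        value = scores.get(doc_id)
        if value is not None and value >= min_contradiction_score:
            offer(doc_id, "contradiction", value)

    candidates = tuple(sorted(best.values(), key=lambda c: c.score, reverse=True))
    if not candidates:
        return "none", False, candidates
    if auto_promote and all(c.score >= auto_promote_threshold for c in candidates):
        return "promote", False, candidates
    return "recommend", True, candidates


action, needs_approval, candidates = evaluate(
    "refunds_v2",
    "refunds.md",
    existing=[("refunds_v1", "refunds-2023.md")],
    contradiction_scores={"refunds_v1": 0.88},
)
print(action, needs_approval)  # recommend True
```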

DocumentIngestionPipeline.apply_supersession executes an approved decision: it retires each superseded document's chunks from the store and registry and records a single ledger supersede event linking them to the incoming document. A decision that still needs review is refused unless approved=True.

result = pipeline.apply_supersession(decision, approved=True)
# result.superseded_doc_ids == ("refunds_v1",)
# the retired chunks now resolve to None via ledger.provenance_of(...)
# ledger.history_for("refunds_v2") contains a "supersede" event

Online credibility from feedback

SourceCredibility already tracks a decaying trust score per source and already feeds ProvenanceVerifier's composite trust score. CredibilityFeedbackLoop supplies the missing online-learning step: it folds human approvals and rejections into that tracker, so a source whose cited facts keep getting rejected drifts down while a consistently-approved source drifts up. Share the tracker with the verifier and later responses are scored by what earlier feedback taught.

from director_ai.core.provenance import (
    CredibilityFeedbackLoop,
    ProvenanceChain,
    ProvenanceVerifier,
    SourceCredibility,
)

credibility = SourceCredibility()
loop = CredibilityFeedbackLoop(credibility=credibility)
verifier = ProvenanceVerifier(chain=ProvenanceChain(secret=secret), credibility=credibility)

# A human rejects a response citing "blog-x"; its credibility drops, and the
# verifier's trust score for the next "blog-x" citation drops with it.
loop.observe(source_ids=["blog-x"], human_approved=False)
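
One way to picture the feedback step is an exponential moving average toward the approve or reject signal. The update rule, the 0.5 prior, and the 0.2 rate are assumptions chosen for illustration; the tracker's actual decay formula is not specified here, and time-based decay is left out. The handling of blank and duplicate source ids follows the documented behaviour of observe.

```python
class CredibilitySketch:
    """Hypothetical stand-in for a per-source trust tracker."""

    def __init__(self, prior: float = 0.5, rate: float = 0.2) -> None:
        self.prior = prior
        self.rate = rate
        self.scores: dict[str, float] = {}

    def update(self, source_id: str, signal: float) -> float:
        current = self.scores.get(source_id, self.prior)
        # Move a fraction `rate` of the way toward the new signal.
        self.scores[source_id] = (1 - self.rate) * current + self.rate * signal
        return self.scores[source_id]


def observe(tracker, source_ids, human_approved,
            approve_signal: float = 1.0, reject_signal: float = 0.0):
    signal = approve_signal if human_approved else reject_signal
    distinct: list[str] = []
    for source_id in source_ids:
        source_id = source_id.strip()
        if source_id and source_id not in distinct:  # skip blanks and repeats
            distinct.append(source_id)
    return {source_id: tracker.update(source_id, signal) for source_id in distinct}


tracker = CredibilitySketch()
after_reject = observe(tracker, ["blog-x", "", "blog-x"], human_approved=False)
after_approve = observe(tracker, ["policy.md"], human_approved=True)
print(after_reject, after_approve)
```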

The same credibility can re-rank retrieval candidates. rerank blends each chunk's relevance (from its distance) with its source credibility; a weight of 0 keeps the pure relevance order, a weight of 1 ranks purely by credibility.

ranked = loop.rerank(evidence_chunks, credibility_weight=0.4)
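
The blend can be sketched as a weighted sum. The mapping from distance to relevance, 1 / (1 + distance), the 0.5 fallback credibility, and the Chunk type are assumptions; only the role of the weight and the stable ordering come from the description above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Chunk:
    text: str
    source: str
    distance: float


def rerank(chunks, credibility: dict, credibility_weight: float = 0.5):
    if not 0.0 <= credibility_weight <= 1.0:
        raise ValueError("credibility_weight must be in [0, 1]")

    def blended(chunk: Chunk) -> float:
        relevance = 1.0 / (1.0 + max(chunk.distance, 0.0))  # assumed mapping
        trust = credibility.get(chunk.source, 0.5)          # assumed fallback
        return (1 - credibility_weight) * relevance + credibility_weight * trust

    # sorted() is stable, including with reverse=True, so ties keep input order.
    return sorted(chunks, key=blended, reverse=True)


candidates = [
    Chunk("Refunds anytime!", "blog-x", distance=0.1),
    Chunk("Refunds within 30 days.", "policy.md", distance=0.3),
]
trust = {"blog-x": 0.2, "policy.md": 0.9}

print([c.source for c in rerank(candidates, trust, 0.0)])  # ['blog-x', 'policy.md']
print([c.source for c in rerank(candidates, trust, 0.4)])  # ['policy.md', 'blog-x']
```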

Stored corrections replay through the loop when the caller can resolve which sources each response cited:

loop.ingest_corrections(
    feedback_store.get_corrections(),
    source_resolver=lambda correction: sources_cited_by(correction.review_id),
)

Counterfactual contradiction explanations

Grounding a claim is not only about finding support — it is about surfacing the evidence that refutes it. ContradictionExplainer scores each retrieved passage against a claim and returns a human-readable account of the contradictions: this claim contradicts the passage from source X because the passage states "…" (contradiction 0.91).

The contradiction signal is injected, like ConflictAwareKnowledgeGuard's score_fn: the caller supplies scorer(passage, claim) -> probability backed by the NLI scorer in director_ai.core.scoring.nli, a rule engine, or a domain model. Keeping the model out of the explainer makes its selection-and-rationale logic deterministic and testable on its own.

from director_ai.core.causal_verifier import ContradictionExplainer

explainer = ContradictionExplainer(scorer=nli_contradiction_probability, threshold=0.5)
report = explainer.explain(claim, retrieved_chunks)
if report.has_contradiction:
    print(report.best.rationale)
    # "This claim contradicts the passage from policy.md because the passage
    #  states: "Refunds are never available." (contradiction 0.92)."

report.contradictions is ordered strongest-first; each entry keeps the originating chunk_index and chunk_source so the contradiction can be traced back to its retrieved passage.
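
The selection-and-rationale logic is simple enough to sketch with a toy scorer. Everything named here is hypothetical: toy_scorer is a keyword stand-in for an NLI model, chunks are reduced to (source, text) pairs, and the function returns a plain list rather than a ContradictionReport.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Explanation:
    chunk_index: int
    chunk_source: str
    score: float
    rationale: str


def explain(claim: str, chunks, scorer, threshold: float = 0.5):
    if not claim.strip():
        raise ValueError("claim must not be empty")
    found = []
    for index, (source, text) in enumerate(chunks):
        if not text.strip():
            continue  # empty passages are skipped
        score = scorer(text, claim)
        if score >= threshold:
            rationale = (
                f"This claim contradicts the passage from {source} because "
                f'the passage states: "{text}" (contradiction {score:.2f}).'
            )
            found.append(Explanation(index, source, score, rationale))
    # Strongest contradiction first.
    return sorted(found, key=lambda item: item.score, reverse=True)


def toy_scorer(passage: str, claim: str) -> float:
    """Keyword stand-in for a model: flags 'never' against a claim without it."""
    contradicts = "never" in passage.lower() and "never" not in claim.lower()
    return 0.92 if contradicts else 0.05


retrieved = [
    ("faq.md", "Refunds take five days to process."),
    ("policy.md", "Refunds are never available."),
    ("empty.md", ""),
]
found = explain("Refunds are available within 30 days.", retrieved, toy_scorer)
print(found[0].rationale)
```

Because the scorer is passed in, the thresholding and ordering can be tested with a deterministic function like the one above.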

Content commitment

The Merkle commitment is available directly for callers that bind their own content sets. commit_root and prove_inclusion use the Rust kernel (backfire_kernel.rust_merkle_*) with a bit-identical pure-Python reference, so an InclusionProof verifies regardless of which path produced it.

from director_ai.core.provenance import commit_root, prove_inclusion

root = commit_root(leaf_digests)
proof = prove_inclusion(leaf_digests, index=2)
assert proof.verify()
assert proof.root == root
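
What "folding" a proof means can be shown with a self-contained sketch. It is not bit-compatible with the kernel: the tree shape is assumed (an unpaired node is carried up unchanged), and each sibling carries an explicit left/right flag, whereas InclusionProof stores plain sibling bytes plus the leaf index. The helper names build_levels, prove, and fold are hypothetical.

```python
import hashlib


def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def build_levels(leaves):
    """Return every tree level, leaves first, root level last."""
    levels = [list(leaves)]
    while len(levels[-1]) > 1:
        cur = levels[-1]
        levels.append([
            h(cur[i] + cur[i + 1]) if i + 1 < len(cur) else cur[i]
            for i in range(0, len(cur), 2)
        ])
    return levels


def prove(leaves, index: int):
    """Return (siblings, root); each sibling is (is_left, digest)."""
    if not leaves or not 0 <= index < len(leaves):
        raise ValueError("empty leaf set or index out of range")
    levels = build_levels(leaves)
    siblings = []
    pos = index
    for level in levels[:-1]:
        partner = pos ^ 1  # the other node in this pair
        if partner < len(level):
            siblings.append((partner < pos, level[partner]))
        pos //= 2
    return siblings, levels[-1][0]


def fold(leaf: bytes, siblings) -> bytes:
    """Hash the leaf up the authentication path to a candidate root."""
    node = leaf
    for is_left, sibling in siblings:
        node = h(sibling + node) if is_left else h(node + sibling)
    return node


digests = [h(f"chunk-{i}".encode()) for i in range(5)]
path, expected_root = prove(digests, index=2)
print(fold(digests[2], path) == expected_root)         # True
print(fold(h(b"edited chunk"), path) == expected_root)  # False
```

The verifier needs only the leaf, the path, and the root; the other leaves are never consulted.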

Full API

director_ai.core.provenance.ledger.LedgerEvent dataclass

LedgerEvent(index: int, event_type: str, doc_id: str, tenant_id: str, source: str, content_hash: str, content_root: str, chunk_ids: tuple[str, ...], leaf_hashes: tuple[str, ...], removed_chunk_ids: tuple[str, ...], supersedes: tuple[str, ...], timestamp: float, parent_hash: str, tag: str)

One signed knowledge-base mutation.

chunk_ids are the chunks this event admitted; leaf_hashes is the parallel list of their content digests (hex), the leaves of the event's content commitment. removed_chunk_ids are chunks this event retired (an update retires the previous revision's chunks; a delete retires all of a document's chunks). supersedes records document-level lineage for higher layers. index, parent_hash, and tag are the HMAC-chain fields and are derived, not caller-supplied.

to_json

to_json() -> str

Serialise the full event (including chain fields) as one line.

from_json classmethod

from_json(raw: str) -> LedgerEvent

Parse a persisted event line back into a LedgerEvent.

director_ai.core.provenance.ledger.ChunkProvenance dataclass

ChunkProvenance(chunk_id: str, doc_id: str, tenant_id: str, source: str, event_index: int, event_type: str, timestamp: float, proof: InclusionProof)

The active origin of one chunk plus its inclusion proof.

verified property

verified: bool

Return True when the inclusion proof folds to the root.

director_ai.core.provenance.ledger.KnowledgeProvenanceLedger

KnowledgeProvenanceLedger(*, secret: bytes, path: str | PathLike[str] | None = None, clock: object = None)

Append-only, HMAC-chained, persistent knowledge-mutation ledger.

Parameters:

  • secret (bytes, required). HMAC key for the underlying chain. Minimum 32 bytes.
  • path (str | PathLike[str] | None, default None). JSONL file the events persist to. When the file already exists it is loaded and verified on construction; a tampered file raises LedgerTamperError. Pass None for an in-memory ledger (CI, ephemeral workers).
  • clock (object, default None). Timestamp source; injection point for deterministic tests.

record_ingest

record_ingest(*, doc_id: str, tenant_id: str, source: str, content_hash: str, chunk_leaves: Sequence[tuple[str, bytes]], supersedes: Sequence[str] = ()) -> LedgerEvent

Append an ingest event admitting chunk_leaves.

chunk_leaves pairs each new chunk id with its content digest (32 raw bytes). Raises ValueError when the chunk set is empty.

record_update

record_update(*, doc_id: str, tenant_id: str, source: str, content_hash: str, chunk_leaves: Sequence[tuple[str, bytes]], removed_chunk_ids: Sequence[str] = (), supersedes: Sequence[str] = ()) -> LedgerEvent

Append an update event admitting new chunks and retiring the previous revision's removed_chunk_ids.

record_delete

record_delete(*, doc_id: str, tenant_id: str, removed_chunk_ids: Sequence[str], source: str = '') -> LedgerEvent

Append a delete event retiring removed_chunk_ids.

The content commitment is taken over the retired chunk ids so the delete is itself bound to the exact set it removed.

record_supersede

record_supersede(*, doc_id: str, tenant_id: str, source: str, supersedes: Sequence[str], removed_chunk_ids: Sequence[str]) -> LedgerEvent

Append a supersede event: doc_id replaces supersedes.

supersedes records the document-level lineage; the retired removed_chunk_ids are committed and dropped from the active set, so a chunk of a superseded document no longer resolves through provenance_of.

provenance_of

provenance_of(chunk_id: str) -> ChunkProvenance | None

Return the active origin of chunk_id with an inclusion proof.

Returns None when the chunk was never admitted or has since been retired by an update, delete, or supersede.

history_for

history_for(doc_id: str, *, tenant_id: str | None = None) -> tuple[LedgerEvent, ...]

Return every event for doc_id in chain order.

verify

verify() -> tuple[bool, int | None]

Re-derive the chain over the persisted events.

Returns (ok, first_bad_index). ok is True only when every event's HMAC tag and parent hash match a fresh replay.

snapshot

snapshot() -> tuple[LedgerEvent, ...]

Return a point-in-time copy of every event.

director_ai.core.provenance.ledger.LedgerTamperError

Bases: ValueError

Raised when a persisted ledger fails its integrity check on load.

director_ai.core.provenance.content_commitment.InclusionProof dataclass

InclusionProof(leaf: bytes, index: int, siblings: tuple[bytes, ...], root: bytes)

Proof that leaf sits at index under root.

siblings is the ordered authentication path from the leaf up to the root, exclusive of the root itself. verify folds the leaf through the path and compares the recomputed root to root. The proof is self-contained, so a verifier needs only the proof, not the original leaf set.

verify

verify() -> bool

Return True when the leaf folds to root along the path.

director_ai.core.provenance.content_commitment.commit_root

commit_root(leaves: Sequence[bytes]) -> bytes

Return the 32-byte Merkle root committing leaves in order.

leaves must be a non-empty sequence of non-empty byte strings, typically 32-byte content digests. Raises ValueError otherwise.

director_ai.core.provenance.content_commitment.prove_inclusion

prove_inclusion(leaves: Sequence[bytes], index: int) -> InclusionProof

Return the InclusionProof for leaves[index].

Raises ValueError when the leaf set is empty/invalid or the index is out of range.

director_ai.core.provenance.supersession.SupersessionCandidate dataclass

SupersessionCandidate(superseded_doc_id: str, reason: str, score: float, evidence_ref: str)

One existing document the incoming document would supersede.

director_ai.core.provenance.supersession.SupersessionDecision dataclass

SupersessionDecision(incoming_doc_id: str, tenant_id: str, incoming_source: str, candidates: tuple[SupersessionCandidate, ...], action: SupersessionAction, requires_human_approval: bool, evidence_refs: tuple[str, ...] = tuple())

Reviewable supersession outcome for one incoming document.

action is "none" when nothing is superseded, "recommend" when supersession is proposed but withheld for human approval, and "promote" when auto-promotion criteria were met. candidates is ordered by descending score.

superseded_doc_ids property

superseded_doc_ids: tuple[str, ...]

Return the ids of every document this decision would retire.

has_candidates property

has_candidates: bool

Return True when at least one document would be superseded.

director_ai.core.provenance.supersession.KnowledgeSupersessionPolicy

KnowledgeSupersessionPolicy(*, min_contradiction_score: float = 0.65, auto_promote: bool = False, auto_promote_threshold: float = 0.95, same_source_score: float = 0.9)

Map supersession signals to a human-gated decision.

Parameters:

  • min_contradiction_score (float, default 0.65). A contradiction score at or above this value makes a document a supersession candidate.
  • auto_promote (bool, default False). When True, a decision whose every candidate scores at or above auto_promote_threshold is returned with action="promote" and requires_human_approval=False. When False, every non-empty decision is gated on human approval.
  • auto_promote_threshold (float, default 0.95). Score bar each candidate must clear for auto-promotion.
  • same_source_score (float, default 0.9). Confidence assigned to a same-source revision candidate.

evaluate

evaluate(*, incoming_doc_id: str, incoming_source: str, tenant_id: str, existing: Sequence[DocRecord], explicit_supersedes: Sequence[str] = (), contradiction_scores: Mapping[str, float] | None = None) -> SupersessionDecision

Return the supersession decision for one incoming document.

existing is the tenant's current document set (e.g. from DocRegistry.list_for_tenant). contradiction_scores maps an existing document id to a per-document contradiction score in [0, 1] from a caller-supplied verifier. The incoming document itself is never treated as a candidate.

director_ai.core.ingestion.pipeline.SupersessionResult dataclass

SupersessionResult(incoming_doc_id: str, superseded_doc_ids: tuple[str, ...], chunks_removed: int)

Metadata returned after applying a supersession decision.

superseded_count property

superseded_count: int

Return the number of documents retired by the supersession.

director_ai.core.provenance.credibility_feedback.CredibilityFeedbackLoop

CredibilityFeedbackLoop(*, credibility: SourceCredibility, approve_signal: float = 1.0, reject_signal: float = 0.0)

Update a SourceCredibility tracker from human feedback.

Parameters:

  • credibility (SourceCredibility, required). The tracker to update. Share the same instance with the ProvenanceVerifier whose trust scores should learn from the feedback.
  • approve_signal (float, default 1.0). Signal folded in for an approved response.
  • reject_signal (float, default 0.0). Signal folded in for a rejected response.

observe

observe(*, source_ids: Sequence[str], human_approved: bool) -> tuple[SourceScore, ...]

Fold one human verdict into every cited source's credibility.

Blank source ids are ignored; each distinct source is observed once. Returns the updated SourceScore for each observed source.

observe_correction

observe_correction(correction: Correction, *, source_ids: Sequence[str]) -> tuple[SourceScore, ...]

Fold a Correction's human verdict into cited sources.

The feedback store records the response, not the citations, so the caller resolves which sources the corrected response cited.

ingest_corrections

ingest_corrections(corrections: Iterable[Correction], *, source_resolver: Callable[[Correction], Sequence[str]]) -> int

Replay stored corrections through the loop.

source_resolver maps each correction to the source ids its response cited. Corrections that resolve to no source are skipped. Returns the number of corrections that updated at least one source.

credibility_of

credibility_of(source_id: str) -> float

Return the current decayed credibility of one source.

rerank

rerank(chunks: Sequence[EvidenceChunk], *, credibility_weight: float = 0.5) -> list[EvidenceChunk]

Return chunks reordered by blended relevance and credibility.

credibility_weight in [0, 1] is the share given to source credibility; the remainder weights retrieval relevance (derived from the chunk distance). A weight of 0 preserves the pure relevance order; a weight of 1 ranks purely by source credibility. The sort is stable, so ties keep their incoming order, and chunk distances are left unchanged.

director_ai.core.causal_verifier.contradiction_explainer.ContradictionExplanation dataclass

ContradictionExplanation(claim: str, chunk_index: int, chunk_source: str, chunk_excerpt: str, score: float, rationale: str)

One passage that contradicts the claim, with a stated rationale.

director_ai.core.causal_verifier.contradiction_explainer.ContradictionReport dataclass

ContradictionReport(claim: str, contradictions: tuple[ContradictionExplanation, ...])

The contradictions found for one claim, strongest first.

has_contradiction property

has_contradiction: bool

Return True when at least one passage contradicts the claim.

best property

best: ContradictionExplanation | None

Return the strongest contradiction, or None when there is none.

director_ai.core.causal_verifier.contradiction_explainer.ContradictionExplainer

ContradictionExplainer(*, scorer: ContradictionScorer, threshold: float = 0.5)

Find and explain passages that contradict a claim.

Parameters:

  • scorer (ContradictionScorer, required). scorer(passage, claim) -> probability in [0, 1] that the passage contradicts the claim.
  • threshold (float, default 0.5). Minimum contradiction probability for a passage to be reported.

explain

explain(claim: str, chunks: Sequence[EvidenceChunk]) -> ContradictionReport

Return the contradictions for claim across chunks.

Each passage is scored; passages at or above the threshold become ContradictionExplanation entries sorted by descending score. Empty-text passages are skipped. Raises ValueError for an empty claim.