Stage E — Investigate (Orchestrator)¶
Status: ✅ Complete
File: app/auditforge/orchestrator.py
Tests: tests/test_auditforge_orchestrator.py — 30 cases passing
Purpose¶
Stage E is where the audit actually happens. Everything before this is preparation; everything after is summarization.
The orchestrator runs the validated question battery in parallel, producing findings. It coordinates:
- asyncio.gather over questions (concurrency capped by LLMClient's internal semaphore — default 20 simultaneous calls)
- Per-question retrieval → reasoning → evidence-anchor pipeline
- Live progress streaming via injectable ProgressSink callback
- Periodic cost telemetry snapshots so partial-run costs survive a crash
- Graceful budget exhaustion (partial results are still useful)
- Failure isolation — one question failing doesn't kill the run
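The gather-with-isolation pattern above can be sketched as follows (simplified; the real orchestrator also logs the exception and emits progress events):

```python
import asyncio

async def run_all(questions, run_one):
    """Run every question concurrently; one failure never kills the run."""
    async def isolated(q):
        try:
            return await run_one(q)
        except Exception:
            # Failure isolation: the real orchestrator logs and continues.
            return None
    return await asyncio.gather(*(isolated(q) for q in questions))

async def demo():
    async def run_one(q):
        if q == "bad":
            raise RuntimeError("LLM call failed")
        return f"finding-for-{q}"
    return await run_all(["a", "bad", "b"], run_one)

results = asyncio.run(demo())
```

Because `gather` preserves input order, failed questions show up as `None` in a predictable slot rather than shifting later results.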
Per-question execution¶
_run_question(question, *, client_id, retriever, llm, engagement_id):
question (with optional Stage D retrieval stash)
│
▼
1. Retrieve corpus chunks
- Reuse question.retrieval_results if Stage D stashed any
- Otherwise call retriever(client_id, build_relevance_query(question))
- Cap to top-5 chunks
│
▼
2. Format prompt
- question.prompt_template.format(retrieved_context=..., **prompt_variables)
- retrieved_context comes from format_chunks(chunks)
│
▼
3. LLM call (REASONING_MID, max_tokens=2000, temperature=0.1)
- Metadata: engagement_id, stage="investigate", question_id, primitive
- BudgetExceeded propagates up; other exceptions caught and logged
│
▼
4. Parse JSON response (parse_strict_json — same tolerance as Stage B)
│
▼
5. Build Finding (or None)
- Detect found_X flag per primitive
- Map severity/confidence/evidence/remediation
- Generate finding id (f-{12 hex chars})
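Steps 1–2 of the pipeline above can be sketched as follows; the `format_chunks` layout and the chunk dict field names are assumptions, not the module's actual rendering:

```python
def format_chunks(chunks: list[dict], top_k: int = 5) -> str:
    """Render the top-k retrieved chunks into the retrieved_context block."""
    lines = [
        f"[{i}] ({chunk.get('source', 'unknown')}) {chunk['text']}"
        for i, chunk in enumerate(chunks[:top_k], start=1)
    ]
    return "\n".join(lines)

# Hypothetical prompt_template with the retrieved_context slot from step 2.
prompt_template = "Context:\n{retrieved_context}\n\nQuestion: {question_text}"
prompt = prompt_template.format(
    retrieved_context=format_chunks([{"source": "contract.pdf", "text": "Clause 1 applies."}]),
    question_text="Any conflicts between these clauses?",
)
```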
Per-primitive finding-fired detection:
| Primitive | found_* flag |
|---|---|
| conflict_check | found_conflict |
| consistency_check | found_inconsistency |
| coverage_check | found_gap |
| currency_check | found_currency_issue |
| flow_down_check | found_flowdown_gap |
| citation_integrity_check | found_integrity_issue |
If the flag is missing or false, the question completes with no finding — that's a normal outcome (the audit found nothing on this concern), not a failure.
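A minimal sketch of the found-flag dispatch, using the flag names from the table above (the id scheme via `secrets.token_hex` is an assumption consistent with the f-{12 hex chars} format):

```python
import secrets

# Flag names mirror the per-primitive table above.
FOUND_FLAGS = {
    "conflict_check": "found_conflict",
    "consistency_check": "found_inconsistency",
    "coverage_check": "found_gap",
    "currency_check": "found_currency_issue",
    "flow_down_check": "found_flowdown_gap",
    "citation_integrity_check": "found_integrity_issue",
}

def finding_fired(primitive: str, parsed: dict) -> bool:
    """A finding fires only when the primitive's found_* flag is present and truthy."""
    flag = FOUND_FLAGS.get(primitive)
    return bool(flag and parsed.get(flag))

def new_finding_id() -> str:
    """f-{12 hex chars}; token_hex(6) produces 12 hex characters (assumed scheme)."""
    return f"f-{secrets.token_hex(6)}"
```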
Evidence anchoring¶
parse_evidence(items, question_chunks) is the most important parser
in Stage E. It joins LLM-cited evidence quotes back to the actual
retrieved chunks via:
- a substring match of verbatim_quote against chunk.text (strongest signal)
- a doc-title match as a secondary signal
When a match is found, the returned EvidenceCitation carries the real
chunk_id (faiss_id), the real source path, the rerank score, and
the verbatim quote. This is what makes findings defensible: every
quote is anchored to a specific corpus location.
If the LLM hallucinates an evidence quote not in the retrieved chunks,
the citation still records the LLM's claim but chunk_id is None and
relevance_score is None — surfaces clearly to the auditor that this
quote couldn't be traced.
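A minimal sketch of the anchoring join; the field names faiss_id and rerank_score follow the text, while the chunk dict shape and the example data are assumptions:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class EvidenceCitation:
    verbatim_quote: str
    chunk_id: Optional[int]
    relevance_score: Optional[float]

def anchor_quote(quote: str, chunks: list[dict]) -> EvidenceCitation:
    """Substring-match the LLM's quote against retrieved chunk text (strongest signal)."""
    for chunk in chunks:
        if quote and quote in chunk["text"]:
            return EvidenceCitation(quote, chunk["faiss_id"], chunk.get("rerank_score"))
    # Untraceable quote: keep the LLM's claim but null the anchors,
    # so the auditor can see it couldn't be traced to the corpus.
    return EvidenceCitation(quote, None, None)

chunks = [{"text": "The vendor shall flow down clause 52.204-21.",
           "faiss_id": 7, "rerank_score": 0.91}]
anchored = anchor_quote("flow down clause 52.204-21", chunks)
untraced = anchor_quote("a quote the LLM invented", chunks)
```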
Output: RunResult¶
@dataclass
class RunResult:
    findings: list[Finding]
    questions_run: int
    questions_failed: int
    questions_no_finding: int
    cost_cents_at_start: float
    cost_cents_at_end: float
    aborted_due_to_budget: bool

    @property
    def cost_cents(self) -> float:
        return self.cost_cents_at_end - self.cost_cents_at_start
questions_run excludes those skipped after budget exhaustion;
questions_no_finding counts successful questions that returned None
(no finding fired) — distinct from questions_failed.
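Repeating the dataclass so the snippet is self-contained, a quick illustration of the derived cost property (all numbers are hypothetical):

```python
from dataclasses import dataclass

@dataclass
class RunResult:
    findings: list
    questions_run: int
    questions_failed: int
    questions_no_finding: int
    cost_cents_at_start: float
    cost_cents_at_end: float
    aborted_due_to_budget: bool

    @property
    def cost_cents(self) -> float:
        # Cost of this run alone: delta against the engagement's running total.
        return self.cost_cents_at_end - self.cost_cents_at_start

result = RunResult(
    findings=["f-1", "f-2"],
    questions_run=10,
    questions_failed=1,
    questions_no_finding=7,
    cost_cents_at_start=120.0,
    cost_cents_at_end=195.5,
    aborted_due_to_budget=False,
)
```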
Progress streaming¶
The ProgressSink callback fires after each question completes, with:
{
    "type": "question_complete",
    "question_id": "q-X",
    "primitive": "...",
    "completed": int,          # questions done so far
    "total": int,
    "cost_cents": float,
    "budget_utilization": float,
    "finding_id": str | None,  # f-X if a finding fired, else None
}
The frontend SSE channel attaches a sink that pushes events to the auditor's run dashboard. CLI runs attach a sink that prints to stderr.
A failing sink (network error, etc.) is caught and logged but doesn't disrupt question execution.
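The catch-and-log behavior can be sketched as follows (simplified; function names here are illustrative, not the module's):

```python
import asyncio

async def emit_progress(sink, event: dict) -> bool:
    """Fire the progress sink; a sink failure is swallowed, never disrupting the run."""
    if sink is None:
        return False
    try:
        await sink(event)
        return True
    except Exception:
        # The real orchestrator logs this; here we just report delivery failure.
        return False

async def demo():
    delivered = []

    async def good_sink(event):
        delivered.append(event)

    async def broken_sink(event):
        raise ConnectionError("dashboard unreachable")

    ok = await emit_progress(good_sink, {"type": "question_complete"})
    bad = await emit_progress(broken_sink, {"type": "question_complete"})
    return ok, bad, delivered

ok, bad, delivered = asyncio.run(demo())
```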
Cost snapshotting¶
snapshot_callback fires every snapshot_every_n completions (default
25) with the current llm.budget.spent_cents. Wire to
EngagementStore.update_cost so a process crash leaves accurate
accounting on disk. A final snapshot fires at run completion regardless
of count.
The callback is called sequentially (not in asyncio.gather) so it
doesn't compete for the LLM concurrency budget.
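The cadence rule can be sketched as a pure predicate (names are illustrative):

```python
def should_snapshot(completed: int, total: int, every_n: int = 25) -> bool:
    """Snapshot on every Nth completion, plus a final snapshot at run completion."""
    return completed == total or completed % every_n == 0
```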
Budget exhaustion¶
When LLMClient raises BudgetExceeded, the per-question wrapper
catches it, sets aborted_due_to_budget = True, and returns None for
that question. Subsequent questions in flight continue to attempt their
LLM calls; each will raise BudgetExceeded immediately because the
budget is already exhausted, return None, and complete.
The result preserves all findings produced before the wall hit. This is the design intent: a 70%-complete audit with 40 findings is better than an aborted audit with 0 findings.
The budget governor's tier downshift (LLMClient handles transparently) typically delays budget exhaustion enough that most engagements complete within budget.
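The per-question wrapper described above can be sketched like this; the BudgetExceeded name comes from the text, but its definition and the wrapper shape are assumptions:

```python
import asyncio

class BudgetExceeded(Exception):
    """Stand-in for LLMClient's budget error (definition assumed)."""

async def guarded(run_one, state: dict):
    """Per-question wrapper: budget exhaustion flags the run and yields no finding."""
    try:
        return await run_one()
    except BudgetExceeded:
        state["aborted_due_to_budget"] = True
        return None

async def demo():
    state = {"aborted_due_to_budget": False}

    async def exhausted():
        raise BudgetExceeded("budget wall hit")

    result = await guarded(exhausted, state)
    return result, state

result, state = asyncio.run(demo())
```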
Metis retriever adapter¶
def make_metis_retriever() -> Retriever:
    """Wrap app.rag._retrieve as the async Retriever interface."""
_retrieve is synchronous. The adapter runs it in the default thread
pool so it doesn't block the event loop. The per-engagement LLMClient
semaphore still bounds concurrent LLM load, so the extra concurrency
from thread-pool retrieval is acceptable.
For tests we pass synthetic retrievers — no real FAISS index needed.
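A minimal sketch of the adapter pattern, using a fake synchronous retriever in place of the real app.rag._retrieve:

```python
import asyncio

def make_async_retriever(sync_retrieve):
    """Wrap a blocking retrieval function so it never blocks the event loop."""
    async def retriever(client_id: str, query: str):
        # asyncio.to_thread runs the sync call in the default thread pool.
        return await asyncio.to_thread(sync_retrieve, client_id, query)
    return retriever

def fake_retrieve(client_id, query):
    # Stands in for the real FAISS-backed _retrieve in tests.
    return [{"text": f"chunk for {query}", "client": client_id}]

retriever = make_async_retriever(fake_retrieve)
chunks = asyncio.run(retriever("acme", "conflicts"))
```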
Test coverage¶
| Area | Cases |
|---|---|
| format_chunks | 4 (empty, single, truncation, missing fields) |
| parse_severity | 2 (all levels, unknown defaults to medium) |
| parse_confidence | 3 (normal, clamps, garbage) |
| parse_evidence | 4 (chunk match, empty quotes, non-list, cap at 10) |
| parse_remediation | 4 (full, partial, garbage effort, non-dict) |
| build_finding | 4 (fired, not fired, per-primitive found-keys, unknown primitive) |
| _run_question | 5 (full pipeline, no finding, retrieves when no stash, parse fail, LLM fail) |
| run_investigation | 4 (parallel, empty input, budget exhaustion, snapshot callback) |
All 30 cases passing. Full suite: 345 pass, no regressions.
Public API¶
async def run_investigation(
    engagement: AuditEngagement,
    questions: list[Question],
    *,
    client_id: str,
    retriever: Retriever,
    llm: LLMClient,
    progress: ProgressSink | None = None,
    snapshot_every_n: int = 25,
    snapshot_callback: Callable[[float], Awaitable[None]] | None = None,
) -> RunResult
Per-primitive execute() functions¶
Each primitive module still has an execute() stub. In v1 the orchestrator
dispatches generically via _run_question and the per-primitive stubs
aren't invoked. They exist as future hooks for primitive-specific
execution logic (e.g., flow_down_check may eventually need paired
retrieval over parent + child docs separately rather than a single
combined retrieval).
When custom logic is needed, replace the primitive's execute() body
and update the orchestrator to dispatch via module.execute(...) based
on question.primitive.
Cost shape per audit¶
For a typical 80-question audit:
- 80 LLM calls × ~3K input + ~1K output tokens at Sonnet pricing
- ~$0.024 per call × 80 ≈ $1.90 per audit pass
- 3-round deepening (Stage F → C → D → E loop) ~3× → ~$5–6 per audit total
- Plus Stage A profile (~$0.05) + Stage B catalog (~$0.57)
- Engagement total: $6–10 at neutral tier mix
Cap pricing tier (REASONING_HIGH only): ~$30–50 per audit. Floor pricing tier (mostly MECHANICAL): ~$1–2 with much weaker reasoning quality.
Known limits / future work¶
- No retry of failed questions. A question whose LLM call failed (timeout, etc.) is dropped from this round. Stage F deepening could re-add it as a follow-up target. v1 keeps it simple.
- No mid-run model swap. Tier downshift happens on the LLMClient level transparently. We could expose a per-engagement override that swaps to Opus for questions over a severity threshold. v2 hardening.
- FindingsStore not yet wired. run_investigation returns the findings list; persistence is the caller's responsibility (the Phase 0 FindingsStore stub raises NotImplementedError). The orchestrator will accept a findings_store parameter once that's built.
- Per-primitive execute() stubs unused. v1 dispatches generically. When the first primitive needs custom logic, the dispatcher pattern gets reintroduced.