Skip to content

Stage E — Investigate (Orchestrator)

Status: ✅ Complete File: app/auditforge/orchestrator.py Tests: tests/test_auditforge_orchestrator.py — 30 cases passing

Purpose

Stage E is where the audit actually happens. Everything before this is preparation; everything after is summarization.

The orchestrator runs the validated question battery in parallel, producing findings. It coordinates:

  • asyncio.gather over questions (concurrency capped by LLMClient's internal semaphore — default 20 simultaneous calls)
  • Per-question retrieval → reasoning → evidence-anchor pipeline
  • Live progress streaming via injectable ProgressSink callback
  • Periodic cost telemetry snapshots so partial-run costs survive crash
  • Graceful budget exhaustion (partial results are still useful)
  • Failure isolation — one question failing doesn't kill the run

Per-question execution

_run_question(question, *, client_id, retriever, llm, engagement_id):

question (with optional Stage D retrieval stash)
1. Retrieve corpus chunks
   - Reuse question.retrieval_results if Stage D stashed any
   - Otherwise call retriever(client_id, build_relevance_query(question))
   - Cap to top-5 chunks
2. Format prompt
   - question.prompt_template.format(retrieved_context=..., **prompt_variables)
   - retrieved_context comes from format_chunks(chunks)
3. LLM call (REASONING_MID, max_tokens=2000, temperature=0.1)
   - Metadata: engagement_id, stage="investigate", question_id, primitive
   - BudgetExceeded propagates up; other exceptions caught and logged
4. Parse JSON response (parse_strict_json — same tolerance as Stage B)
5. Build Finding (or None)
   - Detect found_X flag per primitive
   - Map severity/confidence/evidence/remediation
   - Generate finding id (f-{12 hex chars})

Per-primitive finding-fired detection:

Primitive found_* flag
conflict_check found_conflict
consistency_check found_inconsistency
coverage_check found_gap
currency_check found_currency_issue
flow_down_check found_flowdown_gap
citation_integrity_check found_integrity_issue

If the flag is missing or false, the question completes with no finding — that's a normal outcome (the audit found nothing on this concern), not a failure.

Evidence anchoring

parse_evidence(items, question_chunks) is the most-important parser in Stage E. It joins LLM-cited evidence quotes back to the actual retrieved chunks via:

  1. Substring match on verbatim_quote against chunk.text (strongest signal)
  2. Doc-title match secondary

When a match is found, the returned EvidenceCitation carries the real chunk_id (faiss_id), the real source path, the rerank score, and the verbatim quote. This is what makes findings defensible: every quote is anchored to a specific corpus location.

If the LLM hallucinates an evidence quote not in the retrieved chunks, the citation still records the LLM's claim but chunk_id is None and relevance_score is None — surfaces clearly to the auditor that this quote couldn't be traced.

Output: RunResult

@dataclass
class RunResult:
    findings: list[Finding]
    questions_run: int
    questions_failed: int
    questions_no_finding: int
    cost_cents_at_start: float
    cost_cents_at_end: float
    aborted_due_to_budget: bool

    @property
    def cost_cents(self) -> float:
        return self.cost_cents_at_end - self.cost_cents_at_start

questions_run excludes those skipped after budget exhaustion; questions_no_finding counts successful questions that returned None (no finding fired) — distinct from questions_failed.

Progress streaming

The ProgressSink callback fires after each question completes, with:

{
    "type": "question_complete",
    "question_id": "q-X",
    "primitive": "...",
    "completed": int,            # questions done so far
    "total": int,
    "cost_cents": float,
    "budget_utilization": float,
    "finding_id": str | None,    # f-X if a finding fired, else None
}

The frontend SSE channel attaches a sink that pushes events to the auditor's run dashboard. CLI runs attach a sink that prints to stderr.

A failing sink (network error, etc.) is caught and logged but doesn't disrupt question execution.

Cost snapshotting

snapshot_callback fires every snapshot_every_n completions (default 25) with the current llm.budget.spent_cents. Wire to EngagementStore.update_cost so a process crash leaves accurate accounting on disk. A final snapshot fires at run completion regardless of count.

The callback is called sequentially (not in asyncio.gather) so it doesn't compete for the LLM concurrency budget.

Budget exhaustion

When LLMClient raises BudgetExceeded, the per-question wrapper catches it, sets aborted_due_to_budget = True, and returns None for that question. Subsequent questions in flight continue to attempt their LLM calls; each will raise BudgetExceeded immediately because the budget is already exhausted, return None, and complete.

The result preserves all findings produced before the wall hit. This is the design intent: a 70%-complete audit with 40 findings is better than an aborted audit with 0 findings.

The budget governor's tier downshift (LLMClient handles transparently) typically delays budget exhaustion enough that most engagements complete within budget.

Metis retriever adapter

def make_metis_retriever() -> Retriever:
    """Wrap app.rag._retrieve as the async Retriever interface."""

_retrieve is synchronous. The adapter runs it in the default thread pool so it doesn't block the event loop. The per-engagement LLMClient semaphore still bounds concurrent LLM load; thread-pool retrieval is acceptable concurrency.

For tests we pass synthetic retrievers — no real FAISS index needed.

Test coverage

Area Cases
format_chunks 4 (empty, single, truncation, missing fields)
parse_severity 2 (all levels, unknown defaults to medium)
parse_confidence 3 (normal, clamps, garbage)
parse_evidence 4 (chunk match, empty quotes, non-list, cap at 10)
parse_remediation 4 (full, partial, garbage effort, non-dict)
build_finding 4 (fired, not fired, per-primitive found-keys, unknown primitive)
_run_question 5 (full pipeline, no finding, retrieves when no stash, parse fail, LLM fail)
run_investigation 4 (parallel, empty input, budget exhaustion, snapshot callback)

All 30 cases passing. Full suite: 345 pass, no regressions.

Public API

async def run_investigation(
    engagement: AuditEngagement,
    questions: list[Question],
    *,
    client_id: str,
    retriever: Retriever,
    llm: LLMClient,
    progress: ProgressSink | None = None,
    snapshot_every_n: int = 25,
    snapshot_callback: Callable[[float], Awaitable[None]] | None = None,
) -> RunResult

Per-primitive execute() functions

Each primitive module still has an execute() stub. In v1 the orchestrator dispatches generically via _run_question and the per-primitive stubs aren't invoked. They exist as future hooks for primitive-specific execution logic (e.g., flow_down_check may eventually need paired retrieval over parent + child docs separately rather than a single combined retrieval).

When custom logic is needed, replace the primitive's execute() body and update the orchestrator to dispatch via module.execute(...) based on question.primitive.

Cost shape per audit

For a typical 80-question audit: - 80 LLM calls × ~3K input + ~1K output tokens at Sonnet pricing - ~$0.024 per call × 80 = ~$1.90 per audit pass - 3-round deepening (Stage F → C → D → E loop) ~3× → ~$5–6 per audit total - Plus Stage A profile (~$0.05) + Stage B catalog (~$0.57) - Engagement total: $6–10 at neutral tier mix

Cap pricing tier (REASONING_HIGH only): ~$30–50 per audit. Floor pricing tier (mostly MECHANICAL): ~$1–2 with much weaker reasoning quality.

Known limits / future work

  • No retry of failed questions. A question whose LLM call failed (timeout, etc.) is dropped from this round. Stage F deepening could re-add it as a follow-up target. v1 keeps it simple.
  • No mid-run model swap. Tier downshift happens on the LLMClient level transparently. We could expose a per-engagement override that swaps to Opus for questions over a severity threshold. v2 hardening.
  • FindingsStore not yet wired. run_investigation returns the findings list; persistence is the caller's responsibility (the Phase 0 FindingsStore stub raises NotImplementedError). The orchestrator will accept a findings_store parameter once that's built.
  • Per-primitive execute() stubs unused. v1 dispatches generically. When the first primitive needs custom logic, the dispatcher pattern gets reintroduced.