
Runner + FindingsStore

Status: ✅ Complete
Files: app/auditforge/runner.py, app/auditforge/findings.py (FindingsStore section)
Tests: tests/test_auditforge_runner.py (10 cases), tests/test_auditforge_findings_store.py (11 cases) — all passing

Purpose

The runner is the bridge between the per-stage modules and the rest of the world (CLI, REST endpoints, scheduled jobs). It:

  • Drives the full A→B→C→D→E→F→G pipeline
  • Manages engagement state transitions
  • Iterates Stages B–F until convergence (max iterations, low new findings, or budget cap)
  • Persists findings to FindingsStore at iteration boundaries
  • Snapshots cost telemetry into the engagement record
  • Streams aggregated progress events
  • Handles per-stage failures gracefully — upstream failures mark the engagement FAILED; Stage F or G failures degrade to partial output

The FindingsStore persists findings to S3 with the same lazy-load + best-effort upload pattern as EngagementStore and PilotStore. One JSON object per engagement at engagement.findings_key.

Pipeline flow

engagement (CREATED) + intake
engagement_store.update_intake
[transition: PROFILING]
profile_corpus  (Stage A)
for iteration in range(max_iterations):
    [transition: CATALOGING (first iter)]
    build_catalog (Stage B; prior_findings + iteration inform prompts)
    synthesize_questions (Stage C)
    validate_questions (Stage D, retrieval-stash for Stage E reuse)
    [transition: INVESTIGATING (first iter)]
    run_investigation (Stage E)
    findings_store.replace_all  ◄── persistence boundary
    engagement_store.update_finding_counts
    _should_continue?
       ├── max_iterations reached? STOP
       ├── budget_utilization >= threshold? STOP
       ├── new findings this round < min? STOP (after first iter)
       ├── aborted_due_to_budget? STOP
       └── otherwise: CONTINUE
            ▼ (continue)
    [transition: DEEPENING]
    cluster_and_deepen (Stage F)
    findings_store.replace_all  ◄── related_finding_ids set in-place
            └─► next iteration

Final cluster_and_deepen if not done in last iter
[transition: REPORTING]
generate_deliverable (Stage G)
[transition: FINDINGS_REVIEW]
AuditResult

Convergence (_should_continue)

Pure function — easy to test. Returns (continue?, reason_if_stop). Order of checks (each is a stop condition):

  1. aborted_due_to_budget from Stage E run → "budget_exhausted"
  2. iteration + 1 >= max_iterations → "max_iterations_reached"
  3. budget_utilization >= convergence_budget_pct (default 0.85) → "budget_threshold_X_reached"
  4. After first iter only: findings_new_this_round < convergence_min_new_findings (default 3) → "low_new_findings_N"

Default behavior: 3 iteration rounds max, soft-stop at 85% budget, soft-stop after iteration 1 if a round produced < 3 new findings. All parameters tunable.
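
A minimal sketch of that check order, assuming the keyword-only parameter names below; the real helper in app/auditforge/runner.py may differ in name and signature, and the exact reason strings follow the patterns listed above:

def should_continue(
    *,
    iteration: int,
    max_iterations: int,
    budget_utilization: float,
    findings_new_this_round: int,
    aborted_due_to_budget: bool,
    convergence_budget_pct: float = 0.85,
    convergence_min_new_findings: int = 3,
) -> tuple[bool, str | None]:
    # 1. Stage E already tripped the budget governor this round.
    if aborted_due_to_budget:
        return False, "budget_exhausted"
    # 2. This round was the last one allowed.
    if iteration + 1 >= max_iterations:
        return False, "max_iterations_reached"
    # 3. Engagement budget is close enough to the cap to stop early.
    if budget_utilization >= convergence_budget_pct:
        return False, f"budget_threshold_{convergence_budget_pct}_reached"
    # 4. After the first iteration, stop if the round produced too few new findings.
    if iteration > 0 and findings_new_this_round < convergence_min_new_findings:
        return False, f"low_new_findings_{findings_new_this_round}"
    return True, None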

Engagement state machine

The runner advances the engagement through these states:

CREATED ─► (intake persisted)
PROFILING ─► (profile generated)
CATALOGING ─► (first iteration's catalog ready)
INVESTIGATING ─► (first iteration's findings)
DEEPENING ⇄ INVESTIGATING (loop until convergence)
REPORTING ─► (deliverable generated)
FINDINGS_REVIEW ◄─── auditor takes over here

FAILED can be reached from any stage's unrecoverable error. update_status and transition calls are wrapped in try/except, so the runner never aborts on a persistence failure: the error is logged and the run continues.
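
A sketch of that wrapping pattern; the helper name and the exact store call signature are assumptions, not the actual runner code:

import logging

logger = logging.getLogger(__name__)

def safe_update_status(engagement_store, engagement_id: str, status: str) -> None:
    # Best-effort persistence: a failed status write is logged, never raised.
    try:
        engagement_store.update_status(engagement_id, status)
    except Exception:
        logger.exception("status transition to %s failed; run continues", status)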

Failure isolation

How the runner responds to a failure in each stage:

  • Stage A (profile): FAILED state, abort run, return AuditResult with failed_at_stage="profile"
  • Stage B/C/D (within an iteration): FAILED state, abort run, persist what completed
  • Stage E (per-question): already isolated by the orchestrator; failures don't bubble up
  • Stage F (deepen): logged, non-fatal; the iteration continues without follow-up signals
  • Stage G (report): logged, non-fatal; the engagement still transitions to FINDINGS_REVIEW with deliverable=None

The principle: partial output is more valuable than no output. A run that produced 40 findings but failed to generate a polished deliverable is still useful to the auditor.
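
For example, a non-fatal Stage G failure can degrade to deliverable=None while the rest of the result survives. This is illustrative only; generate_deliverable's real signature isn't shown in this doc:

import logging

logger = logging.getLogger(__name__)

async def run_report_stage(generate_deliverable, findings):
    # Non-fatal by design: the engagement still moves to FINDINGS_REVIEW,
    # and AuditResult.deliverable is simply left as None.
    try:
        return await generate_deliverable(findings)
    except Exception:
        logger.exception("Stage G failed; continuing with deliverable=None")
        return None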

AuditResult shape

@dataclass
class AuditResult:
    engagement: AuditEngagement
    profile: CorpusProfile | None
    findings: list[Finding]
    iterations: list[IterationRecord]    # one per round
    deepen_results: list[DeepenResult]
    deliverable: Deliverable | None
    cost_cents: float
    aborted_due_to_budget: bool
    converged: bool                       # True if stopped via max_iter or low-findings
    failed_at_stage: str | None
    failure_reason: str | None

IterationRecord per round captures questions_synthesized, questions_validated, questions_dropped, questions_run, findings_new, cost_cents, aborted_due_to_budget. Used for run-dashboard reporting and post-engagement analysis.
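
Roughly, with field names taken from the list above (the types are assumptions):

from dataclasses import dataclass

@dataclass
class IterationRecord:
    questions_synthesized: int
    questions_validated: int
    questions_dropped: int
    questions_run: int
    findings_new: int
    cost_cents: float
    aborted_due_to_budget: bool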

Progress event stream

Progress events are emitted via the ProgressSink callback (already used by Stage E):

{"type": "stage_start",    "stage": "profile"}
{"type": "stage_complete", "stage": "profile", "total_chunks": ..., ...}
{"type": "stage_start",    "stage": "catalog", "iteration": 0}
{"type": "stage_complete", "stage": "catalog", "iteration": 0,
 "concepts": N, "doc_pairs": N, ...}
{"type": "stage_complete", "stage": "synthesize", ...}
{"type": "stage_complete", "stage": "validate", "kept": N, "dropped": N}
{"type": "question_complete", ...}      # from Stage E orchestrator
{"type": "stage_complete", "stage": "deepen", "patterns": N, "follow_up_targets": N}
{"type": "convergence", "reason": "max_iterations_reached"}
{"type": "stage_complete", "stage": "report", "json_path": ..., "markdown_path": ...}

The frontend SSE channel attaches a sink that pushes events to the auditor's run dashboard; CLI runs print events to stderr.

Sink failures are caught and logged but never disrupt run execution.
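
A minimal CLI-style sink, assuming ProgressSink is any synchronous callable that takes a single event dict (the real protocol may differ):

import json
import sys

def stderr_sink(event: dict) -> None:
    # One JSON event per line on stderr, matching the shapes listed above.
    print(json.dumps(event), file=sys.stderr)

# Attach it via the documented parameter: run_audit(..., progress=stderr_sink)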

FindingsStore

Mirrors the EngagementStore / PilotStore patterns:

class FindingsStore:
    def list(engagement_id, findings_key) -> list[Finding]
    def get(engagement_id, findings_key, finding_id) -> Finding | None
    def add_batch(engagement_id, findings_key, findings) -> None
    def replace_all(engagement_id, findings_key, findings) -> None
    def update_status(engagement_id, findings_key, finding_id, status,
                      *, auditor_notes="") -> Finding
    def counts(engagement_id, findings_key) -> dict
    def invalidate(engagement_id=None) -> None
  • One JSON object per engagement at engagement.findings_key (= auditforge/engagements/{engagement_id}/findings.json)
  • Lazy load: in-memory cache → local file → S3 → empty
  • Save: local file first, then best-effort S3 upload
  • counts() aggregates by status + severity for the engagement's summary record
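
A sketch of the per-iteration persistence boundary, assuming the engagement object carries the id and findings_key attributes used elsewhere in this doc:

def persist_iteration(findings_store, engagement, findings) -> dict:
    # Replace the engagement's full finding set for this round, then return
    # the by-status / by-severity counts used for the summary record.
    findings_store.replace_all(engagement.id, engagement.findings_key, findings)
    return findings_store.counts(engagement.id, engagement.findings_key)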

Public API

async def run_audit(
    engagement: AuditEngagement,
    intake: IntakeData,
    *,
    client_id: str,
    llm: LLMClient,
    retriever: Retriever,
    engagement_store: EngagementStore,
    findings_store: FindingsStore,
    progress: ProgressSink | None = None,
    embedder: Embedder | None = None,
    max_iterations: int = 3,
    convergence_min_new_findings: int = 3,
    convergence_budget_pct: float = 0.85,
    output_dir: str | None = None,
) -> AuditResult

output_dir flows through to Stage G; if provided, deliverable artifacts write to {output_dir}/{engagement.id}.{json,md}.
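
A hypothetical call site; dependency construction and the literal values are placeholders, and only the keyword surface mirrors the signature above:

result = await run_audit(
    engagement,
    intake,
    client_id="client-123",            # placeholder
    llm=llm,
    retriever=retriever,
    engagement_store=engagement_store,
    findings_store=findings_store,
    progress=stderr_sink,              # e.g. the sink sketched earlier
    max_iterations=2,
    convergence_budget_pct=0.75,
    output_dir="./audit-output",
)
print(result.converged, result.cost_cents, len(result.findings))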

Test coverage

  • _should_continue (pure): 5 cases (max_iter, low-findings after the first iteration, low-findings ignored on the first iteration, budget threshold, aborted)
  • Full pipeline end-to-end: 1 case (profile → catalog → synth → validate → investigate → deepen → report; all stages fire; engagement transitions to FINDINGS_REVIEW)
  • No findings → still completes: 1 case (deliverable still generated, state still advances)
  • Intake persistence: 1 case (intake survives in EngagementStore after the run)
  • Max iterations respected: 1 case (loop bounded)
  • Profile failure: 1 case (FAILED state, AuditResult.failed_at_stage populated)
  • FindingsStore CRUD: 11 cases (round-trip, counts, persistence, invalidation, evidence/remediation round-trip)

All 21 cases passing. Full suite: 402 pass, no regressions.

Cost shape

A complete audit invokes the runner once. End-to-end cost:

Stage            Per-iteration cost   Iterations   Subtotal
A: Profile       ~$0.03               1            $0.03
B: Catalog       ~$0.19               3            $0.57
C: Synthesize    $0                   3            $0
D: Validate      $0                   3            $0
E: Investigate   ~$1.90               3            $5.76
F: Deepen        ~$0.25               3            $0.75
G: Report        ~$0.15               1            $0.15
Total                                              ~$7.30

Figures assume a neutral tier mix and an engagement running roughly 80 questions per round.

Known limits / future work

  • No resume support yet. A crashed run loses progress. Engagement state transitions persist, but in-progress iteration state doesn't. Resume is a Phase 2 feature once we know the actual failure modes.
  • No SSE plumbing yet. Runner emits events to a ProgressSink callback; main.py needs an SSE endpoint that converts these to server-sent events. Lands when the REST surface is built.
  • Convergence is conservative. Default thresholds favor running more rounds. Real-corpus dogfooding will inform tuning.
  • Stage G failure leaves no deliverable. Could fall back to a minimal text-only summary. v2 hardening.
  • No per-iteration cost cap. A runaway iteration could consume most of the engagement budget before convergence kicks in. The Stage E budget governor catches it eventually but a per-iteration soft cap would be safer.