Runner + FindingsStore¶
Status: ✅ Complete
Files: app/auditforge/runner.py, app/auditforge/findings.py (FindingsStore section)
Tests: tests/test_auditforge_runner.py (10 cases), tests/test_auditforge_findings_store.py (11 cases) — all passing
Purpose¶
The runner is the bridge between the per-stage modules and the rest of the world (CLI, REST endpoints, scheduled jobs). It:
- Drives the full A→B→C→D→E→F→G pipeline
- Manages engagement state transitions
- Iterates Stages B–F until convergence (max iterations, too few new findings, or budget cap)
- Persists findings to FindingsStore at iteration boundaries
- Snapshots cost telemetry into the engagement record
- Streams aggregated progress events
- Handles per-stage failures gracefully — upstream failures mark the engagement FAILED; Stage F or G failures degrade to partial output
The FindingsStore persists findings to S3 with the same lazy-load +
best-effort upload pattern as EngagementStore and PilotStore. One JSON
object per engagement at engagement.findings_key.
Pipeline flow¶
```
engagement (CREATED) + intake
│
▼
engagement_store.update_intake
│
▼
[transition: PROFILING]
profile_corpus (Stage A)
│
▼
for iteration in range(max_iterations):
[transition: CATALOGING (first iter)]
build_catalog (Stage B; prior_findings + iteration inform prompts)
│
▼
synthesize_questions (Stage C)
│
▼
validate_questions (Stage D, retrieval-stash for Stage E reuse)
│
▼
[transition: INVESTIGATING (first iter)]
run_investigation (Stage E)
│
▼
findings_store.replace_all ◄── persistence boundary
engagement_store.update_finding_counts
│
▼
_should_continue?
├── max_iterations reached? STOP
├── budget_utilization >= threshold? STOP
├── new findings this round < min? STOP (after first iter)
├── aborted_due_to_budget? STOP
└── otherwise: CONTINUE
│
▼ (continue)
[transition: DEEPENING]
cluster_and_deepen (Stage F)
│
▼
findings_store.replace_all ◄── related_finding_ids set in-place
│
└─► next iteration
Final cluster_and_deepen if not done in last iter
│
▼
[transition: REPORTING]
generate_deliverable (Stage G)
│
▼
[transition: FINDINGS_REVIEW]
│
▼
AuditResult
```
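A shape-only sketch of that loop, with each stage injected as a callable purely for illustration; the real runner calls the stage modules (build_catalog, run_investigation, ...) directly, and every name below is an assumption:

```python
from typing import Any, Awaitable, Callable

Stage = Callable[..., Awaitable[Any]]  # hypothetical stand-in for the stage modules

async def iterate(stages: dict[str, Stage],
                  persist: Callable[[list], None],
                  should_continue: Callable[[int, int], tuple[bool, str | None]],
                  max_iterations: int = 3) -> list:
    findings: list = []
    for i in range(max_iterations):
        catalog = await stages["catalog"](findings, i)    # Stage B sees prior findings
        questions = await stages["synthesize"](catalog)   # Stage C
        validated = await stages["validate"](questions)   # Stage D
        new = await stages["investigate"](validated)      # Stage E
        findings.extend(new)
        persist(findings)                                 # persistence boundary
        keep_going, _reason = should_continue(i, len(new))
        if not keep_going:
            break                # the real runner does a final deepen pass here if needed
        await stages["deepen"](findings)                  # Stage F, then next round
    return findings
```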
Convergence (_should_continue)¶
Pure function — easy to test. Returns (continue?, reason_if_stop).
Order of checks (each is a stop condition):
1. `aborted_due_to_budget` from the Stage E run → `"budget_exhausted"`
2. `iteration + 1 >= max_iterations` → `"max_iterations_reached"`
3. `budget_utilization >= convergence_budget_pct` (default 0.85) → `"budget_threshold_X_reached"`
4. After the first iteration only: `findings_new_this_round < convergence_min_new_findings` (default 3) → `"low_new_findings_N"`
Default behavior: 3 iteration rounds max, soft-stop at 85% budget, soft-stop after iteration 1 if a round produced < 3 new findings. All parameters tunable.
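A minimal sketch of the check under those rules; parameter names mirror the prose above, but the real signature lives in app/auditforge/runner.py:

```python
def _should_continue(
    *,
    iteration: int,                     # zero-based round index
    max_iterations: int,
    budget_utilization: float,          # fraction of engagement budget spent
    findings_new_this_round: int,
    aborted_due_to_budget: bool,
    convergence_budget_pct: float = 0.85,
    convergence_min_new_findings: int = 3,
) -> tuple[bool, str | None]:
    """Returns (continue?, reason_if_stop). Pure: no I/O, trivially testable."""
    if aborted_due_to_budget:
        return False, "budget_exhausted"
    if iteration + 1 >= max_iterations:
        return False, "max_iterations_reached"
    if budget_utilization >= convergence_budget_pct:
        return False, f"budget_threshold_{convergence_budget_pct}_reached"
    # The low-new-findings stop only applies after the first iteration.
    if iteration > 0 and findings_new_this_round < convergence_min_new_findings:
        return False, f"low_new_findings_{findings_new_this_round}"
    return True, None
```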
Engagement state machine¶
The runner makes the engagement progress through these states:
```
CREATED ─► (intake persisted)
│
▼
PROFILING ─► (profile generated)
│
▼
CATALOGING ─► (first iteration's catalog ready)
│
▼
INVESTIGATING ─► (first iteration's findings)
│
▼
DEEPENING ⇄ INVESTIGATING (loop until convergence)
│
▼
REPORTING ─► (deliverable generated)
│
▼
FINDINGS_REVIEW ◄─── auditor takes over here
```
Any stage's unrecoverable error moves the engagement to FAILED. `update_status` and transition calls are wrapped in try/except, so the runner never aborts on a persistence failure (the failure is logged and the run continues).
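A hedged sketch of what that persistence-tolerant wrapper can look like; the helper name and signature are assumptions:

```python
import logging

log = logging.getLogger(__name__)

def _safe_transition(engagement_store, engagement_id: str, status: str) -> None:
    """Best-effort state update: a store failure is logged, never raised,
    so a flaky write cannot kill an otherwise healthy run."""
    try:
        engagement_store.update_status(engagement_id, status)
    except Exception:
        log.exception("transition to %s failed for %s; continuing",
                      status, engagement_id)
```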
Failure isolation¶
| Stage failure | Runner behavior |
|---|---|
| Stage A (profile) | FAILED state, abort run, return AuditResult with failed_at_stage="profile" |
| Stage B/C/D within iteration | FAILED state, abort run, persist what completed |
| Stage E (per-question) | Already isolated by orchestrator; doesn't bubble up |
| Stage F (deepen) | Logged, non-fatal — iteration continues without follow-up signals |
| Stage G (report) | Logged, non-fatal — engagement still transitions to FINDINGS_REVIEW with deliverable=None |
The principle: partial output is more valuable than no output. A run that produced 40 findings but failed to generate a polished deliverable is still useful to the auditor.
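The non-fatal stages (F and G) share a degrade-to-partial pattern; a sketch, with the helper name and injected callable as assumptions:

```python
import logging
from typing import Awaitable, Callable

log = logging.getLogger(__name__)

async def run_or_degrade(stage_name: str, stage: Callable[[], Awaitable[object]]):
    """Run a non-fatal stage; on failure, log and return None so the
    engagement still reaches FINDINGS_REVIEW with partial output."""
    try:
        return await stage()
    except Exception:
        log.exception("%s failed; continuing with partial output", stage_name)
        return None
```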
AuditResult shape¶
```python
@dataclass
class AuditResult:
    engagement: AuditEngagement
    profile: CorpusProfile | None
    findings: list[Finding]
    iterations: list[IterationRecord]   # one per round
    deepen_results: list[DeepenResult]
    deliverable: Deliverable | None
    cost_cents: float
    aborted_due_to_budget: bool
    converged: bool                     # True if stopped via max_iter or low-findings
    failed_at_stage: str | None
    failure_reason: str | None
```
One `IterationRecord` is captured per round, recording `questions_synthesized`, `questions_validated`, `questions_dropped`, `questions_run`, `findings_new`, `cost_cents`, and `aborted_due_to_budget`. These records feed run-dashboard reporting and post-engagement analysis.
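A sketch of the record's shape, assuming the obvious types for the fields listed above:

```python
from dataclasses import dataclass

@dataclass
class IterationRecord:
    questions_synthesized: int
    questions_validated: int
    questions_dropped: int
    questions_run: int
    findings_new: int
    cost_cents: float
    aborted_due_to_budget: bool
```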
Progress event stream¶
Progress events are emitted via a `ProgressSink` callback (the same mechanism Stage E already uses):
{"type": "stage_start", "stage": "profile"}
{"type": "stage_complete", "stage": "profile", "total_chunks": ..., ...}
{"type": "stage_start", "stage": "catalog", "iteration": 0}
{"type": "stage_complete", "stage": "catalog", "iteration": 0,
"concepts": N, "doc_pairs": N, ...}
{"type": "stage_complete", "stage": "synthesize", ...}
{"type": "stage_complete", "stage": "validate", "kept": N, "dropped": N}
{"type": "question_complete", ...} # from Stage E orchestrator
{"type": "stage_complete", "stage": "deepen", "patterns": N, "follow_up_targets": N}
{"type": "convergence", "reason": "max_iterations_reached"}
{"type": "stage_complete", "stage": "report", "json_path": ..., "markdown_path": ...}
Frontend SSE channel attaches a sink that pushes events to the auditor's run dashboard. CLI runs print to stderr.
Sink failures are caught and logged but never disrupt run execution.
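A minimal sketch of both halves, assuming a `ProgressSink` is just a callable that takes one event dict:

```python
import json
import logging
import sys

log = logging.getLogger(__name__)

# CLI sink: print each event to stderr, one JSON object per line.
def stderr_sink(event: dict) -> None:
    print(json.dumps(event), file=sys.stderr)

# Inside the runner, emission is wrapped so a broken sink cannot kill a run.
def _emit(sink, event: dict) -> None:
    if sink is None:
        return
    try:
        sink(event)
    except Exception:
        log.exception("progress sink raised; event dropped")
```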
FindingsStore¶
Mirrors the EngagementStore / PilotStore patterns:
```python
class FindingsStore:
    def list(engagement_id, findings_key) -> list[Finding]
    def get(engagement_id, findings_key, finding_id) -> Finding | None
    def add_batch(engagement_id, findings_key, findings) -> None
    def replace_all(engagement_id, findings_key, findings) -> None
    def update_status(engagement_id, findings_key, finding_id, status,
                      *, auditor_notes="") -> Finding
    def counts(engagement_id, findings_key) -> dict
    def invalidate(engagement_id=None) -> None
```
- One JSON object per engagement at `engagement.findings_key` (= `auditforge/engagements/{engagement_id}/findings.json`)
- Lazy load: in-memory cache → local file → S3 → empty
- Save: local file first, then best-effort S3 upload
- `counts()` aggregates by status + severity for the engagement's summary record
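An illustrative sketch of the lazy-load + best-effort-upload pattern; the class, attribute, and method names here are assumptions, not the real implementation:

```python
import json
import logging
from pathlib import Path

log = logging.getLogger(__name__)

class FindingsStoreSketch:
    def __init__(self, local_dir: Path, s3):
        self._cache: dict[str, list[dict]] = {}
        self._local_dir = local_dir
        self._s3 = s3  # anything with download(key) -> bytes | None and upload(key, bytes)

    def load(self, engagement_id: str, findings_key: str) -> list[dict]:
        if engagement_id in self._cache:                    # 1. in-memory cache
            return self._cache[engagement_id]
        path = self._local_dir / f"{engagement_id}.findings.json"
        if path.exists():                                   # 2. local file
            data = json.loads(path.read_text())
        else:
            blob = self._s3.download(findings_key)          # 3. S3
            data = json.loads(blob) if blob else []         # 4. empty
        self._cache[engagement_id] = data
        return data

    def save(self, engagement_id: str, findings_key: str, data: list[dict]) -> None:
        self._cache[engagement_id] = data
        path = self._local_dir / f"{engagement_id}.findings.json"
        path.write_text(json.dumps(data))                   # local file first
        try:
            self._s3.upload(findings_key, json.dumps(data).encode())
        except Exception:
            log.exception("best-effort S3 upload failed; local copy kept")
```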
Public API¶
```python
async def run_audit(
    engagement: AuditEngagement,
    intake: IntakeData,
    *,
    client_id: str,
    llm: LLMClient,
    retriever: Retriever,
    engagement_store: EngagementStore,
    findings_store: FindingsStore,
    progress: ProgressSink | None = None,
    embedder: Embedder | None = None,
    max_iterations: int = 3,
    convergence_min_new_findings: int = 3,
    convergence_budget_pct: float = 0.85,
    output_dir: str | None = None,
) -> AuditResult
```
`output_dir` flows through to Stage G; if provided, deliverable artifacts are written to `{output_dir}/{engagement.id}.{json,md}`.
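A hypothetical call, inside an async context; the llm/retriever/store objects are assumed to be constructed elsewhere, and only the call shape comes from the signature above:

```python
result = await run_audit(
    engagement,
    intake,
    client_id="acme",
    llm=llm,
    retriever=retriever,
    engagement_store=engagement_store,
    findings_store=findings_store,
    progress=stderr_sink,          # e.g. the CLI sink sketched earlier
    max_iterations=2,              # tighter loop for a smoke run
    output_dir="/tmp/auditforge",
)
if result.failed_at_stage:
    print(f"failed at {result.failed_at_stage}: {result.failure_reason}")
else:
    print(f"{len(result.findings)} findings for ~${result.cost_cents / 100:.2f}")
```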
Test coverage¶
| Area | Cases |
|---|---|
| `_should_continue` (pure) | 5 (max_iter, low-findings post-first, low-findings ignored on first, budget threshold, aborted) |
| Full pipeline end-to-end | 1 (profile → catalog → synth → validate → investigate → deepen → report; all stages fire; engagement transitions to FINDINGS_REVIEW) |
| No findings → still completes | 1 (deliverable still generated, state still advances) |
| Intake persistence | 1 (intake survives in EngagementStore after run) |
| Max iterations respected | 1 (loop bounded) |
| Profile failure | 1 (FAILED state, AuditResult.failed_at_stage populated) |
| FindingsStore CRUD | 11 (round-trip, counts, persistence, invalidation, evidence/remediation round-trip) |
All 21 cases passing. Full suite: 402 pass, no regressions.
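For flavor, the kind of pure-function cases the first row refers to, written against the assumed signature from the convergence sketch above (not the actual test code):

```python
def test_low_new_findings_ignored_on_first_iteration():
    ok, reason = _should_continue(
        iteration=0, max_iterations=3, budget_utilization=0.1,
        findings_new_this_round=0, aborted_due_to_budget=False)
    assert ok and reason is None

def test_budget_threshold_stops_run():
    ok, reason = _should_continue(
        iteration=0, max_iterations=3, budget_utilization=0.9,
        findings_new_this_round=10, aborted_due_to_budget=False)
    assert not ok and reason.startswith("budget_threshold")
```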
Cost shape¶
A complete audit invokes the runner once. End-to-end cost:
| Stage | Per-iteration cost | Iterations | Subtotal |
|---|---|---|---|
| A: Profile | ~$0.03 | 1 | $0.03 |
| B: Catalog | ~$0.19 | 3 | $0.57 |
| C: Synthesize | $0 | 3 | $0 |
| D: Validate | $0 | 3 | $0 |
| E: Investigate | ~$1.92 | 3 | $5.76 |
| F: Deepen | ~$0.25 | 3 | $0.75 |
| G: Report | ~$0.15 | 1 | $0.15 |
| Total | | | ~$7.30 |

At a neutral tier mix, for an engagement running 80 questions per round.
Known limits / future work¶
- No resume support yet. A crashed run loses progress. Engagement state transitions persist, but in-progress iteration state doesn't. Resume is a Phase 2 feature once we know the actual failure modes.
- No SSE plumbing yet. The runner emits events to a `ProgressSink` callback; main.py needs an SSE endpoint that converts them to server-sent events. Lands when the REST surface is built.
- Convergence is conservative. Default thresholds favor running more rounds. Real-corpus dogfooding will inform tuning.
- Stage G failure leaves no deliverable. Could fall back to a minimal text-only summary. v2 hardening.
- No per-iteration cost cap. A runaway iteration could consume most of the engagement budget before convergence kicks in. The Stage E budget governor catches it eventually but a per-iteration soft cap would be safer.