Execution & Runs

How AgentFlow executes pipelines, streams logs, handles failures, and manages run history.

Execution Flow

1. Validation

Sub-pipeline references are checked, MCP tools are validated, and an instruction hash is computed for each node.

2. DAG Construction

Nodes and edges are analyzed to build a topological execution order. Nodes at the same level can run concurrently.
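The leveling step above can be sketched as Kahn's algorithm grouped by depth, so that every node in a level has all its predecessors in earlier levels. This is a TypeScript sketch; AgentFlow's executor is Rust, and the node/edge shapes here are assumptions:

```typescript
type Edge = { from: string; to: string };

// Group nodes into levels: a node lands in level k only when all of its
// predecessors sit in levels < k, so each level can run concurrently.
function topoLevels(nodes: string[], edges: Edge[]): string[][] {
  const indegree = new Map<string, number>();
  for (const n of nodes) indegree.set(n, 0);
  for (const e of edges) indegree.set(e.to, (indegree.get(e.to) ?? 0) + 1);

  const levels: string[][] = [];
  const seen = new Set<string>();
  let current = nodes.filter((n) => indegree.get(n) === 0);

  while (current.length > 0) {
    levels.push(current);
    current.forEach((n) => seen.add(n));
    const next: string[] = [];
    for (const e of edges) {
      if (!current.includes(e.from)) continue;
      const d = indegree.get(e.to)! - 1;
      indegree.set(e.to, d);
      if (d === 0) next.push(e.to);
    }
    current = next;
  }
  // Any node never reaching indegree 0 means the graph has a cycle.
  if (seen.size !== nodes.length) throw new Error("cycle detected in pipeline");
  return levels;
}
```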

3. Level-by-Level Execution

For each level in the DAG, runnable nodes are identified based on edge conditions from predecessors.

4. Process Spawning

AI tasks spawn Claude CLI, shell/git nodes spawn bash. All stdout/stderr is streamed line-by-line to the UI.

5. Status Updates

Each node transitions through: pending → running → success/failed/skipped/cancelled. An event is emitted on every status change.

6. Cost Aggregation

AI task costs are parsed from Claude CLI stderr and summed for the entire run.
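Aggregation can be sketched as a scan over streamed stderr lines. The exact line format emitted by the Claude CLI is an assumption here; adjust the pattern to the output your CLI version produces:

```typescript
// Sum costs found in streamed stderr lines. The JSON-ish field name
// "total_cost_usd" is an assumption, not a confirmed CLI contract.
const COST_RE = /"total_cost_usd"\s*:\s*([0-9.]+)/;

function extractCost(stderrLines: string[]): number {
  let total = 0;
  for (const line of stderrLines) {
    const m = COST_RE.exec(line);
    if (m) total += parseFloat(m[1]);
  }
  return total;
}
```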

7. Database Persistence

Run metadata and per-node details (logs, exit codes, costs, timing) are stored in SQLite.

Data Flow

End-to-end execution flow
Frontend: startRun(pipeline, inputs, cliPath, projectPath)
    │
    ▼
Tauri IPC → executor::start_run()
    │
    ├── Validate sub-pipelines & MCP tools
    ├── Insert run record in SQLite
    ├── Spawn async task: run_pipeline_loop()
    │       │
    │       ├── Build topological order (DAG)
    │       ├── For each level:
    │       │     ├── Check edge conditions
    │       │     ├── Skip nodes without satisfied edges
    │       │     └── Execute runnable nodes concurrently
    │       │           │
    │       │           ├── ai-task → spawn claude CLI
    │       │           ├── shell/git → spawn bash -c
    │       │           ├── parallel → tokio::spawn children
    │       │           ├── loop → iterate items, spawn per-item
    │       │           ├── approval-gate → emit event, wait
    │       │           └── sub-pipeline → recursive call
    │       │
    │       ├── Emit node-log events (streamed output)
    │       ├── Emit run-update events (status changes)
    │       ├── Parse cost from Claude stderr
    │       └── Insert run_steps in SQLite
    │
    ▼
Frontend: listens for events, updates canvas & log viewer

Loop Execution

Loop nodes iterate over a list of items, executing their child nodes once per item. Each iteration injects per-item variables.

Loop execution flow
1. Parse the instructions field into a list using the configured separator
   (newline, comma, or custom string)
2. Cap the list at max_iterations (default: 1000)
3. For each item in the list:
   a. Inject loop variables: $LOOP_ITEM, $LOOP_INDEX, $LOOP_COUNT
   b. Execute all child nodes with the injected variables
   c. Track per-iteration status and cost
4. Loop succeeds only if ALL iterations succeed
5. If any iteration fails, remaining iterations are skipped
6. Costs from all iterations are aggregated into the loop node's total
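Steps 1-3 above (splitting, capping, and variable injection) can be sketched like this; the function names and the exact trimming behaviour are assumptions:

```typescript
type LoopVars = { LOOP_ITEM: string; LOOP_INDEX: string; LOOP_COUNT: string };

// Parse the instructions field into items using the configured separator,
// dropping empty entries and capping at max_iterations (default 1000).
function loopItems(instructions: string, separator: string, maxIterations = 1000): string[] {
  const items = instructions
    .split(separator)
    .map((s) => s.trim())
    .filter(Boolean);
  return items.slice(0, maxIterations);
}

// Build the per-iteration variables injected into each child node.
function loopVarsFor(items: string[], index: number): LoopVars {
  return {
    LOOP_ITEM: items[index],
    LOOP_INDEX: String(index),
    LOOP_COUNT: String(items.length),
  };
}
```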

Performance

Loop nodes use Arc-wrapped read-only data for child spawns, eliminating deep-clone overhead. Results are processed in completion order via Tokio JoinSet for maximum throughput.

Live Log Streaming

During execution, stdout and stderr from each node are captured and buffered, then flushed as batched events every 50ms to minimize IPC overhead.
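The buffer-and-flush pattern can be sketched as a small batcher. This is a TypeScript sketch (the real capture loop is Rust); the `emit` callback stands in for the Tauri event channel:

```typescript
// Buffer log lines per node and flush them as one batched event every
// 50 ms, so each line does not cost a separate IPC round trip.
class LogBatcher {
  private buf = new Map<string, string[]>();
  private timer: ReturnType<typeof setInterval>;

  constructor(
    private emit: (nodeId: string, lines: string[]) => void,
    intervalMs = 50
  ) {
    this.timer = setInterval(() => this.flush(), intervalMs);
  }

  push(nodeId: string, line: string): void {
    const lines = this.buf.get(nodeId) ?? [];
    lines.push(line);
    this.buf.set(nodeId, lines);
  }

  flush(): void {
    for (const [nodeId, lines] of this.buf) this.emit(nodeId, lines);
    this.buf.clear();
  }

  stop(): void {
    clearInterval(this.timer);
    this.flush(); // drain anything still buffered
  }
}
```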

Event format
// node-log-batch event payload (batched every 50ms)
{
  "runId": "run-1708344567890",
  "nodeId": "node-2",
  "lines": [
    "PASS src/utils.test.ts (2.34s)",
    "PASS src/lib.test.ts (1.12s)"
  ]
}

// run-update event payload
{
  "runId": "run-1708344567890",
  "status": "running",
  "nodeStates": {
    "node-1": "success",
    "node-2": "running",
    "node-3": "pending"
  },
  "totalCost": 0.0342
}
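On the frontend, the two payloads above might be typed and merged into UI state like this (a sketch; the Tauri `listen` wiring is omitted, and the merge policy is an assumption):

```typescript
type NodeStatus = "pending" | "running" | "success" | "failed" | "skipped" | "cancelled";

interface NodeLogBatch {
  runId: string;
  nodeId: string;
  lines: string[];
}

interface RunUpdate {
  runId: string;
  status: "running" | "success" | "failed" | "cancelled";
  nodeStates: Record<string, NodeStatus>;
  totalCost: number;
}

// Merge an incoming run-update into previous state for the same run,
// keeping node states that the new payload did not mention.
function applyRunUpdate(prev: RunUpdate | null, next: RunUpdate): RunUpdate {
  return prev && prev.runId === next.runId
    ? { ...next, nodeStates: { ...prev.nodeStates, ...next.nodeStates } }
    : next;
}
```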

The live log viewer shows streamed output in a monospace panel. A "Latest" button lets you jump directly to the most recent node with results — expanding and highlighting it automatically. Only the last 200 log lines per node are rendered, with an indicator for hidden earlier lines.

Cancellation

Running pipelines can be cancelled at any time via Ctrl+R (toggles run/cancel) or the cancel button.

  • Cancellation is propagated via a tokio::sync::watch channel for zero-cost event-driven signaling
  • Active processes receive SIGTERM first
  • If a process doesn't exit within the grace period, SIGKILL is sent
  • All pending nodes are marked as cancelled
  • The run is recorded with status cancelled in the database

Resume from Failure

When a pipeline fails, you can resume it from the point of failure instead of re-running everything from scratch.

Resume logic
text
1. Load original run and its steps from the database
2. Build maps:
   - prior_results: { nodeId → status }
   - prior_approvals: { nodeId → "approved" | "rejected" }
   - prior_instructions: { nodeId → SHA256 hash }
3. During re-execution, for each node:
   - If prior status was "success" AND instruction hash matches
     → Reuse result (skip execution)
   - If prior approval was "approved"
     → Auto-approve (skip waiting)
   - Otherwise → Execute normally
4. New run ID created, linked via resumed_from field
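The per-node decision in step 3 can be sketched as a pure function over the prior-run maps (a TypeScript sketch; names and shapes are assumptions modelled on the maps above):

```typescript
type Decision = "reuse" | "auto-approve" | "execute";

interface PriorRun {
  results: Record<string, string>;                        // nodeId → status
  approvals: Record<string, "approved" | "rejected">;     // nodeId → approval
  instructionHashes: Record<string, string>;              // nodeId → SHA-256
}

// Reuse a node's prior result only when it succeeded AND its
// instructions are byte-identical (hash match); auto-approve gates the
// user already approved; everything else runs again.
function resumeDecision(nodeId: string, currentHash: string, prior: PriorRun): Decision {
  if (prior.results[nodeId] === "success" && prior.instructionHashes[nodeId] === currentHash) {
    return "reuse";
  }
  if (prior.approvals[nodeId] === "approved") return "auto-approve";
  return "execute";
}
```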

Instruction change detection

Resume uses SHA-256 hashes of node instructions. If you modify a node's instructions between the original run and the resume, that node will be re-executed even if it previously succeeded.
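Computing such a hash takes one call to Node's crypto module (a sketch; whether AgentFlow canonicalises the instruction text before hashing is not specified here):

```typescript
import { createHash } from "node:crypto";

// Stable SHA-256 hex digest of a node's instructions; any edit to the
// text produces a different digest, forcing re-execution on resume.
function instructionHash(instructions: string): string {
  return createHash("sha256").update(instructions, "utf8").digest("hex");
}
```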

Run History

Every pipeline run is stored in the SQLite database with full details.

Run Record

  • id: Unique run ID, format: "run-{timestamp_ms}"
  • pipeline_name: Name of the executed pipeline
  • started_at: ISO 8601 UTC timestamp
  • finished_at: NULL until completion
  • status: "running" | "success" | "failed" | "cancelled"
  • trigger_input: JSON string of input variables used
  • resumed_from: Original run ID if this is a resume
  • failed_node_id: ID of the first node that failed
  • pipeline_hash: SHA-256 of all instructions

Step Record

  • run_id: References the parent run
  • node_id: Pipeline node ID
  • node_name: Display name at execution time
  • started_at / finished_at: Execution timing
  • status: Node execution status
  • exit_code: Process exit code (0 = success)
  • log_output: Full captured stdout + stderr
  • attempt: Retry attempt number (starts at 1)
  • cost_usd: Claude API cost (NULL for non-AI nodes)
  • model: AI model name if tracked
  • approval_state: "approved" | "rejected" for approval gates
  • instructions_hash: SHA-256 for resume detection

Filtering

The run history panel supports filtering by pipeline name, status (success, failed, cancelled), and date range. Filters persist across sessions so you can quickly return to the runs you care about.

Approval Handling

When execution reaches an approval gate:

  1. An approval-requested event is emitted
  2. The UI shows a dialog with the node's instructions
  3. The user clicks Approve or Reject
  4. respond_to_approval(approved: bool) is called via IPC
  5. Execution continues (on approve) or fails (on reject)
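The wait-then-respond handshake can be sketched with a promise per gate (a TypeScript sketch of the flow above; the map-based registry is an assumption, not AgentFlow's actual IPC plumbing):

```typescript
// Pending approvals keyed by node ID: the executor awaits the promise,
// and the IPC handler resolves it when the user clicks a button.
const pending = new Map<string, (approved: boolean) => void>();

function waitForApproval(nodeId: string): Promise<boolean> {
  return new Promise((resolve) => pending.set(nodeId, resolve));
}

function respondToApproval(nodeId: string, approved: boolean): void {
  const resolve = pending.get(nodeId);
  if (!resolve) throw new Error(`no pending approval for ${nodeId}`);
  pending.delete(nodeId);
  resolve(approved);
}
```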

Desktop Notifications

AgentFlow sends native OS notifications for key pipeline events, even when the app is minimized or in the background.

  • Run completed: Notifies when a pipeline finishes successfully
  • Run failed: Alerts immediately when a node fails
  • Approval requested: Prompts you to review and approve/reject a gate

Tip

Notifications use the native OS notification system (Windows Toast, macOS Notification Center, Linux libnotify). You can control permissions through your OS settings.

Run State Isolation

Run results are scoped to their pipeline. Switching between pipelines preserves each pipeline's run state independently — the canvas and LiveLog only show results matching the currently selected pipeline.

Expandable Node Output

During and after execution, each node on the canvas can be expanded to show its full output inline. Click a completed node to toggle its output panel, which displays the captured stdout/stderr with syntax highlighting.

Export Run Reports

Completed runs can be exported as JSON reports containing full run metadata, per-node results, logs, timing, and cost breakdowns. Use these for auditing, sharing with teammates, or feeding into external dashboards.
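A report might be assembled from the run and step records like this (a sketch; the field names are assumptions modelled on the records above, not AgentFlow's exact export schema):

```typescript
interface StepRecord {
  nodeId: string;
  status: string;
  exitCode: number | null;
  costUsd: number | null; // NULL for non-AI nodes
}

interface RunReport {
  runId: string;
  pipelineName: string;
  status: string;
  totalCostUsd: number;
  steps: StepRecord[];
}

// Build the report object, deriving the total cost from the steps,
// then serialise it to pretty-printed JSON for export.
function buildRunReport(
  runId: string,
  pipelineName: string,
  status: string,
  steps: StepRecord[]
): RunReport {
  const totalCostUsd = steps.reduce((sum, s) => sum + (s.costUsd ?? 0), 0);
  return { runId, pipelineName, status, totalCostUsd, steps };
}

function exportRunReport(report: RunReport): string {
  return JSON.stringify(report, null, 2);
}
```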