Execution Model
Overview
flow8's execution model is built around a directed acyclic graph (DAG) of steps, each with independent retry logic, filters, and timeouts. A single flow execution is called a play, and each step execution is tracked as a play layer.
Core Concepts
Flow Definition
A flow (DBFlow) is a template stored in MongoDB:
```go
type DBFlow struct {
	ID        primitive.ObjectID
	Name      string
	CompanyID primitive.ObjectID
	Flowlets  []*DBFlowlet // Ordered list of steps
	Metadata  map[string]interface{}
	CreatedAt time.Time
	UpdatedAt time.Time
}
```

Flowlet (Step)
Each step in a flow is a flowlet (DBFlowlet), which references a module and defines execution behavior:
```go
type DBFlowlet struct {
	ID                 primitive.ObjectID
	FlowID             primitive.ObjectID
	Name               string
	ModuleRef          string                 // e.g., "text-extraction", "http-request"
	Order              int                    // Execution order
	ComponentConfigIds map[string]string      // Component overrides (ai, storage, db, etc.)
	InputMapping       map[string]interface{} // Map input data to module params
	Filters            *FilterGroup           // Pre-execution conditions
	Timeout            time.Duration          // Max execution time (default: 5 minutes)
	Retry              RetryPolicy            // Retry strategy
	OnError            string                 // "continue", "fail", "skip"
	DeferAfterMs       int                    // Delay execution after deps complete
	SkipOutputMapping  bool                   // Don't map output to KV store
}

type FilterGroup struct {
	Operator string // "AND" or "OR"
	Filters  []*Filter
}

type Filter struct {
	Expression string // CEL or template expression
	Value      interface{}
}

type RetryPolicy struct {
	MaxRetries        int     // Max attempts
	InitialDelayMs    int     // First retry delay
	MaxDelayMs        int     // Max backoff
	BackoffMultiplier float32 // Exponential backoff factor
	JitterFraction    float32 // Random variation (0-1)
}
```

Play (Execution Instance)
A play is a single execution of a flow with full state tracking:
```go
type DBPlay struct {
	ID              primitive.ObjectID
	FlowID          primitive.ObjectID
	CompanyID       primitive.ObjectID
	Status          string // NEW, RUN, DONE, FAIL, SKIP
	StartedAt       time.Time
	CompletedAt     time.Time
	PlayLayers      []*DBPlayLayer         // Step execution records
	GlobalKV        map[string]interface{} // Global key-value state
	FlowGroupID     primitive.ObjectID     // For grouping multiple plays
	IdempotencyRoot string                 // For deduplication
	WebhookURL      string                 // Send results here
}
```

Play Layer (Step Execution)
Each step's execution is recorded as a play layer:

```go
type DBPlayLayer struct {
	ID          primitive.ObjectID
	PlayID      primitive.ObjectID
	FlowletID   primitive.ObjectID
	FlowletName string
	ModuleRef   string
	Status      string // (see state machine below)
	Input       map[string]interface{}
	Output      map[string]interface{}
	Error       string
	ExecutedAt  time.Time
	Duration    time.Duration
	RetryCount  int
	Logs        []string
	KVUpdates   map[string]interface{}
}
```

State Machine
Each play and play layer progresses through states:
```
┌─────────┐
│   NEW   │
└────┬────┘
     │
┌────▼──────┐
│ FILTERED  │ (pre-execution filter failed)
└───────────┘

┌────────────┐
│  DEFERRED  │ (waiting for delay or dependency)
└────┬───────┘
     │
┌────▼────┐
│   RUN   │
└────┬────┘
     │
 ┌───┴────────┐
 │            │
┌▼────┐   ┌───▼──┐
│DONE │   │ FAIL │
└─────┘   └──────┘
```

States triggered by conditions:
- ON_HOLD: Execution paused (manual resume required)
- WAIT: Waiting for external signal (webhook, timer)
- SKIP: Skipped due to error handling rule (OnError: "skip")

Valid state transitions:
| From | To | Condition |
|---|---|---|
| NEW | FILTERED | Input filters returned false |
| NEW | DEFERRED | DeferAfterMs > 0 or awaiting dependency |
| FILTERED | DEFERRED | After defer period |
| DEFERRED | RUN | All dependencies met and delay elapsed |
| RUN | DONE | Module completed successfully |
| RUN | FAIL | Module execution error and retries exhausted |
| FAIL | RUN | Retry policy triggered (backoff applied) |
| RUN | SKIP | OnError: "skip" and error occurred |
| RUN | ON_HOLD | Manual pause (admin action) |
| ON_HOLD | RUN | Manual resume |
| RUN | WAIT | Awaiting external input |
| WAIT | RUN | Signal received or timeout |
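
A transition table like this is straightforward to enforce in code. The following is a minimal sketch, assuming states are plain strings as in `DBPlayLayer.Status`; the `validTransitions` map and the `CanTransition` helper are illustrative names, not part of flow8's API.

```go
package main

import "fmt"

// validTransitions mirrors the transition table above.
// The map layout is an assumption made for illustration.
var validTransitions = map[string][]string{
	"NEW":      {"FILTERED", "DEFERRED"},
	"FILTERED": {"DEFERRED"},
	"DEFERRED": {"RUN"},
	"RUN":      {"DONE", "FAIL", "SKIP", "ON_HOLD", "WAIT"},
	"FAIL":     {"RUN"},
	"ON_HOLD":  {"RUN"},
	"WAIT":     {"RUN"},
}

// CanTransition reports whether the table lists a from -> to transition.
func CanTransition(from, to string) bool {
	for _, next := range validTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition("NEW", "DEFERRED")) // true
	fmt.Println(CanTransition("DONE", "RUN"))     // false
}
```

Rejecting any transition that is missing from the map keeps terminal states such as DONE from being reopened by accident.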
DAG Execution
Flowlets are executed in topological order with explicit dependency tracking:
```go
type DBFlowlet struct {
	DependsOn []primitive.ObjectID // Flowlet IDs that must complete first
}
```

Execution algorithm:
- Identify flowlets with no unmet dependencies
- Execute them in parallel, bounded by the channel worker pool (ports 7701-7799 per instance)
- On completion, mark as done and notify dependents
- Repeat until all flowlets are DONE, SKIP, or FAIL
- If any flowlet is FAIL and its OnError is "continue", continue; otherwise mark the play as FAIL
Example DAG:
```
Flowlet A (HTTPRequest)
    │
    ├── Flowlet B (TextExtraction) ──┐
    │                                ├── Flowlet D (ChatCompletion)
    └── Flowlet C (ImageAnalysis) ───┤
                                     └── Flowlet E (EmailNotify)
```

- A executes first (no deps)
- B and C execute in parallel after A completes
- D and E execute in parallel after B and C complete
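
The algorithm and example DAG above can be sketched as a function that groups steps into "waves", where every step in a wave has all of its dependencies in earlier waves. This is an illustrative sketch only: it uses plain string names in place of the `primitive.ObjectID` values in `DependsOn`, and `ExecutionWaves` is a hypothetical helper, not flow8's scheduler.

```go
package main

import (
	"fmt"
	"sort"
)

// ExecutionWaves groups steps so that each wave depends only on earlier
// waves; the steps inside one wave may run in parallel.
// deps maps a step name to the names it depends on.
func ExecutionWaves(deps map[string][]string) ([][]string, error) {
	done := make(map[string]bool, len(deps))
	var waves [][]string
	for len(done) < len(deps) {
		var ready []string
		for step, requires := range deps {
			if done[step] {
				continue
			}
			satisfied := true
			for _, r := range requires {
				if !done[r] {
					satisfied = false
					break
				}
			}
			if satisfied {
				ready = append(ready, step)
			}
		}
		if len(ready) == 0 {
			return nil, fmt.Errorf("dependency cycle or unknown dependency among remaining steps")
		}
		sort.Strings(ready) // deterministic order, for display only
		for _, step := range ready {
			done[step] = true
		}
		waves = append(waves, ready)
	}
	return waves, nil
}

func main() {
	waves, err := ExecutionWaves(map[string][]string{
		"A": nil,
		"B": {"A"},
		"C": {"A"},
		"D": {"B", "C"},
		"E": {"B", "C"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(waves) // [[A] [B C] [D E]]
}
```

A real scheduler would start a dependent as soon as its own dependencies finish rather than waiting for the whole wave, but the wave view is enough to show which steps are eligible to run together.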
Filter Evaluation
Before a flowlet executes, its filters are evaluated. Filters are CEL (Common Expression Language) expressions:
```json
{
  "filters": {
    "operator": "AND",
    "filters": [
      { "expression": "len(input.emails) > 0" },
      { "expression": "timestamp.now().hour >= 9 && timestamp.now().hour < 17" },
      { "expression": "kv['approval_status'] == 'approved'" }
    ]
  }
}
```

If filters return false, the flowlet is marked FILTERED and skipped.
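
How a filter group combines its individual results can be sketched as follows. This is a sketch under stated assumptions: expression evaluation is delegated to a caller-supplied `EvalFunc` hook (hypothetical, standing in for a CEL evaluator), an empty group is treated as passing, and any operator other than "OR" is treated as "AND".

```go
package main

import "fmt"

// Filter and FilterGroup repeat the shapes defined earlier in this document.
type Filter struct {
	Expression string
	Value      interface{}
}

type FilterGroup struct {
	Operator string // "AND" or "OR"
	Filters  []*Filter
}

// EvalFunc evaluates one expression to a boolean. It is a hypothetical hook;
// a real system would call its expression engine here.
type EvalFunc func(expression string) (bool, error)

// Passes combines the individual filter results using the group's operator.
func Passes(group *FilterGroup, eval EvalFunc) (bool, error) {
	if group == nil || len(group.Filters) == 0 {
		return true, nil // assumption: no filters means "run the flowlet"
	}
	isOr := group.Operator == "OR"
	for _, f := range group.Filters {
		ok, err := eval(f.Expression)
		if err != nil {
			return false, fmt.Errorf("filter %q: %w", f.Expression, err)
		}
		if isOr && ok {
			return true, nil // OR: the first true result decides
		}
		if !isOr && !ok {
			return false, nil // AND: the first false result decides
		}
	}
	return !isOr, nil // AND: all were true; OR: none were true
}

func main() {
	// Stubbed results in place of real expression evaluation.
	results := map[string]bool{"has_emails": true, "approved": false}
	eval := func(expr string) (bool, error) { return results[expr], nil }

	group := &FilterGroup{
		Operator: "AND",
		Filters:  []*Filter{{Expression: "has_emails"}, {Expression: "approved"}},
	}
	pass, err := Passes(group, eval)
	fmt.Println(pass, err) // false <nil>
}
```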
Timeout & Retry
Timeout
Each flowlet has a configurable timeout (default: 5 minutes). If execution exceeds the timeout, the module is interrupted and marked FAIL. The retry policy then determines if a retry is attempted:
```go
if time.Since(startTime) > flowlet.Timeout {
	playLayer.Status = "FAIL"
	playLayer.Error = "execution timeout exceeded"
	// Evaluate retry policy
}
```

Exponential Backoff Retry
Retry uses exponential backoff with jitter:
```
retry_delay = min(initial_delay * (backoff_multiplier ^ retry_count), max_delay) + random_jitter
```

Here retry_count is 0 for the first retry, which matches the delays listed below.

Example configuration:

```json
{
  "retry": {
    "max_retries": 3,
    "initial_delay_ms": 1000,
    "max_delay_ms": 30000,
    "backoff_multiplier": 2.0,
    "jitter_fraction": 0.1
  }
}
```

Delays:
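- Retry 1: 1000 ms + jitter → up to ~1100 ms
- Retry 2: 2000 ms + jitter → up to ~2200 ms
- Retry 3: 4000 ms + jitter → up to ~4400 ms
- Retry 4 (if max_retries > 3): 8000 ms + jitter → up to ~8800 ms; the base delay is capped at 30000 ms before jitter is added

Spreading retries out this way helps prevent a thundering herd during API outages.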
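
The delay formula can be sketched in Go as below. This is a minimal sketch, not the actual implementation: it assumes the retry count is zero-based and that jitter is additive and one-sided (a random amount between 0 and `JitterFraction` times the base delay), which is consistent with the delays listed above. The `BaseDelay` and `DelayWithJitter` helpers and the injectable random source are illustrative.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// RetryPolicy repeats the struct defined earlier in this document.
type RetryPolicy struct {
	MaxRetries        int
	InitialDelayMs    int
	MaxDelayMs        int
	BackoffMultiplier float32
	JitterFraction    float32
}

// BaseDelay returns min(initial * multiplier^retryCount, max) without jitter.
// retryCount is zero-based: 0 for the first retry.
func BaseDelay(p RetryPolicy, retryCount int) time.Duration {
	ms := float64(p.InitialDelayMs) * math.Pow(float64(p.BackoffMultiplier), float64(retryCount))
	if ms > float64(p.MaxDelayMs) {
		ms = float64(p.MaxDelayMs)
	}
	return time.Duration(ms) * time.Millisecond
}

// DelayWithJitter adds a random amount in [0, JitterFraction*base).
// randFloat must return a value in [0, 1); pass rand.Float64 in production
// code or a fixed function in tests.
func DelayWithJitter(p RetryPolicy, retryCount int, randFloat func() float64) time.Duration {
	base := BaseDelay(p, retryCount)
	jitter := time.Duration(randFloat() * float64(p.JitterFraction) * float64(base))
	return base + jitter
}

func main() {
	p := RetryPolicy{
		MaxRetries:        3,
		InitialDelayMs:    1000,
		MaxDelayMs:        30000,
		BackoffMultiplier: 2.0,
		JitterFraction:    0.1,
	}
	for n := 0; n < 4; n++ {
		fmt.Printf("retry %d: base %v, with jitter %v\n",
			n+1, BaseDelay(p, n), DelayWithJitter(p, n, rand.Float64))
	}
}
```

Passing the random source as a parameter keeps the jitter testable without changing the formula.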
Key-Value Store Scoping
Each play has a shared key-value store that modules can read and write. The KV store is scoped to control visibility and prevent conflicts:
Scope Levels
| Scope | Visibility | Lifetime | Use Case |
|---|---|---|---|
| global | All flowlets in the play | Entire play execution | Shared state (request ID, session data) |
| flow | All flowlets in this specific flow | Entire play execution | Flow-specific configuration |
| flow_group | All plays in the same flow group | Entire group lifetime | Cross-play state (aggregation) |
Inheritance & Conflict Resolution
When a flowlet reads a key, the system searches in order:
- Flowlet-local KV (if step has local state)
- Flow-scoped KV
- Global KV
When writing, flowlets can specify scope:
```go
// Write to different scopes
kv["local_result"] = data                // Defaults to flow scope
kv["global:request_id"] = uuid           // Explicit global scope
kv["flow_group:total_processed"] = count // Flow group scope
```

Conflict strategy: If the same key is written by multiple concurrent flowlets, the last write wins within a scope. For critical state, use locks or atomic operations via database transactions.
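
The read order and the prefix-based writes described above can be modelled with a small in-memory store. This is only a sketch: `ScopedKV`, `SetIn`, `Set`, and `Get` are hypothetical names, the "local" scope name is an assumption, and the store is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"strings"
)

// ScopedKV is a hypothetical in-memory model of a play's scoped KV store.
type ScopedKV struct {
	scopes map[string]map[string]interface{} // scope name -> key -> value
}

// readOrder follows the lookup order listed above.
var readOrder = []string{"local", "flow", "global"}

func NewScopedKV() *ScopedKV {
	return &ScopedKV{scopes: make(map[string]map[string]interface{})}
}

// SetIn writes a value into an explicitly named scope.
func (s *ScopedKV) SetIn(scope, name string, value interface{}) {
	if s.scopes[scope] == nil {
		s.scopes[scope] = make(map[string]interface{})
	}
	s.scopes[scope][name] = value
}

// Set parses an optional "global:" or "flow_group:" prefix.
// Keys without a prefix default to the flow scope.
func (s *ScopedKV) Set(key string, value interface{}) {
	for _, scope := range []string{"global", "flow_group"} {
		prefix := scope + ":"
		if strings.HasPrefix(key, prefix) {
			s.SetIn(scope, strings.TrimPrefix(key, prefix), value)
			return
		}
	}
	s.SetIn("flow", key, value)
}

// Get returns the first match in readOrder, so narrower scopes shadow wider ones.
func (s *ScopedKV) Get(name string) (interface{}, bool) {
	for _, scope := range readOrder {
		if v, ok := s.scopes[scope][name]; ok {
			return v, true
		}
	}
	return nil, false
}

func main() {
	kv := NewScopedKV()
	kv.Set("global:request_id", "req-123")
	kv.Set("processed_count", 50) // flow scope by default
	fmt.Println(kv.Get("request_id"))      // req-123 true
	fmt.Println(kv.Get("processed_count")) // 50 true
}
```

Because the documented read order does not include the flow group scope, this sketch only returns flow-group values when they are read from that scope explicitly.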
Example KV Store Usage
Play processing a batch of documents:
```
Global scope:
  - request_id: "req-123"
  - batch_size: 100

Flow scope (per batch-processing flow):
  - processed_count: 50
  - errors: []

Flow group scope (aggregation across parallel batch flows):
  - total_processed: 150
  - total_cost: "$2.50"
```

Idempotency
Plays can include an IdempotencyRoot to prevent duplicate execution:
```go
type DBPlay struct {
	IdempotencyRoot string // e.g., "user-action-12345"
}
```

If a play with the same idempotency root is submitted within a configurable window (default: 24 hours), the system returns the cached result instead of re-executing. This prevents:
- Duplicate webhook deliveries
- Double-charging for API calls
- Duplicate database inserts
The implementation checks the idempotency collection before creating a new play.
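
The check-before-create behaviour can be illustrated with an in-memory stand-in for the idempotency collection. Everything named here (`IdempotencyStore`, `Submit`, `DefaultWindow`) is hypothetical; a real deployment would query the database instead of a map, and results would not be plain strings.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// DefaultWindow is the default deduplication window stated above.
const DefaultWindow = 24 * time.Hour

type cachedPlay struct {
	result    string
	createdAt time.Time
}

// IdempotencyStore is a hypothetical in-memory stand-in for the
// idempotency collection.
type IdempotencyStore struct {
	mu     sync.Mutex
	window time.Duration
	plays  map[string]cachedPlay
}

func NewIdempotencyStore(window time.Duration) *IdempotencyStore {
	return &IdempotencyStore{window: window, plays: make(map[string]cachedPlay)}
}

// Submit returns the cached result if root was seen within the window.
// Otherwise it calls execute, stores the result, and returns it.
// The lock is held during execute so concurrent duplicates cannot both run.
func (s *IdempotencyStore) Submit(root string, execute func() string) (result string, cached bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	now := time.Now()
	if prev, ok := s.plays[root]; ok && now.Sub(prev.createdAt) < s.window {
		return prev.result, true
	}
	result = execute()
	s.plays[root] = cachedPlay{result: result, createdAt: now}
	return result, false
}

func main() {
	store := NewIdempotencyStore(DefaultWindow)
	run := func() string { return "play-result" }

	_, cached := store.Submit("user-action-12345", run)
	fmt.Println(cached) // false
	_, cached = store.Submit("user-action-12345", run)
	fmt.Println(cached) // true
}
```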
Channel-Based Async Execution
For long-running or resource-intensive flowlets, execution can be deferred to a channel:
```go
type DBFlowlet struct {
	DeferAfterMs int // Delay before execution (can queue to channel port)
}
```

flow8 allocates up to 100 concurrent channel workers per instance (ports 7701-7799). Long-running flowlets are queued and executed asynchronously:
- Main flow thread creates channel work item
- Channel worker picks it up and executes the module
- On completion, result is written back to play
- Main flow continues after dependency is satisfied
This allows the HTTP request thread to return quickly while background work continues.
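
The queue-and-worker pattern in the steps above can be sketched with goroutines and Go channels. Note the assumption: flow8's channel workers are addressed by port, whereas this sketch uses in-process goroutines as a stand-in, and `WorkItem`, `Result`, and `RunPool` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkItem is a hypothetical unit of deferred work for one flowlet.
type WorkItem struct {
	FlowletName string
	Run         func() (map[string]interface{}, error)
}

// Result carries a finished item's output back to the caller.
type Result struct {
	FlowletName string
	Output      map[string]interface{}
	Err         error
}

// RunPool executes items with at most `workers` concurrent goroutines and
// returns the results keyed by flowlet name.
func RunPool(workers int, items []WorkItem) map[string]Result {
	if workers < 1 {
		workers = 1
	}
	queue := make(chan WorkItem)
	results := make(chan Result)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range queue { // worker picks up queued items
				out, err := item.Run()
				results <- Result{FlowletName: item.FlowletName, Output: out, Err: err}
			}
		}()
	}
	go func() { // producer: enqueue all work, then signal completion
		for _, item := range items {
			queue <- item
		}
		close(queue)
	}()
	go func() { // close results once every worker has exited
		wg.Wait()
		close(results)
	}()

	collected := make(map[string]Result, len(items))
	for r := range results {
		collected[r.FlowletName] = r
	}
	return collected
}

func main() {
	items := []WorkItem{
		{FlowletName: "B", Run: func() (map[string]interface{}, error) {
			return map[string]interface{}{"text": "Invoice #123..."}, nil
		}},
		{FlowletName: "C", Run: func() (map[string]interface{}, error) {
			return map[string]interface{}{"labels": 3}, nil
		}},
	}
	results := RunPool(2, items)
	fmt.Println(len(results), results["B"].Output["text"])
}
```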
Module Output Mapping
By default, module output is automatically mapped to the KV store:
```go
// Module returns
Response{
	State: "DONE",
	Output: map[string]interface{}{
		"extracted_text": "Hello, world",
		"confidence":     0.95,
	},
}

// Automatically mapped to KV store
kv["extracted_text"] = "Hello, world"
kv["confidence"] = 0.95
```

If SkipOutputMapping: true, output is only available within the play layer record and not propagated to KV.
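
A minimal sketch of the mapping step, assuming output keys are copied into the KV store unchanged; the `Response` shape follows the example above and `ApplyOutput` is a hypothetical helper.

```go
package main

import "fmt"

// Response follows the module response shape shown above.
type Response struct {
	State  string
	Output map[string]interface{}
}

// ApplyOutput copies module output into kv unless skipOutputMapping is set.
// It returns the number of keys written.
func ApplyOutput(kv map[string]interface{}, resp Response, skipOutputMapping bool) int {
	if skipOutputMapping {
		return 0 // output stays in the play layer record only
	}
	for key, value := range resp.Output {
		kv[key] = value
	}
	return len(resp.Output)
}

func main() {
	kv := map[string]interface{}{}
	resp := Response{
		State: "DONE",
		Output: map[string]interface{}{
			"extracted_text": "Hello, world",
			"confidence":     0.95,
		},
	}
	fmt.Println(ApplyOutput(kv, resp, false), kv["extracted_text"]) // 2 Hello, world
}
```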
Execution Flow Example
```
Flow: "ProcessInvoice"

Flowlet A: HTTPRequest (download PDF)
  → Output: { file_path: "/tmp/invoice.pdf" }

Flowlet B: DocumentProcessing (OCR)
  → Input: { file: kv["file_path"] }
  → Output: { text: "Invoice #123...", confidence: 0.98 }

Flowlet C: TextExtraction (extract fields)
  → Input: { text: kv["text"] }
  → Filters: kv["confidence"] > 0.9
  → Output: { vendor: "ACME Inc", amount: "$500.00" }

Flowlet D: DatabaseInsert (save to DB)
  → Input: { vendor: kv["vendor"], amount: kv["amount"] }
  → Output: { row_id: 12345 }
```

Execution trace:
- Play created in NEW state
- Flowlet A executes (no deps) → DONE, sets kv["file_path"]
- Flowlet B executes (depends on A) → DONE, sets kv["text"], kv["confidence"]
- Flowlet C evaluates filters (checks kv["confidence"] > 0.9) → true, executes → DONE, sets kv["vendor"], kv["amount"]
- Flowlet D executes (depends on C) → DONE, sets kv["row_id"]
- All flowlets complete, play marked DONE
- Result webhook sent (if configured)