Execution Model

Overview

flow8's execution model is built around a directed acyclic graph (DAG) of steps, each with independent retry logic, filters, and timeouts. A single flow execution is called a play, and each step execution is tracked as a play layer.

Core Concepts

Flow Definition

A flow (DBFlow) is a template stored in MongoDB:

type DBFlow struct {
ID primitive.ObjectID
Name string
CompanyID primitive.ObjectID
Flowlets []*DBFlowlet // Ordered list of steps
Metadata map[string]interface{}
CreatedAt time.Time
UpdatedAt time.Time
}

Flowlet (Step)

Each step in a flow is a flowlet (DBFlowlet), which references a module and defines execution behavior:

type DBFlowlet struct {
ID primitive.ObjectID
FlowID primitive.ObjectID
Name string
ModuleRef string // e.g., "text-extraction", "http-request"
Order int // Execution order
ComponentConfigIds map[string]string // Component overrides (ai, storage, db, etc.)
InputMapping map[string]interface{} // Map input data to module params
Filters *FilterGroup // Pre-execution conditions
Timeout time.Duration // Max execution time (default: 5 minutes)
Retry RetryPolicy // Retry strategy
OnError string // "continue", "fail", "skip"
DeferAfterMs int // Delay execution after deps complete
SkipOutputMapping bool // Don't map output to KV store
}
type FilterGroup struct {
Operator string // "AND" or "OR"
Filters []*Filter
}
type Filter struct {
Expression string // CEL or template expression
Value interface{}
}
type RetryPolicy struct {
MaxRetries int // Max attempts
InitialDelayMs int // First retry delay
MaxDelayMs int // Max backoff
BackoffMultiplier float32 // Exponential backoff factor
JitterFraction float32 // Random variation (0-1)
}

Play (Execution Instance)

A play is a single execution of a flow with full state tracking:

type DBPlay struct {
ID primitive.ObjectID
FlowID primitive.ObjectID
CompanyID primitive.ObjectID
Status string // NEW, RUN, DONE, FAIL, SKIP
StartedAt time.Time
CompletedAt time.Time
PlayLayers []*DBPlayLayer // Step execution records
GlobalKV map[string]interface{} // Global key-value state
FlowGroupID primitive.ObjectID // For grouping multiple plays
IdempotencyRoot string // For deduplication
WebhookURL string // Send results here
}

Play Layer (Step Execution)

Each step's execution is recorded as a play layer:

type DBPlayLayer struct {
ID primitive.ObjectID
PlayID primitive.ObjectID
FlowletID primitive.ObjectID
FlowletName string
ModuleRef string
Status string // (see state machine below)
Input map[string]interface{}
Output map[string]interface{}
Error string
ExecutedAt time.Time
Duration time.Duration
RetryCount int
Logs []string
KVUpdates map[string]interface{}
}

State Machine

Each play and play layer progresses through states:

     ┌─────────┐
     │   NEW   │
     └────┬────┘
          │
     ┌────▼───────┐
     │  FILTERED  │  (pre-execution filter failed)
     └────────────┘
     ┌────────────┐
     │  DEFERRED  │  (waiting for delay or dependency)
     └────┬───────┘
          │
     ┌────▼────┐
     │   RUN   │
     └────┬────┘
          │
   ┌──────┴───────┐
   │              │
┌──▼───┐       ┌──▼───┐
│ DONE │       │ FAIL │
└──────┘       └──────┘
States triggered by conditions:
- ON_HOLD: Execution paused (manual resume required)
- WAIT: Waiting for external signal (webhook, timer)
- SKIP: Skipped due to error handling rule (OnError: "skip")

Valid state transitions:

| From     | To       | Condition                                  |
|----------|----------|--------------------------------------------|
| NEW      | FILTERED | Input filters returned false               |
| NEW      | DEFERRED | DeferAfterMs > 0 or awaiting dependency    |
| FILTERED | DEFERRED | After defer period                         |
| DEFERRED | RUN      | All dependencies met and delay elapsed     |
| RUN      | DONE     | Module completed successfully              |
| RUN      | FAIL     | Module execution error and retries exhausted |
| FAIL     | RUN      | Retry policy triggered (backoff applied)   |
| RUN      | SKIP     | OnError: "skip" and error occurred         |
| RUN      | ON_HOLD  | Manual pause (admin action)                |
| ON_HOLD  | RUN      | Manual resume                              |
| RUN      | WAIT     | Awaiting external input                    |
| WAIT     | RUN      | Signal received or timeout                 |
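
The transition table lends itself to a lookup structure. The following is a minimal sketch, not flow8's actual implementation: the `State` type, the `validTransitions` map, and `CanTransition` are hypothetical names introduced here for illustration, and the allowed pairs are copied from the table above.

```go
package main

import "fmt"

// State is a play-layer status. The type name is an assumption for this sketch.
type State string

const (
	New      State = "NEW"
	Filtered State = "FILTERED"
	Deferred State = "DEFERRED"
	Run      State = "RUN"
	Done     State = "DONE"
	Fail     State = "FAIL"
	Skip     State = "SKIP"
	OnHold   State = "ON_HOLD"
	Wait     State = "WAIT"
)

// validTransitions mirrors the transition table: from -> set of allowed targets.
var validTransitions = map[State]map[State]bool{
	New:      {Filtered: true, Deferred: true},
	Filtered: {Deferred: true},
	Deferred: {Run: true},
	Run:      {Done: true, Fail: true, Skip: true, OnHold: true, Wait: true},
	Fail:     {Run: true},
	OnHold:   {Run: true},
	Wait:     {Run: true},
}

// CanTransition reports whether from -> to appears in the table.
// States with no outgoing entries (DONE, SKIP) always yield false.
func CanTransition(from, to State) bool {
	return validTransitions[from][to]
}

func main() {
	fmt.Println(CanTransition(Run, Done)) // true
	fmt.Println(CanTransition(Done, Run)) // false
}
```

A guard like this could be called before persisting a status change, so that an illegal jump such as NEW to RUN is rejected rather than silently stored.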

DAG Execution

Flowlets are executed in topological order with explicit dependency tracking:

type DBFlowlet struct {
DependsOn []primitive.ObjectID // Flowlet IDs that must complete first
}

Execution algorithm:

  1. Identify flowlets with no unmet dependencies
  2. Execute them in parallel (bounded by the per-instance channel pool, which uses ports 7701-7799)
  3. On completion, mark as done and notify dependents
  4. Repeat until all flowlets are DONE, SKIP, or FAIL
  5. If a flowlet ends in FAIL and its OnError is "continue", keep executing the remaining flowlets; otherwise mark the play as FAIL

Example DAG:

Flowlet A (HTTPRequest)
│
├─→ Flowlet B (TextExtraction) ──┐
│                                ├─→ Flowlet D (ChatCompletion)
└─→ Flowlet C (ImageAnalysis) ───┤
                                 └─→ Flowlet E (EmailNotify)
  • A executes first (no deps)
  • B and C execute in parallel after A completes
  • D and E execute in parallel after B and C complete
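
The algorithm and the example DAG can be sketched as a wave-based topological traversal. This is an illustrative simplification, not flow8's scheduler: the `step` type and `runDAG` function are hypothetical, steps are identified by name rather than ObjectID, and each wave waits for all of its steps before the next wave starts (the real engine may notify dependents as soon as an individual step completes). Error handling via OnError is left out.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// step is a hypothetical stand-in for a flowlet: a name plus its dependencies.
type step struct {
	Name      string
	DependsOn []string
}

// runDAG executes steps in dependency order, wave by wave. Steps in a wave
// run concurrently, at most maxParallel at a time. It returns the waves in
// execution order, or an error for a cycle or an unknown dependency.
func runDAG(steps []step, maxParallel int, exec func(name string)) ([][]string, error) {
	if maxParallel < 1 {
		maxParallel = 1
	}
	remaining := make(map[string]int, len(steps)) // unmet dependency count
	dependents := make(map[string][]string)
	for _, s := range steps {
		remaining[s.Name] = len(s.DependsOn)
	}
	for _, s := range steps {
		for _, d := range s.DependsOn {
			if _, ok := remaining[d]; !ok {
				return nil, fmt.Errorf("step %q depends on unknown step %q", s.Name, d)
			}
			dependents[d] = append(dependents[d], s.Name)
		}
	}

	var ready []string
	for name, n := range remaining {
		if n == 0 {
			ready = append(ready, name)
		}
	}

	var waves [][]string
	executed := 0
	sem := make(chan struct{}, maxParallel) // bounds concurrent executions
	for len(ready) > 0 {
		sort.Strings(ready) // deterministic wave contents for the example
		var wg sync.WaitGroup
		for _, name := range ready {
			wg.Add(1)
			sem <- struct{}{}
			go func(n string) {
				defer wg.Done()
				defer func() { <-sem }()
				exec(n)
			}(name)
		}
		wg.Wait()
		waves = append(waves, ready)
		executed += len(ready)

		// Notify dependents: a step becomes ready when its count reaches zero.
		var next []string
		for _, name := range ready {
			for _, dep := range dependents[name] {
				remaining[dep]--
				if remaining[dep] == 0 {
					next = append(next, dep)
				}
			}
		}
		ready = next
	}
	if executed != len(steps) {
		return nil, fmt.Errorf("dependency cycle: %d of %d steps could not run", len(steps)-executed, len(steps))
	}
	return waves, nil
}

func main() {
	steps := []step{
		{Name: "A"},
		{Name: "B", DependsOn: []string{"A"}},
		{Name: "C", DependsOn: []string{"A"}},
		{Name: "D", DependsOn: []string{"B", "C"}},
		{Name: "E", DependsOn: []string{"B", "C"}},
	}
	waves, err := runDAG(steps, 2, func(name string) { /* module call would go here */ })
	fmt.Println(waves, err) // [[A] [B C] [D E]] <nil>
}
```

The buffered channel acts as a counting semaphore, which is one common way to cap parallelism in Go; the pool size of 2 here is arbitrary.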

Filter Evaluation

Before a flowlet executes, its filters are evaluated. Filters are CEL (Common Expression Language) expressions:

{
"filters": {
"operator": "AND",
"filters": [
{
"expression": "len(input.emails) > 0"
},
{
"expression": "timestamp.now().hour >= 9 && timestamp.now().hour < 17"
},
{
"expression": "kv['approval_status'] == 'approved'"
}
]
}
}

If the filter group evaluates to false, the flowlet is marked FILTERED and skipped.
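
How a filter group combines its individual results can be sketched as follows. This is an assumption-laden illustration: the `Passes` method and the `evalFunc` type are hypothetical, the expression evaluator is a stand-in (a real one would call the CEL engine), and treating an empty or missing group as "pass" is a guess rather than documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// Filter and FilterGroup are trimmed copies of the structs shown earlier.
type Filter struct {
	Expression string
}

type FilterGroup struct {
	Operator string // "AND" or "OR"
	Filters  []*Filter
}

// evalFunc evaluates one expression to a boolean. Hypothetical: in a real
// system this would delegate to the expression engine.
type evalFunc func(expr string) (bool, error)

// Passes combines the filter results with the group's operator.
// Assumption: a nil or empty group lets the flowlet execute.
func (g *FilterGroup) Passes(eval evalFunc) (bool, error) {
	if g == nil || len(g.Filters) == 0 {
		return true, nil
	}
	op := strings.ToUpper(g.Operator)
	if op != "AND" && op != "OR" {
		return false, fmt.Errorf("unknown operator %q", g.Operator)
	}
	for _, f := range g.Filters {
		ok, err := eval(f.Expression)
		if err != nil {
			return false, fmt.Errorf("filter %q: %w", f.Expression, err)
		}
		if op == "AND" && !ok {
			return false, nil // short-circuit: one false fails an AND group
		}
		if op == "OR" && ok {
			return true, nil // short-circuit: one true satisfies an OR group
		}
	}
	// AND: every filter was true. OR: no filter was true.
	return op == "AND", nil
}

func main() {
	// Stand-in evaluator with precomputed results instead of a CEL engine.
	results := map[string]bool{
		"len(input.emails) > 0":               true,
		"kv['approval_status'] == 'approved'": false,
	}
	eval := func(expr string) (bool, error) {
		ok, found := results[expr]
		if !found {
			return false, fmt.Errorf("no result for expression")
		}
		return ok, nil
	}
	group := &FilterGroup{Operator: "AND", Filters: []*Filter{
		{Expression: "len(input.emails) > 0"},
		{Expression: "kv['approval_status'] == 'approved'"},
	}}
	pass, err := group.Passes(eval)
	fmt.Println(pass, err) // false <nil>, so the flowlet would be marked FILTERED
}
```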

Timeout & Retry

Timeout

Each flowlet has a configurable timeout (default: 5 minutes). If execution exceeds the timeout, the module is interrupted and marked FAIL. The retry policy then determines if a retry is attempted:

if time.Since(startTime) > flowlet.Timeout {
playLayer.Status = "FAIL"
playLayer.Error = "execution timeout exceeded"
// Evaluate retry policy
}
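
One idiomatic way to enforce such a timeout in Go is `context.WithTimeout`. The sketch below is not taken from flow8: `runWithTimeout`, `fastModule`, `slowModule`, and `demoTimeout` are hypothetical names, and the module signature is an assumption. Note that a module which ignores its context cannot be forcibly interrupted this way; the caller simply stops waiting for it.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// demoTimeout is an arbitrary short timeout for the example (the documented
// default is 5 minutes).
const demoTimeout = 50 * time.Millisecond

// runWithTimeout runs module under a deadline and returns the resulting
// play-layer status and error message.
func runWithTimeout(timeout time.Duration, module func(ctx context.Context) error) (status, errMsg string) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine never blocks after a timeout
	go func() { done <- module(ctx) }()

	select {
	case err := <-done:
		switch {
		case err == nil:
			return "DONE", ""
		case errors.Is(err, context.DeadlineExceeded):
			// The module noticed the deadline itself and returned ctx.Err().
			return "FAIL", "execution timeout exceeded"
		default:
			return "FAIL", err.Error()
		}
	case <-ctx.Done():
		return "FAIL", "execution timeout exceeded"
	}
}

// fastModule finishes immediately.
func fastModule(ctx context.Context) error { return nil }

// slowModule would take 2 seconds but stops early when the context is cancelled.
func slowModule(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	fmt.Println(runWithTimeout(demoTimeout, fastModule))
	fmt.Println(runWithTimeout(demoTimeout, slowModule))
}
```

After a FAIL from a timeout, the caller would consult the retry policy exactly as for any other failure.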

Exponential Backoff Retry

Retry uses exponential backoff with jitter:

retry_delay = min(
initial_delay * (backoff_multiplier ^ retry_count),
max_delay
) + random_jitter

Example configuration:

{
"retry": {
"max_retries": 3,
"initial_delay_ms": 1000,
"max_delay_ms": 30000,
"backoff_multiplier": 2.0,
"jitter_fraction": 0.1
}
}

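Delays (retry_count is 0 for the first retry; jitter shown at its 10% maximum):

  • Retry 1: 1000 ms + jitter → up to ~1100 ms
  • Retry 2: 2000 ms + jitter → up to ~2200 ms
  • Retry 3: 4000 ms + jitter → up to ~4400 ms
  • Retry 4 (only if max_retries > 3): 8000 ms + jitter → up to ~8800 ms; the 30000 ms cap first applies at retry 6 (32000 ms, capped to 30000 ms)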

The jitter spreads retries from many plays over time, which reduces thundering-herd load on an API that is recovering from an outage.
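
The delay formula can be written as a small Go helper. This is a sketch under stated assumptions: the method names `baseDelayMs` and `Delay` are hypothetical, `retryCount` is zero-based (0 for the first retry, matching the delays listed above), and jitter is assumed to be additive and uniformly drawn from 0 up to `JitterFraction` times the capped delay.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// RetryPolicy repeats the struct shown earlier.
type RetryPolicy struct {
	MaxRetries        int
	InitialDelayMs    int
	MaxDelayMs        int
	BackoffMultiplier float32
	JitterFraction    float32
}

// baseDelayMs returns the capped exponential delay without jitter:
// min(initial * multiplier^retryCount, max).
func (p RetryPolicy) baseDelayMs(retryCount int) float64 {
	d := float64(p.InitialDelayMs) * math.Pow(float64(p.BackoffMultiplier), float64(retryCount))
	return math.Min(d, float64(p.MaxDelayMs))
}

// Delay adds random jitter to the base delay.
// Assumption: jitter is uniform in [0, JitterFraction * base).
func (p RetryPolicy) Delay(retryCount int) time.Duration {
	base := p.baseDelayMs(retryCount)
	jitter := rand.Float64() * float64(p.JitterFraction) * base
	return time.Duration((base + jitter) * float64(time.Millisecond))
}

func main() {
	p := RetryPolicy{
		MaxRetries:        3,
		InitialDelayMs:    1000,
		MaxDelayMs:        30000,
		BackoffMultiplier: 2.0,
		JitterFraction:    0.1,
	}
	for retry := 0; retry < p.MaxRetries; retry++ {
		fmt.Printf("retry %d: base %.0f ms, with jitter %v\n",
			retry+1, p.baseDelayMs(retry), p.Delay(retry))
	}
}
```

Because the jitter is added after the cap, a delay can slightly exceed `MaxDelayMs`, which matches the formula as written.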

Key-Value Store Scoping

Each play has a shared key-value store that modules can read and write. The KV store is scoped to control visibility and prevent conflicts:

Scope Levels

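| Scope      | Visibility                         | Lifetime              | Use Case                               |
|------------|------------------------------------|-----------------------|----------------------------------------|
| global     | All flowlets in the play           | Entire play execution | Shared state (request ID, session data) |
| flow       | All flowlets in this specific flow | Entire play execution | Flow-specific configuration            |
| flow_group | All plays in the same flow group   | Entire group lifetime | Cross-play state (aggregation)         |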

Inheritance & Conflict Resolution

When a flowlet reads a key, the system searches in order:

  1. Flowlet-local KV (if step has local state)
  2. Flow-scoped KV
  3. Global KV

When writing, flowlets can specify scope:

// Write to different scopes
kv["local_result"] = data // Defaults to flow scope
kv["global:request_id"] = uuid // Explicit global scope
kv["flow_group:total_processed"] = count // Flow group scope

Conflict strategy: If the same key is written by multiple concurrent flowlets, the last write wins within a scope. For critical state, use locks or atomic operations via database transactions.
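
The read order and the prefix-based writes can be illustrated with a small in-memory model. Everything here is hypothetical (`scopedKV`, `Set`, `Get`), and two details are assumptions: that the scope prefix is stripped before the key is stored, and that flow-group keys are not part of the read search order (the list above names only local, flow, and global). Concurrency control is omitted.

```go
package main

import (
	"fmt"
	"strings"
)

// scopedKV is a hypothetical in-memory model of the scoped store.
type scopedKV struct {
	local, flow, global, flowGroup map[string]interface{}
}

func newScopedKV() *scopedKV {
	return &scopedKV{
		local:     map[string]interface{}{},
		flow:      map[string]interface{}{},
		global:    map[string]interface{}{},
		flowGroup: map[string]interface{}{},
	}
}

// Set routes a write by key prefix; unprefixed keys default to flow scope.
// Assumption: the prefix is stripped from the stored key.
func (s *scopedKV) Set(key string, value interface{}) {
	switch {
	case strings.HasPrefix(key, "global:"):
		s.global[strings.TrimPrefix(key, "global:")] = value
	case strings.HasPrefix(key, "flow_group:"):
		s.flowGroup[strings.TrimPrefix(key, "flow_group:")] = value
	default:
		s.flow[key] = value
	}
}

// Get searches flowlet-local, then flow, then global scope, and returns the
// first match.
func (s *scopedKV) Get(key string) (interface{}, bool) {
	for _, scope := range []map[string]interface{}{s.local, s.flow, s.global} {
		if v, ok := scope[key]; ok {
			return v, true
		}
	}
	return nil, false
}

func main() {
	kv := newScopedKV()
	kv.Set("global:request_id", "req-123")
	kv.Set("processed_count", 50)
	kv.Set("flow_group:total_processed", 150)

	fmt.Println(kv.Get("request_id"))      // req-123 true
	fmt.Println(kv.Get("processed_count")) // 50 true
}
```

With this lookup order, a flow-scoped key shadows a global key of the same name, which is worth keeping in mind when choosing key names.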

Example KV Store Usage

Play processing a batch of documents:

Global scope:
- request_id: "req-123"
- batch_size: 100
Flow scope (per batch-processing flow):
- processed_count: 50
- errors: []
Flow group scope (aggregation across parallel batch flows):
- total_processed: 150
- total_cost: "$2.50"

Idempotency

Plays can include an IdempotencyRoot to prevent duplicate execution:

type DBPlay struct {
IdempotencyRoot string // e.g., "user-action-12345"
}

If a play with the same idempotency root is submitted within a configurable window (default: 24 hours), the system returns the cached result instead of re-executing. This prevents:

  • Duplicate webhook deliveries
  • Double-charging for API calls
  • Duplicate database inserts

The implementation checks the idempotency collection before creating a new play.
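
The check-before-create step can be sketched with an in-memory map in place of the idempotency collection. This is illustrative only: `idempotencyStore`, `GetOrCreate`, and `demoStart` are hypothetical names, the store returns a play ID rather than a full cached result, and a real deployment would need the check and insert to be atomic in the database (for example via a unique index) rather than guarded by a process-local mutex.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// defaultWindow is the default deduplication window stated above.
const defaultWindow = 24 * time.Hour

// demoStart is a fixed reference time so the example is deterministic.
var demoStart = time.Date(2024, time.January, 1, 0, 0, 0, 0, time.UTC)

type record struct {
	playID    string
	createdAt time.Time
}

// idempotencyStore is an in-memory stand-in for the idempotency collection.
type idempotencyStore struct {
	mu      sync.Mutex
	window  time.Duration
	records map[string]record
}

func newIdempotencyStore(window time.Duration) *idempotencyStore {
	return &idempotencyStore{window: window, records: make(map[string]record)}
}

// GetOrCreate returns the existing play ID if root was seen within the
// window; otherwise it calls create, records the new play, and returns it.
// The second result reports whether an existing play was reused.
func (s *idempotencyStore) GetOrCreate(root string, now time.Time, create func() string) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if r, ok := s.records[root]; ok && now.Sub(r.createdAt) < s.window {
		return r.playID, true
	}
	id := create()
	s.records[root] = record{playID: id, createdAt: now}
	return id, false
}

func main() {
	store := newIdempotencyStore(defaultWindow)
	n := 0
	create := func() string { n++; return fmt.Sprintf("play-%d", n) }

	fmt.Println(store.GetOrCreate("user-action-12345", demoStart, create))                   // play-1 false
	fmt.Println(store.GetOrCreate("user-action-12345", demoStart.Add(time.Hour), create))    // play-1 true
	fmt.Println(store.GetOrCreate("user-action-12345", demoStart.Add(25*time.Hour), create)) // play-2 false
}
```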

Channel-Based Async Execution

For long-running or resource-intensive flowlets, execution can be deferred to a channel:

type DBFlowlet struct {
DeferAfterMs int // Delay before execution (can queue to channel port)
}

flow8 allocates a pool of concurrent channel workers per instance on ports 7701-7799 (99 ports, so roughly 100 workers). Long-running flowlets are queued and executed asynchronously:

  1. Main flow thread creates channel work item
  2. Channel worker picks it up and executes the module
  3. On completion, result is written back to play
  4. Main flow continues after dependency is satisfied

This allows the HTTP request thread to return quickly while background work continues.
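
The four steps above follow a standard worker-pool pattern, sketched here with Go channels. The types and functions (`workItem`, `workResult`, `startWorkers`, `runAll`) are hypothetical, and the sketch deliberately ignores the port allocation: it models only the queueing and write-back, not how flow8's channel workers are addressed.

```go
package main

import (
	"fmt"
	"sync"
)

// workItem is a hypothetical unit of queued work for one flowlet.
type workItem struct {
	FlowletName string
	Run         func() (map[string]interface{}, error)
}

// workResult carries a finished item's output back to the play.
type workResult struct {
	FlowletName string
	Output      map[string]interface{}
	Err         error
}

// startWorkers launches n workers that consume items and emit results.
// The results channel is closed once items is closed and fully drained.
func startWorkers(n int, items <-chan workItem) <-chan workResult {
	results := make(chan workResult)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range items {
				out, err := it.Run()
				results <- workResult{FlowletName: it.FlowletName, Output: out, Err: err}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}

// runAll queues every item, waits for all results, and indexes them by
// flowlet name (standing in for "result is written back to play").
func runAll(n int, items []workItem) map[string]workResult {
	queue := make(chan workItem, len(items))
	for _, it := range items {
		queue <- it
	}
	close(queue)

	byName := make(map[string]workResult, len(items))
	for r := range startWorkers(n, queue) {
		byName[r.FlowletName] = r
	}
	return byName
}

func main() {
	items := []workItem{
		{FlowletName: "ocr", Run: func() (map[string]interface{}, error) {
			return map[string]interface{}{"text": "Invoice #123..."}, nil
		}},
		{FlowletName: "notify", Run: func() (map[string]interface{}, error) {
			return nil, fmt.Errorf("smtp unavailable")
		}},
	}
	for name, r := range runAll(4, items) {
		fmt.Println(name, r.Output, r.Err)
	}
}
```

In a request handler, the enqueue step would return immediately and the results loop would run in the background, which is what lets the HTTP thread respond quickly.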

Module Output Mapping

By default, module output is automatically mapped to the KV store:

// Module returns
Response{
State: "DONE",
Output: map[string]interface{}{
"extracted_text": "Hello, world",
"confidence": 0.95,
},
}
// Automatically mapped to KV store
kv["extracted_text"] = "Hello, world"
kv["confidence"] = 0.95

If SkipOutputMapping: true, output is only available within the play layer record and not propagated to KV.
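
The mapping rule can be sketched as a single function. `applyOutput` is a hypothetical helper, the `Response` type repeats only the fields shown above, and returning the applied updates (as they might appear in a play layer's KVUpdates field) is an assumption about how the two relate.

```go
package main

import "fmt"

// Response mirrors the module response shown above.
type Response struct {
	State  string
	Output map[string]interface{}
}

// applyOutput copies every output key into kv unless skipOutputMapping is
// set. It returns the updates that were applied, which is empty when
// mapping is skipped.
func applyOutput(kv map[string]interface{}, resp Response, skipOutputMapping bool) map[string]interface{} {
	updates := make(map[string]interface{})
	if skipOutputMapping {
		return updates // output stays only in the play layer record
	}
	for k, v := range resp.Output {
		kv[k] = v
		updates[k] = v
	}
	return updates
}

func main() {
	resp := Response{
		State: "DONE",
		Output: map[string]interface{}{
			"extracted_text": "Hello, world",
			"confidence":     0.95,
		},
	}

	kv := map[string]interface{}{}
	applyOutput(kv, resp, false)
	fmt.Println(kv["extracted_text"], kv["confidence"]) // Hello, world 0.95

	skipped := map[string]interface{}{}
	applyOutput(skipped, resp, true)
	fmt.Println(len(skipped)) // 0
}
```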

Execution Flow Example

Flow: "ProcessInvoice"

Flowlet A: HTTPRequest (download PDF)
→ Output: { file_path: "/tmp/invoice.pdf" }
Flowlet B: DocumentProcessing (OCR)
→ Input: { file: kv["file_path"] }
→ Output: { text: "Invoice #123...", confidence: 0.98 }
Flowlet C: TextExtraction (extract fields)
→ Input: { text: kv["text"] }
→ Filters: kv["confidence"] > 0.9
→ Output: { vendor: "ACME Inc", amount: "$500.00" }
Flowlet D: DatabaseInsert (save to DB)
→ Input: { vendor: kv["vendor"], amount: kv["amount"] }
→ Output: { row_id: 12345 }

Execution trace:

  1. Play created in NEW state
  2. Flowlet A executes (no deps) → DONE, sets kv["file_path"]
  3. Flowlet B executes (depends on A) → DONE, sets kv["text"], kv["confidence"]
  4. Flowlet C evaluates its filter (kv["confidence"] > 0.9) → true, so it executes → DONE, sets kv["vendor"], kv["amount"]
  5. Flowlet D executes (depends on C) → DONE, sets kv["row_id"]
  6. All flowlets complete, play marked DONE
  7. Result webhook sent (if configured)