
Monitoring & Logging

Application Logging

Log Output

flow8 uses zerolog for structured logging. Logs are written to:

/app/data/logs/app.log

Logs are also streamed to stdout for container log aggregation.

Log Format

2026-04-04T10:23:45.123Z INFO http user_id=user_456 company_id=company_123 GET /api/v1/flows?page=1 200 45ms
2026-04-04T10:23:46.456Z ERROR auth user_id=user_456 reason=invalid_password login_attempt=3 user_ip=203.0.113.42
2026-04-04T10:23:47.789Z WARN mongo duration_ms=234 collection=flows query_type=find

Fields:

  • timestamp — ISO 8601 (UTC) with millisecond precision
  • level — DEBUG, INFO, WARN, ERROR, FATAL
  • component — http, auth, mongo, storage, etc.
  • duration_ms — Execution time (for slow queries)
  • error — Error details (for errors)
  • request_id — Trace ID across services
  • Custom fields — App-specific context

Log Level Configuration

Terminal window
# Environment variable
LOG_LEVEL=debug # debug, info, warn, error
# Docker
docker run -e LOG_LEVEL=debug flow8core:latest
# Kubernetes
env:
  - name: LOG_LEVEL
    value: debug

Log Retention

Logs are rotated by size and pruned by age:

config/config.yml
logging:
  file: /app/data/logs/app.log
  max_size_mb: 100   # Rotate when file exceeds 100 MB
  max_backups: 10    # Keep 10 rotated files
  max_age_days: 30   # Delete rotated files after 30 days
  compress: true     # Gzip rotated logs

Audit Logging

Comprehensive audit trail of all significant events (see Audit Logging guide for details).

Audit Log Query

Terminal window
# View recent audit events
curl -X GET http://localhost:4454/api/v1/audit?limit=50 \
-H "Authorization: Bearer $TOKEN"
# Filter by action type
curl -X GET "http://localhost:4454/api/v1/audit?action=flow_created&limit=20" \
-H "Authorization: Bearer $TOKEN"
# Export to CSV
curl -X GET "http://localhost:4454/api/v1/audit/export?format=csv" \
-H "Authorization: Bearer $TOKEN" > audit.csv

WebSocket Real-Time Events

flow8 streams execution events via WebSocket for live monitoring:

Connecting

// JavaScript client
const ws = new WebSocket('ws://localhost:4454/ws/plays/:play_id');

ws.addEventListener('open', () => {
  console.log('Connected to play');
});

ws.addEventListener('message', (event) => {
  const msg = JSON.parse(event.data);
  console.log('Event:', msg.type, msg.data);
});

Event Types

Event             | Payload                                            | Frequency
------------------|----------------------------------------------------|----------------
layer_start       | { layer_id, flowlet_name, timestamp }              | Per flowlet
layer_progress    | { layer_id, progress_percent, status }             | Periodic
layer_complete    | { layer_id, status, duration_ms, output }          | Per flowlet
layer_error       | { layer_id, error_message, retry_count }           | On error
play_state_change | { play_id, old_status, new_status, timestamp }     | Per state change
play_complete     | { play_id, final_status, duration_ms, kv_summary } | On completion

Server-Side Ping

Ping/pong keepalive prevents connection timeouts:

config/config.yml
websocket:
  ping_period_seconds: 15
  read_deadline_seconds: 30
  write_deadline_seconds: 10

Background Job Monitoring

Scheduler (Cron)

Type: Recurring task execution

Monitored metrics:

  • Last run time
  • Next scheduled run
  • Execution duration
  • Success/failure status

Query:

Terminal window
# Check scheduled flows
curl -X GET http://localhost:4454/api/v1/scheduler/jobs \
-H "Authorization: Bearer $TOKEN"
Response:
{
  "jobs": [
    {
      "id": "job_123",
      "flow_id": "flow_456",
      "schedule": "0 9 * * *",
      "last_run": "2026-04-04T09:00:00Z",
      "next_run": "2026-04-05T09:00:00Z",
      "last_duration_ms": 2345,
      "last_status": "DONE"
    }
  ]
}

Metrics for monitoring:

flow8_scheduler_runs_total
- flow_id
- status (success/failure)
flow8_scheduler_run_duration_seconds
- flow_id
- quantile (0.5, 0.95, 0.99)
flow8_scheduler_next_run_seconds_until
- flow_id
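Assuming `flow8_scheduler_runs_total` is a counter labeled by `status`, a PromQL sketch of the scheduled-run failure ratio over the last hour might look like:

```
# Fraction of scheduled runs that failed in the last hour
sum(increase(flow8_scheduler_runs_total{status="failure"}[1h]))
  /
sum(increase(flow8_scheduler_runs_total[1h]))
```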

Retention Cleanup

Type: Automated cleanup job

Runs: Every 2 minutes (configurable)

Actions:

  • Deletes audit logs older than retention period
  • Deletes archived flows older than retention period
  • Enforces minimum retention (14 days for audit logs)

Monitored:

Terminal window
flow8_retention_cleanup_duration_seconds
- collection
- status
flow8_retention_entries_deleted_total
- collection
- period

Check status:

Terminal window
# View last cleanup runs
curl -X GET http://localhost:4454/api/v1/admin/jobs/retention \
-H "Authorization: Bearer $TOKEN"
Response:
{
  "last_run": "2026-04-04T10:15:00Z",
  "status": "completed",
  "entries_deleted": 1234,
  "duration_ms": 3456,
  "next_run": "2026-04-04T10:17:00Z"
}

Audit Filter Cache

Type: Background refresh of audit query cache

Runs: Every 3 minutes

Purpose: Pre-compute common audit queries for fast response

Monitored:

flow8_audit_cache_refresh_duration_seconds
flow8_audit_cache_entries

TTL Cache Cleanup

Type: Evict expired entries

Runs: Every 1 minute

Caches:

  • Test case fixtures (TTL: 24 hours)
  • Module definitions (TTL: 1 hour)
  • Flow definitions (TTL: 5 minutes)

Monitored:

flow8_cache_evictions_total
- cache_type
- reason (ttl_expired, capacity)

MongoDB Monitoring

Connection Pool

Metrics:

flow8_mongodb_connections_open
flow8_mongodb_connections_idle
flow8_mongodb_pool_size_current

Health check:

Terminal window
# Test MongoDB connectivity
kubectl exec -it flow8-0 -- mongosh \
--uri="$MONGODB_URI" \
--eval "db.adminCommand('ping')"

Query Performance

Slow query threshold: > 500ms

Enable slow query logging:

# MongoDB server config (mongod.conf)
systemLog:
  destination: file
  path: /var/log/mongodb/mongod.log
operationProfiling:
  mode: slowOp
  slowOpThresholdMs: 500

Metrics:

flow8_mongodb_query_duration_seconds
- collection
- operation (find, insert, update, delete)
- quantile (0.5, 0.95, 0.99)
flow8_mongodb_slow_queries_total
- collection

Replication Status

For MongoDB replica sets:

Terminal window
# Check replication
mongosh --eval "rs.status()"
# Monitor lag
mongosh --eval "rs.printSecondaryReplicationInfo()"

Prometheus Metrics

flow8 exposes Prometheus metrics at /metrics:

Terminal window
curl http://localhost:4454/metrics

Common Metrics

# HTTP requests
flow8_http_requests_total{method, path, status}
flow8_http_request_duration_seconds{method, path}
# Authentication
flow8_auth_attempts_total{method, status}
flow8_auth_failures_total{reason}
# Flow execution
flow8_plays_total{status}
flow8_play_duration_seconds{flow_id, quantile}
flow8_layers_executed_total{module, status}
# Database
flow8_mongodb_operations_total{collection, operation}
flow8_mongodb_operation_duration_seconds{collection, operation}
# Background jobs
flow8_scheduler_runs_total{flow_id, status}
flow8_retention_cleanup_duration_seconds{collection}
# Component usage
flow8_ai_tokens_total{provider}
flow8_ai_cost_dollars{provider}
flow8_storage_bytes_written{component}

Datadog Integration

Agent Setup

# datadog-agent deployment
apiVersion: v1
kind: ConfigMap
metadata:
  name: datadog-config
data:
  prometheus.d.yaml: |
    init_config:
    instances:
      - prometheus_url: http://flow8:4454/metrics
        namespace: flow8

Dashboards

Create custom dashboard for flow8:

{
  "widgets": [
    {
      "type": "timeseries",
      "title": "HTTP Requests",
      "queries": [
        { "name": "Requests/sec", "query": "avg:flow8.http.requests.total{*}" }
      ]
    },
    {
      "type": "timeseries",
      "title": "Flow Execution Time",
      "queries": [
        { "name": "p95 duration", "query": "avg:flow8.play.duration.seconds{quantile:0.95}" }
      ]
    },
    {
      "type": "table",
      "title": "Top Flows by Execution Count",
      "queries": [
        { "query": "select count() from flow8.plays_total group by flow_id limit 10" }
      ]
    }
  ]
}

Alerting

Alert Rules

prometheus/rules.yml
groups:
  - name: flow8
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(flow8_http_requests_total{status=~"5.."}[5m]))
            /
          sum(rate(flow8_http_requests_total[5m])) > 0.05
        for: 5m
        annotations:
          summary: "High error rate (>5%) detected"
          action: "Check application logs"
      - alert: FlowExecutionTimeout
        expr: flow8_play_duration_seconds > 3600
        for: 1m
        annotations:
          summary: "Flow execution exceeded 1 hour"
      - alert: MongoDBConnectionPoolExhausted
        expr: flow8_mongodb_connections_open >= 95
        for: 5m
        annotations:
          summary: "MongoDB connection pool near capacity"
          action: "Increase pool size or reduce concurrent flows"
      - alert: AuditLogRetentionBreach
        expr: |
          (count(flow8_audit_logs{timestamp > now() - 14d}) < 10) and
          (timestamp(now()) > max(audit_logs.updated_at) + 24h)
        for: 1h
        annotations:
          summary: "Audit logs below minimum retention"
      - alert: SchedulerJobFailure
        expr: |
          increase(flow8_scheduler_runs_total{status="failure"}[10m]) > 0
        for: 1m
        annotations:
          summary: "Scheduled flow failed"
          action: "Review scheduler logs"

Email Alerting

Configure SMTP for email notifications:

config/config.yml
alert:
  channel:
    email:
      enabled: true
      smtp_host: smtp.company.com
      smtp_port: 587
      smtp_user: alerts@company.com
      smtp_password: "[encrypted]"
      from_address: alerts@company.com
      recipients:
        - ops@company.com
        - security@company.com
  rules:
    - event: auth_failure_rate > 10/min
      severity: critical
    - event: mongodb_connection_pool > 90%
      severity: warning

Slack Alerting

alert:
  channel:
    slack:
      enabled: true
      webhook_url: "[encrypted]"
      channel: "#flow8-alerts"
  rules:
    - event: play_execution_error
      severity: high
      notify: true

Health Checks

HTTP Health Endpoint

Terminal window
GET /health
Response:
{
  "status": "healthy",
  "checks": {
    "mongodb": {
      "status": "healthy",
      "latency_ms": 5
    },
    "config": {
      "status": "healthy",
      "loaded_at": "2026-04-04T10:00:00Z"
    },
    "components": {
      "status": "healthy",
      "ai": "ready",
      "storage": "ready"
    }
  },
  "version": "1.0.0",
  "timestamp": "2026-04-04T10:23:45Z"
}

Kubernetes Liveness Probe

livenessProbe:
  httpGet:
    path: /health
    port: 4454
  initialDelaySeconds: 30
  periodSeconds: 10
  failureThreshold: 3

Log Aggregation

ELK Stack (Elasticsearch, Logstash, Kibana)

Logstash configuration:

input {
  file {
    path => "/app/data/logs/app.log"
    codec => "json"
    start_position => "beginning"
  }
}
filter {
  mutate {
    add_field => { "[@metadata][index_name]" => "flow8-%{+YYYY.MM.dd}" }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index_name]}"
  }
}

Kibana dashboards:

  • HTTP Request Timeline
  • Error Rate by Component
  • Execution Duration Heatmap
  • Authentication Events
  • Database Query Performance

Splunk

HTTP Event Collector (HEC) setup:

config/config.yml
logging:
  splunk:
    enabled: true
    hec_url: "https://splunk.company.com:8088"
    hec_token: "[encrypted]"
    source: "flow8"
    sourcetype: "_json"
    batch_size: 100
    flush_interval_seconds: 10

Troubleshooting

High Latency in Responses

Check logs:

Terminal window
tail -f /app/data/logs/app.log | grep duration_ms
# Look for operations > 500ms
2026-04-04T10:23:45Z WARN mongo collection=flows duration_ms=750 query_type=find

Investigate:

  1. Check MongoDB slow logs
  2. Review MongoDB index usage
  3. Check network latency to MongoDB
  4. Look for concurrent high-load operations

High Memory Usage

Check metrics:

Terminal window
curl http://localhost:4454/metrics | grep memory
flow8_process_memory_bytes 524288000

Solutions:

  1. Reduce cache size: CACHE_MAX_SIZE_MB=100
  2. Reduce MongoDB pool size
  3. Lower GOMAXPROCS to cap CPU parallelism
  4. Check for memory leaks (enable pprof profiling)

MongoDB Connection Failures

Check health:

Terminal window
curl http://localhost:4454/health | jq .checks.mongodb
{
  "status": "unhealthy",
  "latency_ms": 5000,
  "error": "connection timeout"
}

Solutions:

  1. Verify MongoDB is running and accessible
  2. Check network connectivity (firewall, DNS)
  3. Review MongoDB authentication
  4. Check MongoDB replica set status (if applicable)

Performance Baseline

Recommended monitoring thresholds:

Metric                  | Warning      | Critical
------------------------|--------------|-------------
HTTP p95 latency        | 500 ms       | 2000 ms
MongoDB query p95       | 100 ms       | 500 ms
Error rate              | 1%           | 5%
Play execution time     | 5 minutes    | 1 hour
Memory usage            | 70% of limit | 90% of limit
CPU usage               | 70% of limit | 90% of limit
MongoDB connection pool | 75% full     | 95% full