Flows
Status: Not yet implemented. This feature is designed but not built. Do not attempt to use it. Current implementation status: none. See docs/FUTURE_IDEAS.md for the roadmap.
Flows provide explicit control over execution order. When a flow: block is defined, it replaces the default agent loop with a structured pipeline.
When to Use Flows
- Without flow: The agent loop runs autonomously — the LLM decides what to do
- With flow: You define the exact sequence of steps — deterministic orchestration
Use flows when you need predictable pipelines (CI/CD, data processing, multi-stage analysis). Use the agent loop for open-ended tasks (coding, research, conversation).
Basic Flow
flow:
- id: read_config
action: filesystem.read_file
params:
path: "{{workspace}}/config.json"
- id: analyze
agent: default
input: |
Analyze this configuration:
{{result.read_config}}
- id: save_report
action: filesystem.write_file
params:
path: "{{workspace}}/report.md"
content: "{{result.analyze}}"
Flow Step Types
The LLMOS flow engine supports 18 step types.
Action Step
Execute a module action directly:
- id: get_files
action: filesystem.list_directory
params:
path: "{{workspace}}/src"
timeout: "10s"
on_error: skip # fail | skip | continue | rollback
retry:
max_attempts: 3
backoff: exponential
perception: # Optional: per-step perception override
capture_before: true
capture_after: true
ocr_enabled: false
Agent Step
Delegate to an LLM agent for reasoning:
- id: think
agent: planner # Agent ID (or "default" for single agent)
input: |
Based on the file listing:
{{result.get_files}}
Decide which files need review.
Sequence
Execute steps in order:
- id: setup
sequence:
- action: os_exec.run_command
params: { command: "mkdir -p output" }
- action: os_exec.run_command
params: { command: "git status" }
Parallel
Execute steps concurrently:
- id: checks
parallel:
max_concurrent: 3 # Max parallel tasks (default: 10)
fail_fast: false # Stop all on first failure
steps:
- id: lint
action: os_exec.run_command
params: { command: "ruff check ." }
- id: typecheck
action: os_exec.run_command
params: { command: "mypy ." }
- id: security
action: os_exec.run_command
params: { command: "bandit -r ." }
Branch
Conditional execution based on an expression:
- id: route
branch:
"on": "{{result.check.exit_code}}"
cases:
"0":
- id: success
agent: default
input: "Tests passed! Summarize the results."
"1":
- id: fix
agent: default
input: "Tests failed. Analyze and fix: {{result.check.stderr}}"
default:
- id: unknown
agent: default
input: "Unexpected exit code: {{result.check.exit_code}}"
Loop
Repeat steps until a condition is met:
- id: retry_loop
loop:
max_iterations: 5
until: "{{result.test.exit_code == 0}}"
body:
- id: fix_attempt
agent: default
input: "Fix the failing test. Attempt {{loop.iteration}}/5"
- id: test
action: os_exec.run_command
params: { command: "pytest" }
The loop context provides:
{{loop.iteration}}— Current iteration (0-based){{loop.item}}— Current item (in map loops)
Map
Apply steps to each item in a collection:
- id: analyze_files
map:
over: "{{result.list_files}}" # Expression yielding a list
as: item # Variable name (default: "item")
max_concurrent: 5 # Parallel execution
step:
- id: analyze_one
agent: default
input: "Analyze file: {{loop.item}}"
Reduce
Aggregate results from a collection:
- id: combine
reduce:
over: "{{result.analyze_files}}"
initial: { summary: "", count: 0 }
as: acc
step:
agent: default
input: |
Current summary: {{loop.acc.summary}}
New finding: {{loop.item}}
Merge into updated summary.
Race
Run steps in parallel, first to finish wins:
- id: fastest
race:
steps:
- id: search_web
action: web_search.search
params: { query: "{{topic}}" }
- id: search_local
action: filesystem.search_files
params: { pattern: "{{topic}}" }
Pipe
Chain steps where each step's output becomes the next step's input:
- id: pipeline
pipe:
- action: filesystem.read_file
params: { path: "{{workspace}}/data.json" }
- agent: default
input: "Parse and clean this data: {{result.previous}}"
- action: filesystem.write_file
params: { path: "{{workspace}}/clean.json", content: "{{result.previous}}" }
Spawn
Spawn a sub-application:
- id: sub_analysis
spawn:
app: "./analysis.app.yaml" # Path to sub-app
input: "Analyze {{workspace}}/src"
timeout: "300s"
await: true # Wait for result (default: true)
Approval
Human approval gate:
- id: deploy_approval
approval:
message: "Deploy to production?"
options:
- label: "Yes, deploy"
value: approve
- label: "No, cancel"
value: reject
- label: "Deploy with changes"
value: modify
schema:
properties:
changes: { type: string }
timeout: "300s"
on_timeout: reject
channel: cli # cli | http | slack | email
"on":
approve:
goto: deploy
reject:
goto: cancel
Try/Catch
Error handling:
- try:
- action: os_exec.run_command
params: { command: "risky-operation" }
catch:
- error: "*" # Catch all errors
do:
agent: default
input: "Handle error: {{error}}"
then: continue # fail | continue
finally:
- action: os_exec.run_command
params: { command: "cleanup" }
Dispatch
Dynamic module/action at runtime:
- id: dynamic
dispatch:
module: "{{result.plan.module}}"
action: "{{result.plan.action}}"
params: "{{result.plan.params}}"
Emit
Publish an event to the event bus:
- id: notify
emit:
topic: "app.review.complete"
event:
status: "done"
findings: "{{result.review.count}}"
Wait
Wait for an event from the bus:
- id: await_approval
wait:
topic: "app.approval.response"
filter: "{{event.request_id == run.id}}"
timeout: "3600s"
End
Terminate the flow early:
- id: abort
end:
status: failure # success | failure | cancelled
output:
error: "No files to process"
Use Macro
Invoke a reusable macro (see Macros):
- id: check_code
use: run_linter
with:
tool: "ruff check"
args: "--output-format=text ."
Goto
Jump to a labeled step:
- id: retry
goto: start # Jump to step with id "start"
Referencing Step Results
Every step with an id stores its result. Access it with {{result.step_id}}:
flow:
- id: read_file
action: filesystem.read_file
params: { path: "config.json" }
- id: process
agent: default
input: "Process: {{result.read_file}}"
For nested results, use dot notation: {{result.step_id.field.subfield}}.
Flow with Checkpoint
Enable checkpoint to resume long-running flows after crashes or restarts. When checkpoint: true, the flow executor persists its state (completed steps + their results) to the KV store after each step. On restart, it loads the checkpoint and skips already-completed steps.
app:
name: etl-pipeline
checkpoint: true # Enable checkpoint/resume
flow:
- id: step1
action: filesystem.read_file
params: { path: "data.txt" }
# If the process crashes here, re-running resumes from step2
# (step1's result is restored from the checkpoint)
- id: step2
agent: default
input: "Process: {{result.step1}}"
How It Works
- After each step — The executor saves a checkpoint (completed step IDs, their results, and the next step index) to the KV store under the key
digitorn:flow:checkpoint:{flow_id} - On restart with
resume=true— The executor loads the checkpoint, restores all completed step results into the expression context, and resumes from the next incomplete step - On success — The checkpoint is cleared
- On failure — The checkpoint is not cleared, allowing you to fix the issue and retry from where it left off
Requirements
- KV store must be available — Checkpointing requires a persistent KV store (SQLite-backed in daemon mode). Without a KV store, checkpoint is silently disabled.
- Steps must have
id— Only steps with anidfield have their results stored and restored. Anonymous steps are re-executed on resume.
When to Use
- Long-running ETL pipelines
- Multi-stage deployments with approval gates
- Expensive LLM analysis flows where you don't want to re-process completed steps