Skip to main content

Flow Control

Category: Engineering & DevOps

Get this pack →

This page is generated from the Air Pipe marketplace. Browse it live to install into your organization.

Practical flow-control recipes for Air Pipe pipelines — pace, retry, branch, recover, and require a quorum — built entirely from declarative config. Every endpoint is self-contained (no external services, databases, or managed variables), so you can deploy it and curl it immediately.

It showcases:

  • delay — pause a run for a bounded duration to throttle/space out work ("300ms", "2s", or a millisecond number; returns { delayed_ms }, capped by AIRPIPE__MAX_DELAY_SECS, default 300s).
  • retry — re-run a failing action automatically with exponential_backoff.
  • run_on_assertion — a guard that runs an action only if a test passes, otherwise skips it cleanly (a skip is not a failure).
  • run_when_failed / run_when_finished — error-recovery and always-run "finally" branches.
  • run_when_succeeded with at_least — proceed once a quorum of sources succeed.

Requires an Air Pipe engine build that includes the delay action.


Endpoints

MethodPathBodyDemonstrates
GET/flow/throttledelay between sequential steps (both string + ms forms)
GET/flow/retryretry with exponential backoff, then a run_when_failed fallback
POST/flow/branch{ tier }if/else routing with run_on_assertion (skipped guard ≠ failure)
GET/flow/resilienttry / catch / finally: run_when_failed + delay cooldown + run_when_finished
GET/flow/quorumproceed when at_least N of M succeed

Try it

BASE=https://your-airpipe-host

# 1. Throttle — two 300ms pauses between steps (~600ms wall time).
curl $BASE/flow/throttle
# → { ... "Summary": { "throttled_steps": 3, "first_pause_ms": 300, "second_pause_ms": 300 } }

# 2. Retry — the upstream "fails" every time, so retries are exhausted and the
# cached fallback is served. The request still returns 200.
curl $BASE/flow/retry
# → "Fallback": { "source": "fallback", "note": "upstream unavailable after retries..." }

# 3. Branch — premium and standard take different paths; the other path is skipped.
curl -X POST $BASE/flow/branch -H 'content-type: application/json' -d '{"tier":"premium"}'
# → "PremiumPath": { "path": "premium", ... } (StandardPath skipped)
curl -X POST $BASE/flow/branch -H 'content-type: application/json' -d '{"tier":"standard"}'
# → "StandardPath": { "path": "standard", ... } (PremiumPath skipped)
curl -X POST $BASE/flow/branch -H 'content-type: application/json' -d '{}'
# → 400: tier is required (validation failures are NOT masked)

# 4. Resilient — the primary fails, recovery runs, a 250ms cooldown applies, the
# finalizer always runs. Returns 200 because recovery + finalize succeed.
curl $BASE/flow/resilient

# 5. Quorum — three replicas, one fails its check, but at_least:2 is met.
curl $BASE/flow/quorum
# → "Aggregate": { "quorum": "2 of 3", "proceeded": true }

How each pattern works

Throttle with delay

A delay action pauses the run, then the next step proceeds. Because actions otherwise run concurrently, each delay (and the step after it) uses run_when_succeeded to force the sequence. Use this to keep under a downstream rate limit:

- name: Pace1
run_when_succeeded: [Step1]
delay: "300ms" # or a number of milliseconds: delay: 300

delay is in-process — good for seconds-to-minutes pacing, capped by AIRPIPE__MAX_DELAY_SECS. It is not a durable "resume next week" timer; for long waits, schedule the work instead.

Retry with backoff vs. proactive delay

retry reacts to failure (re-runs the same action); delay is proactive pacing. With exponential_backoff: true the wait doubles each attempt:

retry: { attempts: 3, delay: 200, exponential_backoff: true }   # waits 200ms, then 400ms

When retries are exhausted the action fails, so a run_when_failed fallback keeps the endpoint answering.

Branch with run_on_assertion

run_on_assertion runs an action only if its tests pass; otherwise it is skipped — a no-op, not a failure, so the run still succeeds. Two guards with is_equal_to / is_not_equal_to on the same field form an if/else. A skip is treated as success even when the skipped guard is the last action, so no terminal shim is needed — yet a genuine validation failure still surfaces its code:

  • {"tier":"premium"}PremiumPath runs, StandardPath is skipped → 200
  • {"tier":"standard"} → the reverse → 200
  • {} (no tier) → Classify fails its assertion → 400 (not masked by the skipped guards)

run_on_assertion skips when its test is unmet (clean), whereas run_when_succeeded fails its dependent if the dependency failed (a hard gate). A skipped guard reports why it skipped in its errors field for visibility, but that no longer affects the HTTP status.

Try / catch / finally

  • run_when_failed: [Risky] is the catch — runs only if Risky failed.
  • a delay gives a cool-down between recovery and finalizing.
  • run_when_finished: [Risky] is the finally — runs whether Risky succeeded or failed.

Quorum with at_least

For redundant sources, proceed as soon as enough respond:

run_when_succeeded:
at_least: 2
actions: [ReplicaA, ReplicaB, ReplicaC]

A failing source is tolerated as long as the quorum is met.


Notes

  • Self-hosted only knob: delay's cap is AIRPIPE__MAX_DELAY_SECS (default 300s); over-cap delays are rejected with a clear error rather than silently clamped.
  • Nothing here persists state or calls out to the network, so the pack is safe to run anywhere and useful as a copy-paste reference for the run-condition family.

Configuration

config.yml

name: FlowControl
description: >
Flow-control recipes for Air Pipe pipelines: pace and throttle steps with the
`delay` action, retry with exponential backoff, branch on a value with
`run_on_assertion`, and run success / failure / always-finally fallbacks with
the run-condition family. Every interface is self-contained — no external
services or managed variables required.

docs: true

interfaces:

# ── 1. Throttle: pace sequential steps with the `delay` action ──────────────
#
# `delay` pauses the run for a bounded duration, then continues. Use it to space
# out calls to a rate-limited API instead of hammering it. The value is either a
# humantime string ("300ms", "2s", "1m") or a whole number of milliseconds, and
# the action returns { delayed_ms }. It is capped by AIRPIPE__MAX_DELAY_SECS
# (default 300s) — over-cap requests are rejected, not silently clamped. A delay
# needs `run_when_succeeded` on the prior step (actions otherwise run concurrently).
flow/throttle:
output: http
summary: Throttle sequential steps with delay
description: >
Pace a sequence of steps with the `delay` action so you don't exceed a
downstream rate limit. Shows both delay forms — a humantime string and a
millisecond number — each returning { delayed_ms }.
tags: [flow-control, delay]
response_example:
throttled_steps: 3
first_pause_ms: 300
second_pause_ms: 300

actions:
- name: Step1
hide_data_on_success: true
json_output: |
{ "item": 1 }

# humantime string form
- name: Pace1
run_when_succeeded: [Step1]
delay: "300ms"

- name: Step2
run_when_succeeded: [Pace1]
hide_data_on_success: true
json_output: |
{ "item": 2 }

# milliseconds number form
- name: Pace2
run_when_succeeded: [Step2]
delay: 300

- name: Step3
run_when_succeeded: [Pace2]
hide_data_on_success: true
json_output: |
{ "item": 3 }

- name: Summary
run_when_succeeded: [Step3]
json_output: |
{ "throttled_steps": 3 }
post_transforms:
- add_attribute:
first_pause_ms: a|Pace1::delayed_ms|
second_pause_ms: a|Pace2::delayed_ms|

# ── 2. Retry with exponential backoff, then fall back ───────────────────────
#
# `retry` re-runs a failing action automatically: { attempts, delay (ms between
# tries), exponential_backoff } — with backoff the wait doubles each try
# (delay, delay*2, delay*4, ...). This is the engine's built-in retry; contrast
# it with the standalone `delay` action above, which is for proactive pacing.
# Here the call always fails its assertion, so it exhausts its retries and
# `run_when_failed` serves a cached fallback so the endpoint still answers 200.
flow/retry:
output: http
summary: Retry with exponential backoff, then fall back
description: >
Automatically retry a failing call with exponential backoff, and serve a
cached fallback via `run_when_failed` when the retries are exhausted. (The
individual retry attempts are visible in the engine logs.)
tags: [flow-control, retry, resilience]
response_example:
source: fallback
note: upstream unavailable after retries, served cached value

actions:
# Deterministically "unavailable": the assertion never passes, so this
# action fails after `attempts` tries spaced by backoff (200ms, then 400ms).
- name: CallUpstream
json_output: |
{ "status": 503, "body": null }
retry:
attempts: 3
delay: 200
exponential_backoff: true
assert:
http_code_on_error: 502
error_message: "upstream did not return 200 after retries"
tests:
- value: status
is_equal_to: 200

# Runs only because CallUpstream failed — keeps the request successful.
- name: Fallback
run_when_failed: [CallUpstream]
json_output: |
{ "source": "fallback", "note": "upstream unavailable after retries, served cached value" }

# ── 3. Branch on a value with `run_on_assertion` (if / else) ────────────────
#
# `run_on_assertion` is a GUARD: the action runs only if its tests pass,
# otherwise it is skipped cleanly. A skip is NOT a failure — the run still
# succeeds, even when the skipped guard is the LAST action of the interface
# (the engine treats a skipped action as success at the response boundary).
# Two mutually-exclusive guards (is_equal_to / is_not_equal_to on the same
# field) form an if/else, with no terminal shim action needed. A genuine
# validation failure (Classify) still surfaces its 400 — only skips are free.
flow/branch:
output: http
method: POST
summary: Branch on a value with run_on_assertion
description: >
Route a request down one of two mutually-exclusive paths based on a field,
using `run_on_assertion` guards. The guard that doesn't match is skipped
(cleanly, not failed). POST { "tier": "premium" } or { "tier": "standard" }.
tags: [flow-control, branching, assertions]
request_example:
account_id: 42
tier: premium

actions:
- name: Classify
input: a|body|
hide_data_on_success: true
assert:
http_code_on_error: 400
tests:
- value: tier
is_not_null: true
description: "tier is required (premium | standard)"

# IF tier == premium
- name: PremiumPath
run_when_succeeded: [Classify]
run_on_assertion:
tests:
- action: Classify
value: tier
is_equal_to: premium
json_output: |
{ "path": "premium", "perks": ["priority_support", "higher_limits"] }

# ELSE (tier != premium)
- name: StandardPath
run_when_succeeded: [Classify]
run_on_assertion:
tests:
- action: Classify
value: tier
is_not_equal_to: premium
json_output: |
{ "path": "standard", "perks": ["community_support"] }

# ── 4. Try / catch / finally: success, failure, and always branches ─────────
#
# The run-condition family expresses error handling:
# run_when_failed — the "catch": runs only if the named action(s) failed.
# delay — a cooldown / back-off between recovery and finalizing.
# run_when_finished — the "finally": runs once the action completes, whether
# it succeeded or failed (it is an alias of depends_on).
# (For a clean success-vs-failure split where the untaken branch is *skipped*
# rather than marked failed, use run_on_assertion as in /flow/branch — a
# run_when_succeeded branch on a failed action is itself reported as failed.)
flow/resilient:
output: http
summary: Try / catch / finally with run conditions + delay
description: >
A primary action fails; `run_when_failed` recovers, a `delay` applies a
short cool-down, and `run_when_finished` runs a finalizer regardless of
outcome. The endpoint still returns 200 because recovery + finalize succeed.
tags: [flow-control, conditions, resilience, delay]

actions:
# Deterministically fails its assertion (ok is false).
- name: Risky
json_output: |
{ "ok": false }
assert:
http_code_on_error: 503
error_message: "primary operation failed"
tests:
- value: ok
is_equal_to: true

# catch: only runs because Risky failed.
- name: Recover
run_when_failed: [Risky]
json_output: |
{ "branch": "recovered", "used_cache": true }

# cool-down between recovery and finalizing (back-off via the delay action).
- name: Cooldown
run_when_succeeded: [Recover]
delay: "250ms"

# finally: runs whether Risky succeeded or failed.
- name: Finalize
run_when_finished: [Risky]
json_output: |
{ "finalized": true }

# ── 5. Quorum: proceed when at_least N of M succeed ─────────────────────────
#
# `run_when_succeeded` with `at_least` runs once a QUORUM of the listed actions
# succeed — for redundant sources where you don't need every one. Three replicas
# are queried; one fails its check, but at_least: 2 is satisfied so the
# aggregate proceeds (the failing replica is tolerated).
flow/quorum:
output: http
summary: Proceed when a quorum (at_least N of M) succeed
description: >
Query redundant sources and continue as soon as a quorum responds, using
`run_when_succeeded: { at_least: N, actions: [...] }`. One source fails its
assertion here, but two succeed, so the quorum is met.
tags: [flow-control, conditions, resilience]
response_example:
quorum: 2 of 3
proceeded: true

actions:
- name: ReplicaA
json_output: |
{ "region": "us", "value": 10 }

- name: ReplicaB
json_output: |
{ "region": "eu", "value": 11 }

# This replica returns no value and fails its check.
- name: ReplicaC
json_output: |
{ "region": "ap", "value": null }
assert:
http_code_on_error: 504
error_message: "replica returned no value"
tests:
- value: value
is_not_null: true

- name: Aggregate
run_when_succeeded:
at_least: 2
actions: [ReplicaA, ReplicaB, ReplicaC]
json_output: |
{ "quorum": "2 of 3", "proceeded": true }