Durable API Polling
Category: Operations
This page is generated from the Air Pipe marketplace. Browse it live to install into your organization.
Poll an API on a schedule, process only the items you haven't handled before, and retry only the items that fail: never the whole batch, and never by silently dropping a failure. This is a reusable integration pattern built entirely from Air Pipe config, with no external queue and no glue code.
It uses two engine capabilities:
- `state`: durable key/value storage the workflow can read and write mid-run (`get`/`set`/`advance`/`seen`/`delete`), readable inline as `a|state::namespace.key|`.
- `lookup_partition`: a lookup/loop returns `{ succeeded, failed }`, so a poll can act on per-item failures instead of swallowing them.
Requires an Air Pipe engine build that includes the `state` feature and `lookup_partition`.
What's included
| File | Purpose |
|---|---|
| config.yml | The poller, a self-contained demo, and state inspect/reset endpoints |
Endpoints
| Method | Path | Body | Description |
|---|---|---|---|
| GET | /poll/demo | (none) | Self-contained per-item-retry demo (no external services) |
| GET | /poll/run | (none) | Production poller: SOURCE_URL → SINK_URL (also runs on a schedule once enabled) |
| POST | /state/inspect | {id} | Check whether a demo item id has been delivered (reads the demo's state namespace) |
| POST | /state/reset-demo | (none) | Clear the demo's delivery markers (ids 1–3) |
Try the demo (works out of the box)
```shell
BASE=https://your-airpipe-host

# Run 1: ids 1 and 3 deliver; id 2 fails (it carries ok:false). The `Result`
# action returns the `lookup_partition` split:
curl $BASE/poll/demo
# → "Result": {
#     "succeeded": [
#       { "data": { "CheckDone": {...}, "DeliverItem": { "data": {"id":1,"ok":true} }, "MarkDone": {...} } },
#       { "data": { ...id 3... } }
#     ],
#     "failed": [ { "item": {"id":2,"ok":false}, "http_code": 422, "data": {...} } ]
#   }
# Each succeeded entry is that item's per-step trace (the delivered row is under
# DeliverItem.data); each failed entry carries the original `item` plus its `http_code`.

# Run 2: ids 1 and 3 are already marked delivered, so the guard skips their
# DeliverItem step (no re-delivery); only id 2 is retried, and it fails again.
curl $BASE/poll/demo
# → succeeded has 2 entries (1 and 3, not re-delivered), failed still has id 2

# Inspect a single id
curl -X POST $BASE/state/inspect -H 'content-type: application/json' -d '{"id": 1}'
# → { "value": true, "version": 1, "found": true }

# Reset and replay
curl -X POST $BASE/state/reset-demo
```
The key behaviour: a failed item is retried on the next run, and the items that already succeeded are not re-processed.
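To script against the demo instead of reading raw JSON, a small client can pull ids out of the `Result` split. The following is a minimal Python sketch, assuming the response body is a JSON object with a top-level `"Result"` key shaped like the output above. `fetch_demo` and `summarize` are hypothetical helper names, and treating a succeeded entry without `DeliverItem.data` as "skipped by the guard" on a rerun is an assumption about the trace shape, not documented engine behaviour.

```python
import json
import urllib.request


def fetch_demo(base: str) -> dict:
    """GET {base}/poll/demo and decode the JSON body (hypothetical helper)."""
    with urllib.request.urlopen(f"{base}/poll/demo") as resp:
        return json.load(resp)


def summarize(response: dict) -> dict:
    """Reduce the lookup_partition split to delivered ids, skipped count and failures.

    Shape assumed from the demo output:
      succeeded[i]["data"]["DeliverItem"]["data"]["id"]  -> delivered this run
      failed[i]["item"]["id"], failed[i]["http_code"]    -> failed this run
    Succeeded entries with no DeliverItem data are counted as skipped
    (already delivered on an earlier run); that mapping is an assumption.
    """
    result = response["Result"]
    delivered, skipped = [], 0
    for entry in result.get("succeeded", []):
        step = (entry.get("data") or {}).get("DeliverItem") or {}
        row = step.get("data")
        if isinstance(row, dict) and "id" in row:
            delivered.append(row["id"])
        else:
            skipped += 1
    failed = [(f["item"]["id"], f.get("http_code")) for f in result.get("failed", [])]
    return {"delivered": delivered, "skipped": skipped, "failed": failed}
```

Typical use is `summarize(fetch_demo("https://your-airpipe-host"))`. If the engine answers the demo with a non-2xx status, `urlopen` raises `urllib.error.HTTPError`, whose body can be read and passed to `summarize` the same way.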
How it works
Inside the loop (lookup + lookup_partition: true), each item runs three steps:
- CheckDone: `state.get` the item's id (a read-only check). If it is already marked delivered, the next step is skipped.
- DeliverItem: runs only if the item is not already done; this is where your real delivery goes.
- MarkDone: `state.set` the id, only after a successful delivery.
Because an id is marked done only after a successful delivery, a failed item is never marked: it reappears on the next run and is retried, while items that already succeeded are stopped by the guard right after step 1. `lookup_partition` then reports `{ succeeded, failed }`, so you can see exactly what was delivered and what will be retried.
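The three-step guard can be modelled outside Air Pipe to show why a failure is retried while successes are not re-delivered. This is a minimal Python sketch, assuming a plain dict stands in for the durable `state` namespace and a `deliver` callback stands in for `DeliverItem`; `run_once` is an illustrative name, not an engine API, and the loop runs items sequentially, whereas the engine delivers them concurrently.

```python
from typing import Callable, Dict, List, Tuple


def run_once(items: List[dict], state: Dict[int, bool],
             deliver: Callable[[dict], bool]) -> Tuple[List[dict], List[dict]]:
    """One poll run: CheckDone -> DeliverItem (guarded) -> MarkDone, then partition."""
    succeeded: List[dict] = []
    failed: List[dict] = []
    for item in items:  # sequential here for simplicity; the engine runs items concurrently
        if state.get(item["id"], False):                  # CheckDone: read-only lookup
            succeeded.append({"id": item["id"], "delivered": False})  # guard skipped delivery
            continue
        if deliver(item):                                 # DeliverItem
            state[item["id"]] = True                      # MarkDone: only after success
            succeeded.append({"id": item["id"], "delivered": True})
        else:
            failed.append(item)                           # never marked, so retried next run
    return succeeded, failed


# Same three items as the demo; a plain dict stands in for the state namespace.
items = [{"id": 1, "ok": True}, {"id": 2, "ok": False}, {"id": 3, "ok": True}]
state: Dict[int, bool] = {}
first = run_once(items, state, lambda item: item["ok"])   # 1 and 3 delivered, 2 failed
second = run_once(items, state, lambda item: item["ok"])  # 1 and 3 skipped, 2 failed again
```

The only state ever written is "this id succeeded", so nothing has to be rolled back when a delivery fails; the absence of a marker is what schedules the retry.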
Setup (production poller)
1. Set managed variables
In the Air Pipe dashboard:
| Name | Value | Notes |
|---|---|---|
| SOURCE_URL | https://api.example.com/orders | Must return a JSON array of items, each with a unique id |
| SINK_URL | https://api.example.com/ingest | Receives one item per call (POST) |
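Before pointing the poller at a real source, it can help to check that SOURCE_URL really returns what the table asks for, since a missing or duplicated id would break the per-id dedupe keyed on `a|body::id|`. This is a minimal Python sketch; `check_source_payload` is a hypothetical helper, and comparing ids as strings is an assumption suggested by `state/reset-demo` deleting the string keys `"1"`, `"2"`, `"3"` for numeric ids.

```python
import json
from typing import List


def check_source_payload(raw: str) -> List[str]:
    """Return a list of problems with a SOURCE_URL response body (empty list = OK)."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(data, list):
        return [f"expected a JSON array, got {type(data).__name__}"]
    problems: List[str] = []
    seen = set()
    for index, item in enumerate(data):
        if not isinstance(item, dict) or "id" not in item:
            problems.append(f"item {index} has no 'id'")
            continue
        # Assumption: state keys are compared as strings, so 1 and "1" collide.
        key = str(item["id"])
        if key in seen:
            problems.append(f"duplicate id {item['id']!r} at item {index}")
        seen.add(key)
    return problems
```

To check a live source, pass it the decoded body, for example `check_source_payload(urllib.request.urlopen(url).read().decode())`.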
2. Enable the schedule
In config.yml, set poll/run → schedule.enabled: true (default cron is every 5 minutes). You can also trigger it manually with GET /poll/run while testing.
3. Deploy
Point Air Pipe at config.yml. The poller dedupes in the durable_polling state namespace; the demo uses durable_polling_demo, so they never collide.
Choosing your "new item" strategy
| Strategy | Use when | How |
|---|---|---|
| Id dedupe (this pack) | Items can succeed or fail independently, or arrive out of order | `state.get`/`state.set` per id, marked after success; or `state.seen` for a bounded recent-id set |
| High-water-mark cursor | The whole batch is processed atomically and the source is strictly time-ordered | `state.advance` on a single `cursor` key, called once after the entire batch succeeds |
Why not a single cursor for per-item retry? If item @20 fails but item @30 succeeds, advancing the cursor to 30 would skip the failed @20 forever. A cursor at T means "everything up to T is done", which only holds when the batch completes as a unit. For independent per-item retry, track items by id, and use `advance` only when you move the cursor once, after the entire batch succeeds.
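The @20/@30 failure is easy to reproduce. This is a minimal Python sketch, assuming each item is identified by a numeric position that doubles as its timestamp and that the source returns only items strictly after the cursor; `poll_with_cursor` and `poll_with_dedupe` are illustrative names, not engine APIs.

```python
from typing import Callable, List, Set

SOURCE: List[int] = [10, 20, 30]  # item positions / timestamps, strictly ordered


def poll_with_cursor(cursor: int, deliver: Callable[[int], bool]) -> int:
    """Anti-pattern: advance the cursor past every item that succeeds."""
    for pos in (p for p in SOURCE if p > cursor):
        if deliver(pos):
            cursor = max(cursor, pos)
    return cursor


def poll_with_dedupe(done: Set[int], deliver: Callable[[int], bool]) -> Set[int]:
    """Id dedupe: mark each item only after it succeeds."""
    for pos in SOURCE:
        if pos not in done and deliver(pos):
            done.add(pos)
    return done


flaky = lambda pos: pos != 20   # item @20 fails on the first run
healthy = lambda pos: True      # the sink has recovered on the second run

cursor = poll_with_cursor(0, flaky)          # 30: moved past the failed @20
cursor = poll_with_cursor(cursor, healthy)   # still 30; @20 is never fetched again
done = poll_with_dedupe(set(), flaky)        # {10, 30}
done = poll_with_dedupe(done, healthy)       # {10, 20, 30}: @20 was retried
```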
Cursor variant (snippet)
```yaml
actions:
  - name: Cursor
    state: { get: { key: cursor, namespace: my_poll, default: "1970-01-01T00:00:00Z" } }
  - name: Fetch
    http: { url: "a|ap_var::SOURCE_URL|?updated_since=a|state::my_poll.cursor|" }
    post_transforms: [ { extract_value: body } ]
  # ... process the batch ...
  - name: Advance # only reached if the whole batch succeeded
    run_when_succeeded: [Process]
    state: { advance: { key: cursor, namespace: my_poll, value: a|Process::max_updated_at| } }
```
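The snippet's rule (advance once, and only after the whole batch succeeds) can be modelled as follows. This is a minimal Python sketch, assuming ISO-8601 UTC timestamps in one fixed format (so string comparison matches time order) and an `updated_at` field on each item; `poll_batch`, `fetch_since` and `process` are hypothetical stand-ins for `Fetch`, `Process` and `state.advance`, and treating `advance` as a never-backwards high-water mark is an assumption.

```python
from typing import Callable, Dict, List

EPOCH = "1970-01-01T00:00:00Z"  # same default as the Cursor action in the snippet


def poll_batch(state: Dict[str, str],
               fetch_since: Callable[[str], List[dict]],
               process: Callable[[dict], bool]) -> bool:
    """One run. Returns True if the batch was fully processed (or empty), False otherwise."""
    cursor = state.get("cursor", EPOCH)            # Cursor: read with a default
    batch = fetch_since(cursor)                    # Fetch: items updated after the cursor
    if not batch:
        return True                                # nothing new; cursor unchanged
    # Process: all() stops at the first failure, and any failure blocks the advance.
    if not all(process(item) for item in batch):
        return False                               # cursor untouched; batch re-fetched next run
    newest = max(item["updated_at"] for item in batch)
    # Advance: modelled as a high-water mark that never moves backwards (assumption).
    state["cursor"] = max(cursor, newest)
    return True
```

Because a failed batch is re-fetched in full, items that succeeded before the failure are processed again on the next run, so this variant fits sinks that tolerate repeats.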
Bounded dedupe set (snippet)
```yaml
- name: Seen
  state: { seen: { key: recent, id: a|body::id|, namespace: my_poll, ttl: "7d" } }
  # a|Seen::was_new| is false for ids already in the (bounded, TTL'd) set
```
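A bounded, TTL'd seen-set can be pictured as below. This is a minimal Python sketch of the `was_new` semantics described in the snippet comment; `SeenSet`, the `capacity` bound, oldest-first eviction and the injectable clock are all assumptions for illustration, not documented engine behaviour.

```python
import time
from collections import OrderedDict
from typing import Callable, Hashable


class SeenSet:
    """Recent-id set with a TTL and a maximum size; the oldest ids are evicted first."""

    def __init__(self, ttl_seconds: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl_seconds
        self.capacity = capacity
        self.clock = clock
        self._first_seen: "OrderedDict[Hashable, float]" = OrderedDict()  # id -> time first seen

    def seen(self, item_id: Hashable) -> bool:
        """Record item_id and return was_new: True if it was absent or had expired."""
        now = self.clock()
        # Entries stay in insertion order (oldest first), so expiry can stop early.
        while self._first_seen:
            _, seen_at = next(iter(self._first_seen.items()))
            if now - seen_at < self.ttl:
                break
            self._first_seen.popitem(last=False)
        if item_id in self._first_seen:
            return False
        self._first_seen[item_id] = now
        if len(self._first_seen) > self.capacity:
            self._first_seen.popitem(last=False)  # keep the set bounded
        return True
```

For a 7-day window like the snippet's `ttl: "7d"`, this would be `SeenSet(ttl_seconds=7 * 24 * 3600, capacity=...)`. Note that in this model an id is recorded when it is checked, not after delivery succeeds, and an evicted or expired id looks new again; both are reasons the pack defaults to per-id `get`/`set`.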
Durability & deployment notes
- Self-hosted, single node: the default state backend is in-memory. That is fine for testing, but it is lost on restart and not shared across instances. For production polling, set `AIRPIPE__STATE_CONN_STRING` to a Postgres connection string (durable and shared). You can also set `AIRPIPE__REQUIRE_DURABLE_STATE=true` so the engine refuses ephemeral state writes and a misconfiguration fails fast instead of losing cursors.
- Managed / hosted: state is durable automatically.
- Per-item delivery runs concurrently; `state` writes are atomic (optimistic concurrency on the durable backends), so concurrent updates are safe.
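Optimistic concurrency means each write carries the version it read and is rejected if another writer got there first; the caller then re-reads and retries. This is a minimal Python sketch of that compare-and-set loop, assuming versioned values like the `{ value, version, found }` shape returned by `/state/inspect`; `VersionedStore`, `compare_and_set` and `update` are hypothetical names, not Air Pipe APIs.

```python
import threading
from typing import Any, Callable, Dict, Tuple


class VersionedStore:
    """In-memory key/value store where every successful write bumps a version number."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._data: Dict[str, Tuple[Any, int]] = {}

    def get(self, key: str, default: Any = None) -> Dict[str, Any]:
        with self._lock:
            if key in self._data:
                value, version = self._data[key]
                return {"value": value, "version": version, "found": True}
            return {"value": default, "version": 0, "found": False}

    def compare_and_set(self, key: str, expected_version: int, value: Any) -> bool:
        """Write only if the stored version still equals expected_version."""
        with self._lock:
            current = self._data.get(key, (None, 0))[1]
            if current != expected_version:
                return False  # someone else wrote first
            self._data[key] = (value, current + 1)
            return True


def update(store: VersionedStore, key: str, fn: Callable[[Any], Any],
           default: Any = None) -> Any:
    """Read-modify-write that retries until its compare-and-set wins."""
    while True:
        snap = store.get(key, default)
        new_value = fn(snap["value"])
        if store.compare_and_set(key, snap["version"], new_value):
            return new_value
```

A first write lands as version 1, matching the inspect output above. Many concurrent `update` calls never lose an increment, because a writer that read a stale version simply loses the compare-and-set and tries again.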
Configuration
config.yml
```yaml
name: durable-api-polling
docs: true
# Durable API polling for Air Pipe.
#
# Poll a source on a schedule, process only items you have not handled before, and
# retry ONLY the items that fail, not the whole batch. Built on two engine features:
#   - the `state` action : durable key/value (get / set / advance / seen / delete),
#                          read inline with a|state::namespace.key|
#   - `lookup_partition` : a lookup/loop returns { succeeded, failed } so a poll can
#                          act on the failures instead of silently dropping them
#
# Correctness note: for PER-ITEM retry use id-based dedupe (mark an id done only AFTER
# it succeeds), not a single advancing cursor. If item@20 fails but item@30 succeeds, a
# cursor advanced to 30 would skip the failed item@20 forever. Dedupe tracks each item
# independently, so a failure is retried while its neighbours are not re-delivered. The
# `advance` cursor is still ideal for whole-batch / strictly-in-order sources (see README).
#
# Requires an engine build with the `state` feature and `lookup_partition`.
interfaces:
  # --------------------------------------------------------------------------
  # Self-contained demo (no external services). Run it twice: the first run
  # delivers items 1 and 3 and reports item 2 as failed; the second run skips
  # 1 and 3 (already done) and retries only item 2. Nothing is ever lost.
  # --------------------------------------------------------------------------
  poll/demo:
    output: http
    summary: Self-contained per-item-retry demo (no external source needed)
    tags: [polling, demo]
    actions:
      - name: Items
        hide_data_on_success: true
        json_output: |
          [
            {"id": 1, "ok": true},
            {"id": 2, "ok": false},
            {"id": 3, "ok": true}
          ]
      - name: Deliver
        run_when_succeeded: [Items]
        lookup: a|Items|
        lookup_partition: true # → { succeeded: [...], failed: [...] }
        actions:
          # 1) Has this id already been delivered? (read-only check)
          - name: CheckDone
            state:
              get:
                key: a|body::id|
                namespace: durable_polling_demo
                default: false
          # 2) Deliver only if not already done. (Here a failing assert stands in
          #    for a real delivery: items with ok:false return 422 and become failures.)
          - name: DeliverItem
            run_when_succeeded: [CheckDone] # gate must wait for CheckDone to complete
            run_on_assertion:
              tests:
                - action: CheckDone
                  value: found
                  is_equal_to: false
            input: a|body|
            assert:
              http_code_on_error: 422
              error_message: "delivery failed for this item"
              tests:
                - value: ok
                  is_equal_to: true
          # 3) Mark the id done ONLY after a successful delivery, so a failure is
          #    retried next run instead of being skipped.
          - name: MarkDone
            run_when_succeeded: [DeliverItem]
            state:
              set:
                key: a|body::id|
                namespace: durable_polling_demo
                value: true
      - name: Result
        run_when_succeeded: [Deliver]
        input: a|Deliver| # exposes { succeeded, failed } in the response
  # --------------------------------------------------------------------------
  # Production template: point SOURCE_URL / SINK_URL at your endpoints and flip
  # the schedule on. Same dedupe + per-item-retry flow as the demo.
  # --------------------------------------------------------------------------
  poll/run:
    output: http
    summary: Poll SOURCE_URL and deliver only new items to SINK_URL, retrying failures per-item
    tags: [polling, sync, scheduler]
    schedule:
      cron: "*/5 * * * *"
      enabled: false # set true once SOURCE_URL / SINK_URL are configured
    actions:
      - name: Fetch
        http:
          url: a|ap_var::SOURCE_URL|
        post_transforms:
          - extract_value: body
      - name: Deliver
        run_when_succeeded: [Fetch]
        lookup: a|Fetch|
        lookup_partition: true
        actions:
          - name: CheckDone
            state:
              get:
                key: a|body::id|
                namespace: durable_polling
                default: false
          - name: DeliverItem
            run_when_succeeded: [CheckDone] # gate must wait for CheckDone to complete
            run_on_assertion:
              tests:
                - action: CheckDone
                  value: found
                  is_equal_to: false
            http:
              url: a|ap_var::SINK_URL|
              method: POST
              headers:
                content-type: application/json
              body: a|body|
            assert:
              http_code_on_error: 502
              tests:
                - value: status
                  is_equal_to: 200 # adjust if your sink answers success with another code (e.g. 201/202)
          - name: MarkDone
            run_when_succeeded: [DeliverItem]
            state:
              set:
                key: a|body::id|
                namespace: durable_polling
                value: true
      - name: Summary
        run_when_succeeded: [Deliver]
        input: a|Deliver| # { succeeded: [...], failed: [...] }
  # --------------------------------------------------------------------------
  # Inspect whether a given id has been delivered by poll/demo. Demonstrates
  # state.get against the durable_polling_demo namespace; switch the namespace
  # to durable_polling to inspect the production poller instead.
  # --------------------------------------------------------------------------
  state/inspect:
    output: http
    method: POST
    summary: Check delivery state for an item id
    tags: [state]
    request_example:
      id: 1
    actions:
      - name: Status
        state:
          get:
            key: a|body::id|
            namespace: durable_polling_demo
            default: false
  # --------------------------------------------------------------------------
  # Reset the demo's delivery markers so poll/demo can be replayed from scratch.
  # --------------------------------------------------------------------------
  state/reset-demo:
    output: http
    method: POST
    summary: Clear the poll/demo delivery markers (ids 1-3)
    tags: [state]
    actions:
      - name: Del1
        state: { delete: { key: "1", namespace: durable_polling_demo } }
      - name: Del2
        run_when_succeeded: [Del1]
        state: { delete: { key: "2", namespace: durable_polling_demo } }
      - name: Del3
        run_when_succeeded: [Del2]
        state: { delete: { key: "3", namespace: durable_polling_demo } }
```