
Durable API Polling

Category: Operations


This page is generated from the Air Pipe marketplace. Browse it live to install into your organization.

Poll an API on a schedule, process only items you haven't handled before, and retry only the items that fail — never the whole batch, and never silently dropping a failure. A reusable integration pattern built entirely from Air Pipe config: no external queue, no glue code.

It uses two engine capabilities:

  • state — durable key/value the workflow can read and write mid-run (get / set / advance / seen / delete), readable inline as a|state::namespace.key|.
  • lookup_partition — a lookup/loop returns { succeeded, failed } so a poll can act on per-item failures instead of swallowing them.
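To make the `lookup_partition` contract concrete, here is a minimal Python model of it. It is a sketch, not the engine's implementation: `ItemFailed`, `partition`, and `deliver` are hypothetical names, items run sequentially here although the engine runs them concurrently, and the result shape copies the demo output shown further down (`succeeded` entries carry `data`; `failed` entries carry the original `item`, its `http_code`, and `data`).

```python
from typing import Any, Callable


class ItemFailed(Exception):
    """Hypothetical per-item failure carrying an HTTP-style status code."""

    def __init__(self, http_code: int, data: Any = None) -> None:
        super().__init__(f"item failed with HTTP {http_code}")
        self.http_code = http_code
        self.data = data


def partition(items: list, process: Callable[[Any], Any]) -> dict:
    """Run `process` on every item; one failure never aborts the others."""
    succeeded, failed = [], []
    for item in items:
        try:
            succeeded.append({"data": process(item)})
        except ItemFailed as err:
            # Keep the original item so the caller can inspect or retry it.
            failed.append({"item": item, "http_code": err.http_code, "data": err.data})
    return {"succeeded": succeeded, "failed": failed}


def deliver(item: dict) -> dict:
    # Mirrors the demo: items with ok:false fail with 422.
    if not item["ok"]:
        raise ItemFailed(422, {"error": "delivery failed for this item"})
    return item


result = partition(
    [{"id": 1, "ok": True}, {"id": 2, "ok": False}, {"id": 3, "ok": True}], deliver
)
print([s["data"]["id"] for s in result["succeeded"]], result["failed"][0]["http_code"])
# prints: [1, 3] 422
```

The point of the split is that the caller always gets both lists back, so a failure is reported rather than swallowed.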

Requires an Air Pipe engine build that includes the state feature and lookup_partition.


What's included

File | Purpose
config.yml | The poller, a self-contained demo, and state inspect/reset endpoints

Endpoints

Method | Path | Body | Description
GET | /poll/demo | (none) | Self-contained per-item-retry demo (no external services)
GET | /poll/run | (none) | Production poller: SOURCE_URL → SINK_URL (also runs on a schedule)
POST | /state/inspect | {id} | Check whether an item id has been delivered by the demo (reads the durable_polling_demo namespace)
POST | /state/reset-demo | (none) | Clear the demo's delivery markers (ids 1–3)

Try the demo (works out of the box)

BASE=https://your-airpipe-host

# Run 1: ids 1 and 3 are delivered; id 2 fails (it carries ok:false). The `Result`
# action returns the `lookup_partition` split:
curl "$BASE/poll/demo"
# → "Result": {
#     "succeeded": [
#       { "data": { "CheckDone": {...}, "DeliverItem": { "data": {"id":1,"ok":true} }, "MarkDone": {...} } },
#       { "data": { ...id 3... } }
#     ],
#     "failed": [ { "item": {"id":2,"ok":false}, "http_code": 422, "data": {...} } ]
#   }
# Each succeeded entry is that item's per-step trace (the delivered row is under
# DeliverItem.data); each failed entry carries the original `item` plus its `http_code`.

# Run 2: ids 1 and 3 are already delivered, so their guard skips delivery cleanly
# (no re-delivery); only id 2 is retried, and it fails again.
curl "$BASE/poll/demo"
# → succeeded has 2 entries (1 and 3, not re-delivered); failed still has id 2

# Inspect a single id
curl -X POST $BASE/state/inspect -H 'content-type: application/json' -d '{"id": 1}'
# → { "value": true, "version": 1, "found": true }

# Reset and replay
curl -X POST $BASE/state/reset-demo

The key behaviour: a failed item is retried on the next run, and the items that already succeeded are not re-processed.


How it works

Inside the loop (lookup + lookup_partition: true), each item runs three steps:

  1. CheckDone: state.get the item's id. If it has already been delivered, delivery is skipped.
  2. DeliverItem: runs only if the id was not found; this is where your real delivery goes.
  3. MarkDone: state.set the id, only after a successful delivery.

Because an id is marked done only after a successful delivery, a failed item is never marked: it reappears and is retried on the next run, while items that already succeeded short-circuit at step 1. lookup_partition then reports { succeeded, failed } so you can see exactly what was delivered and what will be retried.
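The three steps can be modelled in a few lines of Python to show why two runs behave as the demo describes. This is a sketch under stated assumptions: a plain `dict` stands in for the durable state namespace, `run_once` and `deliver` are hypothetical names, and ids are stored as string keys (the reset endpoint deletes keys "1" to "3", which suggests the engine stringifies them).

```python
def deliver(item: dict) -> None:
    """Stand-in for DeliverItem: ok:false items fail, like the demo's assert."""
    if not item["ok"]:
        raise RuntimeError("delivery failed for this item")


def run_once(items: list, state: dict) -> dict:
    succeeded, failed, delivered = [], [], []
    for item in items:
        key = str(item["id"])            # assumed: state keys are strings
        if state.get(key, False):        # 1) CheckDone: already delivered, skip
            succeeded.append(item["id"])
            continue
        try:
            deliver(item)                # 2) DeliverItem
        except RuntimeError:
            failed.append(item["id"])    # never marked, so retried next run
            continue
        delivered.append(item["id"])
        state[key] = True                # 3) MarkDone, only after success
        succeeded.append(item["id"])
    return {"succeeded": succeeded, "failed": failed, "delivered": delivered}


ITEMS = [{"id": 1, "ok": True}, {"id": 2, "ok": False}, {"id": 3, "ok": True}]
state: dict = {}
run1 = run_once(ITEMS, state)
run2 = run_once(ITEMS, state)
print(run1)  # {'succeeded': [1, 3], 'failed': [2], 'delivered': [1, 3]}
print(run2)  # {'succeeded': [1, 3], 'failed': [2], 'delivered': []}
```

The second run reports the same partition, but `delivered` is empty: ids 1 and 3 pass the guard without being sent again, and only id 2 is attempted.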


Setup (production poller)

1. Set managed variables

In the Air Pipe dashboard:

Name | Value | Notes
SOURCE_URL | https://api.example.com/orders | Must return a JSON array of items, each with a unique id
SINK_URL | https://api.example.com/ingest | Receives one item per call (POST)
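Because dedupe keys on each item's id, a SOURCE_URL response that breaks the "array of items with unique ids" rule would make items collide or fail silently. The hypothetical helper below is one way to check a captured response body before deploying. It assumes ids that differ only in type (1 vs "1") map to the same state key, which is an inference from the reset endpoint deleting string keys, not documented behaviour.

```python
import json
from collections import Counter


def check_source_payload(body: str) -> list:
    """Parse a SOURCE_URL response and check it is safe to dedupe by id."""
    items = json.loads(body)
    if not isinstance(items, list):
        raise ValueError("SOURCE_URL must return a JSON array")
    missing = [
        i for i, item in enumerate(items)
        if not isinstance(item, dict) or "id" not in item
    ]
    if missing:
        raise ValueError(f"items without an id at positions {missing}")
    # Compare ids as strings: 1 and "1" are assumed to share one state key.
    counts = Counter(str(item["id"]) for item in items)
    dupes = [key for key, n in counts.items() if n > 1]
    if dupes:
        raise ValueError(f"duplicate ids in one response: {dupes}")
    return items


print(len(check_source_payload('[{"id": 1}, {"id": 2}]')))  # 2
```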

2. Enable the schedule

In config.yml, under the poll/run interface, set schedule.enabled: true (the default cron, */5 * * * *, runs every 5 minutes). While testing, you can also trigger it manually with GET /poll/run.

3. Deploy

Point Air Pipe at config.yml. The poller dedupes in the durable_polling state namespace; the demo uses durable_polling_demo, so they never collide.


Choosing your "new item" strategy

Strategy | Use when | How
Id dedupe (this pack) | Items can succeed or fail independently, or arrive out of order | state.get/set per id, marked after success; or state.seen for a bounded recent-id set
High-water-mark cursor | The whole batch is processed atomically and the source is strictly time-ordered | state.advance on a single cursor key, called once after the whole batch succeeds

Why not use a single cursor for per-item retry? If the item at @20 fails but the item at @30 succeeds, advancing the cursor to 30 would skip the failed @20 forever. A cursor means "everything before T is done", which only holds when the batch completes as a unit. For independent per-item retry, track items by id; use advance only to move the cursor once, after the entire batch succeeds.
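The @20/@30 scenario can be replayed in a small Python simulation to show the difference. Timestamps stand in for items, `failing` marks which ones fail on a given run, and `cursor_run` / `dedupe_run` are hypothetical helpers that model the two strategies, not engine calls.

```python
def cursor_run(timestamps: list, cursor: int, failing: set) -> tuple:
    """High-water mark: skip anything at or before the cursor, then advance it."""
    delivered = [ts for ts in sorted(timestamps) if ts > cursor and ts not in failing]
    # Advancing to the newest delivered item records nothing about the failures.
    return delivered, max(delivered, default=cursor)


def dedupe_run(timestamps: list, done: set, failing: set) -> list:
    """Id dedupe: mark each item done only after it succeeds."""
    delivered = [ts for ts in timestamps if ts not in done and ts not in failing]
    done.update(delivered)
    return delivered


# Run 1: @20 fails, @30 succeeds. Run 2: the failure has cleared.
first, cursor = cursor_run([20, 30], 0, failing={20})
second, _ = cursor_run([20, 30], cursor, failing=set())
print(first, second)  # [30] []   (@20 is never delivered)

done: set = set()
print(dedupe_run([20, 30], done, failing={20}), dedupe_run([20, 30], done, failing=set()))
# [30] [20]   (@20 is retried and delivered)
```

If the cursor were advanced only when nothing failed, it would stay at 0 and run 2 would fetch both items again: @20 is saved, but @30 is delivered twice. That is why the cursor variant assumes the batch succeeds or fails as a unit.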

Cursor variant (snippet)

actions:
  - name: Cursor
    state: { get: { key: cursor, namespace: my_poll, default: "1970-01-01T00:00:00Z" } }
  - name: Fetch
    http: { url: "a|ap_var::SOURCE_URL|?updated_since=a|state::my_poll.cursor|" }
    post_transforms: [ { extract_value: body } ]
  # ... process the batch ...
  - name: Advance   # only reached if the whole batch succeeded
    run_when_succeeded: [Process]
    state: { advance: { key: cursor, namespace: my_poll, value: a|Process::max_updated_at| } }
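The cursor variant's control flow can be sketched in Python: read the cursor, fetch what changed since then, process everything, and only then move the cursor forward. This page does not spell out `advance` semantics, so the sketch assumes it is monotonic (never moves backwards); `poll_batch`, `advance`, and the in-memory `source` list filtered by `updated_at` are hypothetical stand-ins for the HTTP fetch and the state action.

```python
from datetime import datetime

EPOCH = "1970-01-01T00:00:00Z"


def parse_ts(ts: str) -> datetime:
    # fromisoformat accepts a trailing "Z" only from Python 3.11, so normalise it.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def advance(store: dict, key: str, value: str) -> str:
    """Assumed semantics: the cursor only ever moves forward."""
    current = store.get(key)
    if current is None or parse_ts(value) > parse_ts(current):
        store[key] = value
    return store[key]


def poll_batch(store: dict, source: list, process) -> None:
    cursor = store.get("cursor", EPOCH)
    # Stand-in for SOURCE_URL?updated_since=<cursor>.
    batch = [i for i in source if parse_ts(i["updated_at"]) > parse_ts(cursor)]
    for item in batch:
        process(item)  # any exception aborts the run before the cursor moves
    if batch:
        advance(store, "cursor", max((i["updated_at"] for i in batch), key=parse_ts))
```

If any item raises, the cursor stays where it was and the whole batch is fetched again next time, which matches the "advance once, after the entire batch succeeds" rule above.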

Bounded dedupe set (snippet)

- name: Seen
  state: { seen: { key: recent, id: a|body::id|, namespace: my_poll, ttl: "7d" } }
# a|Seen::was_new| is false for ids already in the (bounded, TTL'd) set
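A bounded, TTL'd recent-id set can be modelled as an insertion-ordered map from id to first-seen time. This is a sketch of the general technique, not the engine's `seen` implementation: the class name, the `max_size` cap, and the evict-oldest policy are assumptions; only `ttl` and the `was_new` result come from the snippet above.

```python
import time
from collections import OrderedDict


class SeenSet:
    """Hypothetical bounded, TTL'd recent-id set returning was_new."""

    def __init__(self, ttl_seconds: float, max_size: int = 10_000, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.max_size = max_size
        self.clock = clock
        self._seen: OrderedDict = OrderedDict()  # id -> first-seen time, oldest first

    def seen(self, item_id) -> bool:
        """Record item_id and return True if it was not seen within the TTL."""
        now = self.clock()
        # Entries are appended in time order, so expired ones sit at the front.
        while self._seen and now - next(iter(self._seen.values())) >= self.ttl:
            self._seen.popitem(last=False)
        if item_id in self._seen:
            return False
        self._seen[item_id] = now
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)  # evict the oldest to stay bounded
        return True


recent = SeenSet(ttl_seconds=7 * 24 * 3600)  # "7d"
print(recent.seen(42), recent.seen(42))  # True False
```

In this model the id is recorded when it is checked, not after delivery, so a delivery that then fails would not be retried. Whether the engine's `seen` records at the same point is worth verifying before using it in a per-item retry flow.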

Durability & deployment notes

  • Self-hosted, single node uses an in-memory state backend by default — fine for testing, but it is lost on restart and not shared across instances. For production polling set AIRPIPE__STATE_CONN_STRING to a Postgres connection (durable + shared), or set AIRPIPE__REQUIRE_DURABLE_STATE=true to make the engine refuse ephemeral state writes so a misconfiguration fails fast instead of losing cursors.
  • Managed / hosted state is durable automatically.
  • Per-item delivery runs concurrently; state writes are atomic (optimistic concurrency on the durable backends), so concurrent updates are safe.
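Optimistic concurrency means a writer states which version it read, and the write is rejected if someone else wrote in between; the loser re-reads and retries instead of overwriting. The in-memory Python sketch below shows the idea; `VersionedStore`, `compare_and_set`, and `update` are hypothetical names, and the only detail taken from this page is that a value carries a version that becomes 1 after the first write (as in the /state/inspect output).

```python
import threading


class VersionedStore:
    """In-memory sketch: every write must name the version it read."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # protects the dict itself, not callers
        self._data: dict = {}          # key -> (value, version)

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, (default, 0))

    def compare_and_set(self, key, value, expected_version: int) -> bool:
        with self._lock:
            _, current = self._data.get(key, (None, 0))
            if current != expected_version:
                return False  # another writer got there first
            self._data[key] = (value, current + 1)
            return True


def update(store: VersionedStore, key, fn, default=None) -> None:
    """Read-modify-write that retries on conflict instead of losing an update."""
    while True:
        value, version = store.get(key, default)
        if store.compare_and_set(key, fn(value), version):
            return


store = VersionedStore()
update(store, "cursor", lambda v: max(v, 30), default=0)
print(store.get("cursor"))  # (30, 1)
```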

Configuration

config.yml

name: durable-api-polling
docs: true

# Durable API polling for Air Pipe.
#
# Poll a source on a schedule, process only items you have not handled before, and
# retry ONLY the items that fail — not the whole batch. Built on two engine features:
#   - the `state` action: durable key/value (get / set / advance / seen / delete),
#     read inline with a|state::namespace.key|
#   - `lookup_partition`: a lookup/loop returns { succeeded, failed } so a poll can
#     act on the failures instead of silently dropping them
#
# Correctness note: for PER-ITEM retry use id-based dedupe (mark an id done only AFTER
# it succeeds), not a single advancing cursor. If item@20 fails but item@30 succeeds, a
# cursor advanced to 30 would skip the failed item@20 forever. Dedupe tracks each item
# independently, so a failure is retried while its neighbours are not re-delivered. The
# `advance` cursor is still ideal for whole-batch / strictly-in-order sources (see README).
#
# Requires an engine build with the `state` feature and `lookup_partition`.

interfaces:

  # --------------------------------------------------------------------------
  # Self-contained demo (no external services). Run it twice: the first run
  # delivers items 1 and 3 and reports item 2 as failed; the second run skips
  # 1 and 3 (already done) and retries only item 2. Nothing is ever lost.
  # --------------------------------------------------------------------------
  poll/demo:
    output: http
    summary: Self-contained per-item-retry demo (no external source needed)
    tags: [polling, demo]
    actions:
      - name: Items
        hide_data_on_success: true
        json_output: |
          [
            {"id": 1, "ok": true},
            {"id": 2, "ok": false},
            {"id": 3, "ok": true}
          ]

      - name: Deliver
        run_when_succeeded: [Items]
        lookup: a|Items|
        lookup_partition: true   # → { succeeded: [...], failed: [...] }
        actions:
          # 1) Has this id already been delivered? (read-only check)
          - name: CheckDone
            state:
              get:
                key: a|body::id|
                namespace: durable_polling_demo
                default: false

          # 2) Deliver only if not already done. (Here a failing assert stands in
          #    for a real delivery: items with ok:false return 422 and become failures.)
          - name: DeliverItem
            run_when_succeeded: [CheckDone]   # gate must wait for CheckDone to complete
            run_on_assertion:
              tests:
                - action: CheckDone
                  value: found
                  is_equal_to: false
            input: a|body|
            assert:
              http_code_on_error: 422
              error_message: "delivery failed for this item"
              tests:
                - value: ok
                  is_equal_to: true

          # 3) Mark the id done ONLY after a successful delivery, so a failure is
          #    retried next run instead of being skipped.
          - name: MarkDone
            run_when_succeeded: [DeliverItem]
            state:
              set:
                key: a|body::id|
                namespace: durable_polling_demo
                value: true

      - name: Result
        run_when_succeeded: [Deliver]
        input: a|Deliver|   # exposes { succeeded, failed } in the response

  # --------------------------------------------------------------------------
  # Production template: point SOURCE_URL / SINK_URL at your endpoints and flip
  # the schedule on. Same dedupe + per-item-retry flow as the demo.
  # --------------------------------------------------------------------------
  poll/run:
    output: http
    summary: Poll SOURCE_URL and deliver only new items to SINK_URL, retrying failures per-item
    tags: [polling, sync, scheduler]
    schedule:
      cron: "*/5 * * * *"
      enabled: false   # set true once SOURCE_URL / SINK_URL are configured
    actions:
      - name: Fetch
        http:
          url: a|ap_var::SOURCE_URL|
        post_transforms:
          - extract_value: body

      - name: Deliver
        run_when_succeeded: [Fetch]
        lookup: a|Fetch|
        lookup_partition: true
        actions:
          - name: CheckDone
            state:
              get:
                key: a|body::id|
                namespace: durable_polling
                default: false
          - name: DeliverItem
            run_when_succeeded: [CheckDone]   # gate must wait for CheckDone to complete
            run_on_assertion:
              tests:
                - action: CheckDone
                  value: found
                  is_equal_to: false
            http:
              url: a|ap_var::SINK_URL|
              method: POST
              headers:
                content-type: application/json
              body: a|body|
            assert:
              http_code_on_error: 502
              tests:
                - value: status
                  is_equal_to: 200
          - name: MarkDone
            run_when_succeeded: [DeliverItem]
            state:
              set:
                key: a|body::id|
                namespace: durable_polling
                value: true

      - name: Summary
        run_when_succeeded: [Deliver]
        input: a|Deliver|   # { succeeded: [...], failed: [...] }

  # --------------------------------------------------------------------------
  # Inspect whether a given id has been delivered by the demo. Demonstrates
  # state.get against the durable_polling_demo namespace.
  # --------------------------------------------------------------------------
  state/inspect:
    output: http
    method: POST
    summary: Check delivery state for an item id
    tags: [state]
    request_example:
      id: 1
    actions:
      - name: Status
        state:
          get:
            key: a|body::id|
            namespace: durable_polling_demo
            default: false

  # --------------------------------------------------------------------------
  # Reset the demo's delivery markers so poll/demo can be replayed from scratch.
  # --------------------------------------------------------------------------
  state/reset-demo:
    output: http
    method: POST
    summary: Clear the poll/demo delivery markers (ids 1-3)
    tags: [state]
    actions:
      - name: Del1
        state: { delete: { key: "1", namespace: durable_polling_demo } }
      - name: Del2
        run_when_succeeded: [Del1]
        state: { delete: { key: "2", namespace: durable_polling_demo } }
      - name: Del3
        run_when_succeeded: [Del2]
        state: { delete: { key: "3", namespace: durable_polling_demo } }