Skip to main content

Lead Capture

Category: Sales & CRM Β· πŸ“¦ 1 install

Get this pack β†’

This page is generated from the Air Pipe marketplace. Browse it live to install into your organization.

Capture leads from any form, landing page, or webhook. Every submission is validated, stored in Postgres, and fires an instant Slack notification. Your data stays on your infrastructure.


What's included​

FilePurpose
config.ymlAirPipe config with docs: true
schema.sqlLeads table with indexes

Endpoints​

MethodPathDescription
POST/leads/seedCreate table and load sample data
POST/leads/captureSubmit a lead β†’ store β†’ Slack alert
GET/leadsList all leads
POST/leads/getGet lead by ID
GET/leads/summaryCount by source

Setup​

1. Create the table​

Either run the schema directly:

psql $DATABASE_URL -f schema.sql

Or use the seed endpoint after deploying β€” it creates the table and loads sample data in one step:

curl -X POST https://your-airpipe-host/leads/seed

2. Set managed variables​

NameValue
DATABASE_URLyour Postgres connection string
SLACK_WEBHOOK_URLSlack incoming webhook URL

3. Point your form at the endpoint​

POST https://your-airpipe-host/leads/capture
Content-Type: application/json

Testing​

BASE=https://your-airpipe-host

# Seed the table with sample data (also creates the table β€” run this first)
curl -X POST $BASE/leads/seed

# Capture a lead
curl -X POST $BASE/leads/capture \
-H "Content-Type: application/json" \
-d '{
"email": "[email protected]",
"name": "Jane Smith",
"company": "Startup Inc",
"source": "landing_page",
"message": "Interested in the enterprise plan"
}'

# View all leads
curl $BASE/leads

# Count by source
curl $BASE/leads/summary

Connecting from a web form​

<form id="lead-form">
<input type="email" name="email" required />
<input type="text" name="name" />
<input type="text" name="company" />
<button type="submit">Get in touch</button>
</form>

<script>
document.getElementById('lead-form').addEventListener('submit', async (e) => {
e.preventDefault();
const data = Object.fromEntries(new FormData(e.target));
data.source = 'website_contact';
await fetch('https://your-airpipe-host/leads/capture', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data)
});
});
</script>

Customisation​

Add enrichment​

After StoreLead, add an HTTP action to call an enrichment provider (Clearbit, Apollo, etc.) and store the result in properties:

- name: EnrichLead
run_when_succeeded: [StoreLead]
http:
url: https://person.clearbit.com/v2/combined/find?email=a|ValidateBody::email|
headers:
authorization: "Bearer a|ap_var::CLEARBIT_API_KEY|"

Push to a CRM​

Add an HTTP action to create a contact in HubSpot, Salesforce, or Pipedrive after the lead is stored:

- name: CreateHubSpotContact
run_when_succeeded: [StoreLead]
http:
url: https://api.hubapi.com/crm/v3/objects/contacts
method: POST
headers:
content-type: application/json
authorization: "Bearer a|ap_var::HUBSPOT_API_KEY|"
body: |
{
"properties": {
"email": "a|ValidateBody::email|",
"firstname": "a|ValidateBody::name|",
"company": "a|ValidateBody::company|"
}
}

Notes​

  • email is the only required field. All others are optional.
  • The properties column exists in the schema for arbitrary JSON metadata (UTM params, form context, etc.). To populate it, add it to the params list in StoreLead and pass NULLIF($6::text, '')::jsonb in the query.
  • emit_metric increments app_leads_captured_total{source=...} on each capture, giving you a per-source counter in Prometheus.

Configuration​

config.yml​

name: LeadCapture
description: Capture leads from any form or webhook, store them in Postgres, and fire an instant Slack notification.

docs: true

# Required managed variables:
# DATABASE_URL β€” Postgres connection string
# SLACK_WEBHOOK_URL β€” Slack incoming webhook URL for notifications

global:
databases:
main:
driver: postgres
conn_string: "a|ap_var::DATABASE_URL|"

interfaces:

# POST /leads/capture
# Accept a lead from a contact form, landing page, or any webhook.
# Body: { "email": "...", "name": "...", "company": "...", "source": "...", "message": "..." }
leads/capture:
output: http
method: POST
summary: Capture lead
description: >
Validate and store an inbound lead, then send a real-time Slack notification.
All fields except email are optional.
tags: [leads]
request_example:
email: [email protected]
name: Jane Smith
company: Startup Inc
source: landing_page
message: "Interested in the enterprise plan"

actions:
- name: ValidateBody
input: a|body|
hide_data_on_success: true
assert:
http_code_on_error: 400
tests:
- value: email
is_not_null: true
is_not_empty: true
regex: "^[^@]+@[^@]+\\.[^@]+$"
description: "Lead email address"

- name: StoreLead
run_when_succeeded:
actions: [ValidateBody]
http_code_on_error: 400
database: main
query: |
INSERT INTO leads (email, name, company, source, message)
VALUES ($1, $2, $3, $4, $5)
RETURNING id, email, name, company, source, created_at;
params:
- a|ValidateBody::email|
- a|ValidateBody::name|
- a|ValidateBody::company|
- a|ValidateBody::source|
- a|ValidateBody::message|
post_transforms:
- extract_value: "[0]"

- name: NotifySlack
run_when_succeeded: [StoreLead]
http:
url: a|ap_var::SLACK_WEBHOOK_URL|
method: POST
headers:
content-type: application/json
# User-supplied fields are ->json_escape'd so quotes/newlines in a lead
# don't break the JSON sent to Slack.
body: |
{
"text": "🎯 New lead: *a|ValidateBody::name->json_escape|* (a|ValidateBody::email->json_escape|)",
"attachments": [
{
"color": "#2196F3",
"fields": [
{ "title": "Company", "value": "a|ValidateBody::company->json_escape|", "short": true },
{ "title": "Source", "value": "a|ValidateBody::source->json_escape|", "short": true },
{ "title": "Message", "value": "a|ValidateBody::message->json_escape|", "short": false }
]
}
]
}

- name: TrackLead
run_when_succeeded: [StoreLead]
hide_data_on_success: true
emit_metric:
name: app_leads_captured_total
type: counter
labels:
source: a|ValidateBody::source|

# GET /leads
# All leads, most recent first.
leads:
output: http
summary: List leads
description: All captured leads, most recent first.
tags: [leads]
response_example:
- id: 1
email: [email protected]
name: Jane Smith
company: Startup Inc
source: landing_page
created_at: "2024-01-01T12:00:00Z"

actions:
- name: ListLeads
database: main
query: |
SELECT id, email, name, company, source, message, created_at
FROM leads
ORDER BY created_at DESC
LIMIT 500;

# POST /leads/get
# Fetch a single lead by ID.
# Body: { "id": 1 }
leads/get:
output: http
method: POST
summary: Get lead
description: Fetch a single lead by ID.
tags: [leads]
request_example:
id: 1

actions:
- name: ValidateBody
input: a|body|
hide_data_on_success: true
assert:
http_code_on_error: 400
tests:
- value: id
is_not_null: true

- name: GetLead
run_when_succeeded:
actions: [ValidateBody]
http_code_on_error: 400
database: main
query: |
SELECT id, email, name, company, source, message, properties, created_at
FROM leads
WHERE id = $1;
params:
- a|ValidateBody::id|
assert:
http_code_on_error: 404
error_message: "Lead not found"
tests:
- value: count()
is_equal_to: 1
post_transforms:
- extract_value: "[0]"

# POST /leads/seed
# Creates the leads table and indexes (idempotent) then loads sample leads
# from several sources. Safe to re-run β€” truncates before inserting.
leads/seed:
output: http
method: POST

actions:
- name: CreateLeadsTable
database: main
hide_data_on_success: true
query: |
CREATE TABLE IF NOT EXISTS leads (
id BIGSERIAL PRIMARY KEY,
email TEXT NOT NULL,
name TEXT,
company TEXT,
source TEXT,
message TEXT,
properties JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

- name: CreateIndexes
run_when_succeeded: [CreateLeadsTable]
database: main
hide_data_on_success: true
query: |
CREATE INDEX IF NOT EXISTS idx_leads_email ON leads (email);
CREATE INDEX IF NOT EXISTS idx_leads_source ON leads (source);
CREATE INDEX IF NOT EXISTS idx_leads_created_at ON leads (created_at DESC);

- name: TruncateLeads
run_when_succeeded: [CreateIndexes]
database: main
hide_data_on_success: true
query: |
TRUNCATE leads RESTART IDENTITY;

- name: InsertLeads
run_when_succeeded: [TruncateLeads]
database: main
hide_data_on_success: true
query: |
INSERT INTO leads (email, name, company, source, message) VALUES
('[email protected]', 'Alice Nguyen', 'Startup Inc', 'landing_page', 'Interested in the growth plan'),
('[email protected]', 'Bob Patel', 'Acme Corp', 'product_hunt', 'Saw you on PH β€” looks great'),
('[email protected]', 'Carol Smith', NULL, 'referral', 'My colleague recommended you'),
('[email protected]', 'Dave Okafor', 'BigCo Ltd', 'webinar', 'Attended your onboarding webinar'),
('[email protected]', 'Eve Torres', 'Creative Agency','landing_page', NULL),
('[email protected]', 'Frank MΓΌller', 'SaaS Tools GmbH','product_hunt', 'Looking for an API gateway'),
('[email protected]', 'Grace Lee', 'EcomStore', 'google_ads', 'Found you via search'),
('[email protected]', 'Hank Johansson', NULL, 'referral', NULL);

- name: SeedSummary
run_when_succeeded: [InsertLeads]
database: main
query: |
SELECT COUNT(*) AS leads_inserted FROM leads;
post_transforms:
- extract_value: "[0]"

# GET /leads/summary
# Count of leads grouped by source.
leads/summary:
output: http
summary: Leads by source
description: Count of captured leads grouped by source.
tags: [leads]
response_example:
- source: landing_page
count: 142
- source: product_hunt
count: 38

actions:
- name: SummaryBySource
database: main
query: |
SELECT source, COUNT(*) AS count
FROM leads
GROUP BY source
ORDER BY count DESC;

schema.sql​

CREATE TABLE IF NOT EXISTS leads (
id BIGSERIAL PRIMARY KEY,
email TEXT NOT NULL,
name TEXT,
company TEXT,
source TEXT,
message TEXT,
properties JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_leads_email ON leads (email);
CREATE INDEX IF NOT EXISTS idx_leads_source ON leads (source);
CREATE INDEX IF NOT EXISTS idx_leads_created_at ON leads (created_at DESC);