Lead Capture
Category: Sales & CRM · 📦 1 install
This page is generated from the Air Pipe marketplace. Browse it live to install into your organization.
Capture leads from any form, landing page, or webhook. Every submission is validated, stored in Postgres, and followed by an instant Slack notification. Your data stays on your infrastructure.
What's included
| File | Purpose |
|---|---|
| config.yml | AirPipe config with docs: true |
| schema.sql | Leads table with indexes |
Endpoints
| Method | Path | Description |
|---|---|---|
| POST | /leads/seed | Create table and load sample data |
| POST | /leads/capture | Submit a lead → store → Slack alert |
| GET | /leads | List all leads |
| POST | /leads/get | Get lead by ID |
| GET | /leads/summary | Count by source |
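
As a rough illustration of calling these endpoints from JavaScript, the sketch below looks up one lead through POST /leads/get. The `getLead` helper and its injectable `fetchImpl` parameter are hypothetical names. The `{ "id": ... }` body and the 404 for a missing lead follow the config.yml on this page; the exact response envelope is an assumption, so the parsed JSON is returned untouched.

```javascript
// Hypothetical helper: fetch one lead by ID via POST /leads/get.
// fetchImpl is injectable so the function can be exercised without a network.
async function getLead(baseUrl, id, fetchImpl = fetch) {
  const res = await fetchImpl(`${baseUrl}/leads/get`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ id }),
  });
  // The GetLead action asserts exactly one matching row and answers 404 otherwise.
  if (res.status === 404) return null;
  if (!res.ok) throw new Error(`Lead lookup failed with HTTP ${res.status}`);
  return res.json();
}
```

A call might look like `const lead = await getLead('https://your-airpipe-host', 1);`, with `null` meaning no lead has that ID.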
Setup
1. Create the table
Either run the schema directly:
psql "$DATABASE_URL" -f schema.sql
Or use the seed endpoint after deploying. It creates the table and loads sample data in one step, and it truncates the leads table before inserting, so do not run it against a table that already holds real leads:
curl -X POST https://your-airpipe-host/leads/seed
2. Set managed variables
| Name | Value |
|---|---|
| DATABASE_URL | Your Postgres connection string |
| SLACK_WEBHOOK_URL | Slack incoming webhook URL |
3. Point your form at the endpoint
POST https://your-airpipe-host/leads/capture
Content-Type: application/json
Testing
BASE=https://your-airpipe-host
# Seed the table with sample data (also creates the table, so run this first)
curl -X POST "$BASE/leads/seed"
# Capture a lead (the address shown here is redacted; substitute a real one, or validation returns 400)
curl -X POST "$BASE/leads/capture" \
  -H "Content-Type: application/json" \
  -d '{
    "email": "[email protected]",
    "name": "Jane Smith",
    "company": "Startup Inc",
    "source": "landing_page",
    "message": "Interested in the enterprise plan"
  }'
# View all leads
curl "$BASE/leads"
# Count by source
curl "$BASE/leads/summary"
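
To make the per-source counts easier to read, the following sketch turns the /leads/summary rows into shares of the total. `sourceShares` is a hypothetical helper, and the row shape `{ source, count }` is taken from the response_example in config.yml. Whether `count` arrives as a number or a string depends on how the Postgres bigint is serialized, which is an assumption here, so the code coerces it.

```javascript
// Hypothetical helper: convert /leads/summary rows into per-source shares.
// Assumes rows shaped like { source, count }, as in the response_example.
function sourceShares(rows) {
  // COUNT(*) is a Postgres bigint and may be serialized as a string, so coerce it.
  // The source column is nullable, so label the NULL group explicitly.
  const counts = rows.map((r) => ({
    source: r.source ?? '(none)',
    count: Number(r.count),
  }));
  const total = counts.reduce((sum, r) => sum + r.count, 0);
  return counts.map((r) => ({
    ...r,
    share: total === 0 ? 0 : r.count / total,
  }));
}

const shares = sourceShares([
  { source: 'landing_page', count: 142 },
  { source: 'product_hunt', count: '38' },
]);
console.log(shares.map((r) => `${r.source}: ${(r.share * 100).toFixed(1)}%`));
// → [ 'landing_page: 78.9%', 'product_hunt: 21.1%' ]
```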
Connecting from a web form
<form id="lead-form">
  <input type="email" name="email" required />
  <input type="text" name="name" />
  <input type="text" name="company" />
  <button type="submit">Get in touch</button>
</form>
<script>
  document.getElementById('lead-form').addEventListener('submit', async (e) => {
    e.preventDefault();
    const data = Object.fromEntries(new FormData(e.target));
    data.source = 'website_contact';
    await fetch('https://your-airpipe-host/leads/capture', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(data)
    });
  });
</script>
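
The handler above does not inspect the response, so a rejected submission goes unnoticed. A minimal sketch of one way to handle that: check the email in the browser with the same pattern that ValidateBody uses in config.yml, then report whether the server accepted the lead. `isLikelyValidEmail`, `submitLead`, the `reason` strings, and the injectable `fetchImpl` parameter are all hypothetical names, not part of Air Pipe.

```javascript
// Same pattern the ValidateBody action applies to email in config.yml.
const EMAIL_PATTERN = /^[^@]+@[^@]+\.[^@]+$/;

// Hypothetical helper: cheap client-side check before any request is sent.
function isLikelyValidEmail(value) {
  return typeof value === 'string' && EMAIL_PATTERN.test(value);
}

// Hypothetical helper: post a lead and report the outcome instead of ignoring it.
// fetchImpl is injectable so the function can be tried without a network.
async function submitLead(endpoint, data, fetchImpl = fetch) {
  if (!isLikelyValidEmail(data.email)) {
    return { ok: false, reason: 'invalid_email' };
  }
  const res = await fetchImpl(endpoint, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(data),
  });
  // ValidateBody and StoreLead both answer 400 on failure.
  if (res.status === 400) return { ok: false, reason: 'rejected_by_server' };
  if (!res.ok) return { ok: false, reason: `http_${res.status}` };
  return { ok: true };
}
```

Inside the submit listener, `const result = await submitLead('https://your-airpipe-host/leads/capture', data);` could then drive a success or error message in the page.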
Customisation
Add enrichment
After StoreLead, add an HTTP action to call an enrichment provider (Clearbit, Apollo, etc.) and store the result in properties:
- name: EnrichLead
  run_when_succeeded: [StoreLead]
  http:
    url: https://person.clearbit.com/v2/combined/find?email=a|ValidateBody::email|
    headers:
      authorization: "Bearer a|ap_var::CLEARBIT_API_KEY|"
Push to a CRM
Add an HTTP action to create a contact in HubSpot, Salesforce, or Pipedrive after the lead is stored. As in NotifySlack, user-supplied fields are passed through ->json_escape so quotes or newlines in a lead do not break the JSON body:
- name: CreateHubSpotContact
  run_when_succeeded: [StoreLead]
  http:
    url: https://api.hubapi.com/crm/v3/objects/contacts
    method: POST
    headers:
      content-type: application/json
      authorization: "Bearer a|ap_var::HUBSPOT_API_KEY|"
    body: |
      {
        "properties": {
          "email": "a|ValidateBody::email->json_escape|",
          "firstname": "a|ValidateBody::name->json_escape|",
          "company": "a|ValidateBody::company->json_escape|"
        }
      }
Notes
- email is the only required field. All others are optional.
- The properties column exists in the schema for arbitrary JSON metadata (UTM params, form context, etc.). To populate it, add properties to the INSERT column list in StoreLead, add the value to the params list, and pass NULLIF($6::text, '')::jsonb as the sixth value in the query.
- emit_metric increments app_leads_captured_total{source=...} on each capture, giving you a per-source counter in Prometheus.
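
On the client side, the UTM metadata mentioned above could be gathered along the following lines. `collectUtmProperties` is a hypothetical helper built on the standard `URLSearchParams` API. Whether the resulting object should be sent as a nested JSON object or as a pre-serialized string depends on how Air Pipe binds the parameter, which this page does not specify.

```javascript
// Hypothetical helper: collect non-empty utm_* parameters from a query string
// into a plain object that could be sent as the lead's "properties" field.
function collectUtmProperties(search) {
  const props = {};
  for (const [key, value] of new URLSearchParams(search)) {
    if (key.startsWith('utm_') && value !== '') props[key] = value;
  }
  return props;
}

// In a browser this would typically be collectUtmProperties(window.location.search).
const properties = collectUtmProperties('?utm_source=newsletter&utm_campaign=spring&ref=abc');
console.log(JSON.stringify(properties));
// → {"utm_source":"newsletter","utm_campaign":"spring"}
```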
Configuration
config.yml
name: LeadCapture
description: Capture leads from any form or webhook, store them in Postgres, and fire an instant Slack notification.
docs: true

# Required managed variables:
#   DATABASE_URL - Postgres connection string
#   SLACK_WEBHOOK_URL - Slack incoming webhook URL for notifications

global:
  databases:
    main:
      driver: postgres
      conn_string: "a|ap_var::DATABASE_URL|"

interfaces:

  # POST /leads/capture
  # Accept a lead from a contact form, landing page, or any webhook.
  # Body: { "email": "...", "name": "...", "company": "...", "source": "...", "message": "..." }
  leads/capture:
    output: http
    method: POST
    summary: Capture lead
    description: >
      Validate and store an inbound lead, then send a real-time Slack notification.
      All fields except email are optional.
    tags: [leads]
    request_example:
      email: "[email protected]"
      name: Jane Smith
      company: Startup Inc
      source: landing_page
      message: "Interested in the enterprise plan"
    actions:
      - name: ValidateBody
        input: a|body|
        hide_data_on_success: true
        assert:
          http_code_on_error: 400
          tests:
            - value: email
              is_not_null: true
              is_not_empty: true
              regex: "^[^@]+@[^@]+\\.[^@]+$"
              description: "Lead email address"

      - name: StoreLead
        run_when_succeeded:
          actions: [ValidateBody]
          http_code_on_error: 400
        database: main
        query: |
          INSERT INTO leads (email, name, company, source, message)
          VALUES ($1, $2, $3, $4, $5)
          RETURNING id, email, name, company, source, created_at;
        params:
          - a|ValidateBody::email|
          - a|ValidateBody::name|
          - a|ValidateBody::company|
          - a|ValidateBody::source|
          - a|ValidateBody::message|
        post_transforms:
          - extract_value: "[0]"

      - name: NotifySlack
        run_when_succeeded: [StoreLead]
        http:
          url: a|ap_var::SLACK_WEBHOOK_URL|
          method: POST
          headers:
            content-type: application/json
          # User-supplied fields are ->json_escape'd so quotes/newlines in a lead
          # don't break the JSON sent to Slack.
          body: |
            {
              "text": "🎯 New lead: *a|ValidateBody::name->json_escape|* (a|ValidateBody::email->json_escape|)",
              "attachments": [
                {
                  "color": "#2196F3",
                  "fields": [
                    { "title": "Company", "value": "a|ValidateBody::company->json_escape|", "short": true },
                    { "title": "Source", "value": "a|ValidateBody::source->json_escape|", "short": true },
                    { "title": "Message", "value": "a|ValidateBody::message->json_escape|", "short": false }
                  ]
                }
              ]
            }

      - name: TrackLead
        run_when_succeeded: [StoreLead]
        hide_data_on_success: true
        emit_metric:
          name: app_leads_captured_total
          type: counter
          labels:
            source: a|ValidateBody::source|

  # GET /leads
  # All leads, most recent first.
  leads:
    output: http
    summary: List leads
    description: All captured leads, most recent first.
    tags: [leads]
    response_example:
      - id: 1
        email: "[email protected]"
        name: Jane Smith
        company: Startup Inc
        source: landing_page
        created_at: "2024-01-01T12:00:00Z"
    actions:
      - name: ListLeads
        database: main
        query: |
          SELECT id, email, name, company, source, message, created_at
          FROM leads
          ORDER BY created_at DESC
          LIMIT 500;

  # POST /leads/get
  # Fetch a single lead by ID.
  # Body: { "id": 1 }
  leads/get:
    output: http
    method: POST
    summary: Get lead
    description: Fetch a single lead by ID.
    tags: [leads]
    request_example:
      id: 1
    actions:
      - name: ValidateBody
        input: a|body|
        hide_data_on_success: true
        assert:
          http_code_on_error: 400
          tests:
            - value: id
              is_not_null: true

      - name: GetLead
        run_when_succeeded:
          actions: [ValidateBody]
          http_code_on_error: 400
        database: main
        query: |
          SELECT id, email, name, company, source, message, properties, created_at
          FROM leads
          WHERE id = $1;
        params:
          - a|ValidateBody::id|
        assert:
          http_code_on_error: 404
          error_message: "Lead not found"
          tests:
            - value: count()
              is_equal_to: 1
        post_transforms:
          - extract_value: "[0]"

  # POST /leads/seed
  # Creates the leads table and indexes (idempotent) then loads sample leads
  # from several sources. Safe to re-run: it truncates before inserting.
  leads/seed:
    output: http
    method: POST
    actions:
      - name: CreateLeadsTable
        database: main
        hide_data_on_success: true
        query: |
          CREATE TABLE IF NOT EXISTS leads (
            id BIGSERIAL PRIMARY KEY,
            email TEXT NOT NULL,
            name TEXT,
            company TEXT,
            source TEXT,
            message TEXT,
            properties JSONB,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
          );

      - name: CreateIndexes
        run_when_succeeded: [CreateLeadsTable]
        database: main
        hide_data_on_success: true
        query: |
          CREATE INDEX IF NOT EXISTS idx_leads_email ON leads (email);
          CREATE INDEX IF NOT EXISTS idx_leads_source ON leads (source);
          CREATE INDEX IF NOT EXISTS idx_leads_created_at ON leads (created_at DESC);

      - name: TruncateLeads
        run_when_succeeded: [CreateIndexes]
        database: main
        hide_data_on_success: true
        query: |
          TRUNCATE leads RESTART IDENTITY;

      - name: InsertLeads
        run_when_succeeded: [TruncateLeads]
        database: main
        hide_data_on_success: true
        query: |
          INSERT INTO leads (email, name, company, source, message) VALUES
          ('[email protected]', 'Alice Nguyen', 'Startup Inc', 'landing_page', 'Interested in the growth plan'),
          ('[email protected]', 'Bob Patel', 'Acme Corp', 'product_hunt', 'Saw you on PH - looks great'),
          ('[email protected]', 'Carol Smith', NULL, 'referral', 'My colleague recommended you'),
          ('[email protected]', 'Dave Okafor', 'BigCo Ltd', 'webinar', 'Attended your onboarding webinar'),
          ('[email protected]', 'Eve Torres', 'Creative Agency', 'landing_page', NULL),
          ('[email protected]', 'Frank Müller', 'SaaS Tools GmbH', 'product_hunt', 'Looking for an API gateway'),
          ('[email protected]', 'Grace Lee', 'EcomStore', 'google_ads', 'Found you via search'),
          ('[email protected]', 'Hank Johansson', NULL, 'referral', NULL);

      - name: SeedSummary
        run_when_succeeded: [InsertLeads]
        database: main
        query: |
          SELECT COUNT(*) AS leads_inserted FROM leads;
        post_transforms:
          - extract_value: "[0]"

  # GET /leads/summary
  # Count of leads grouped by source.
  leads/summary:
    output: http
    summary: Leads by source
    description: Count of captured leads grouped by source.
    tags: [leads]
    response_example:
      - source: landing_page
        count: 142
      - source: product_hunt
        count: 38
    actions:
      - name: SummaryBySource
        database: main
        query: |
          SELECT source, COUNT(*) AS count
          FROM leads
          GROUP BY source
          ORDER BY count DESC;
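
The comment in NotifySlack explains that user-supplied fields are escaped so quotes and newlines cannot break the JSON sent to Slack. The sketch below illustrates that failure mode and the effect of escaping in plain JavaScript. `jsonEscape` and `parses` are hypothetical helpers that approximate the idea with `JSON.stringify`; this is not Air Pipe's implementation of ->json_escape.

```javascript
// Illustration only: approximates JSON string escaping with JSON.stringify.
// This is not Air Pipe's implementation of ->json_escape.
function jsonEscape(value) {
  // JSON.stringify adds surrounding quotes; strip them to keep only the escaped content.
  return JSON.stringify(String(value)).slice(1, -1);
}

// Hypothetical helper: report whether a string is valid JSON.
function parses(text) {
  try {
    JSON.parse(text);
    return true;
  } catch {
    return false;
  }
}

const message = 'She said "call me"\nASAP';

// Naive interpolation: the raw quote and newline make the document invalid.
const unsafe = `{"text": "${message}"}`;
// Escaped interpolation stays parseable and round-trips the original text.
const safe = `{"text": "${jsonEscape(message)}"}`;

console.log(parses(unsafe)); // false
console.log(parses(safe)); // true
```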
schema.sql
CREATE TABLE IF NOT EXISTS leads (
  id BIGSERIAL PRIMARY KEY,
  email TEXT NOT NULL,
  name TEXT,
  company TEXT,
  source TEXT,
  message TEXT,
  properties JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_leads_email ON leads (email);
CREATE INDEX IF NOT EXISTS idx_leads_source ON leads (source);
CREATE INDEX IF NOT EXISTS idx_leads_created_at ON leads (created_at DESC);