RAG Chatbot

Category: AI & Agents

This page is generated from the Air Pipe marketplace. Browse it live to install into your organization.

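Retrieval-Augmented Generation chatbot backed by Postgres + pgvector. Ingest your own documents, store OpenAI embeddings alongside them, and answer questions using only your data. Documents and vectors live in your own Postgres, so there is no third-party vector DB or proprietary vector cloud to run. Note that document text and questions are still sent to the OpenAI API for embedding and answer generation.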


What's included

File        Purpose
config.yml  AirPipe config (all endpoints)
schema.sql  Postgres tables, pgvector extension, and supporting indexes

Endpoints

Method  Path                    Description
POST    /rag/documents/seed     Create tables + load 3 sample docs (requires OpenAI key)
GET     /rag/documents          List ingested documents
POST    /rag/documents/ingest   Add a document (generates embedding)
POST    /rag/documents/delete   Remove a document by ID
POST    /rag/chat               Ask a question; returns a grounded answer
POST    /rag/history            Retrieve conversation history for a session

How the RAG pipeline works

Question
   │
   ▼
text-embedding-ada-002 ──► question vector (1536 dims)
   │
   ▼
pgvector cosine search ──► top-K similar documents
   │
   ▼
Build context string (string_agg of titles + content)
   │
   ▼
gpt-4o-mini ◄── system prompt: "answer using ONLY this context"
   │
   ▼
{ "answer": "...", "sources_found": 3 }

The retrieval step uses exact cosine search (a sequential scan) by default. Exact search is correct and fast up to tens of thousands of rows, and it never silently drops relevant documents. The query first takes the top-K nearest documents, then keeps only those with cosine similarity above 0.6, so sources_found can be lower than top_k. The 0.6 threshold is a starting point that filters noise without excluding most relevant content; adjust it in the RetrieveContext action if your corpus needs a different cutoff. For large corpora, add an approximate index (see Scaling retrieval below).
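
To make the order of operations concrete, here is a minimal in-memory sketch of what the retrieval step computes. It is not the pack's code (the pack does this in SQL inside RetrieveContext); the function names and the three-dimensional toy vectors are assumptions for illustration only.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def retrieve_context(question_vec, docs, top_k=5, threshold=0.6):
    """Take the top_k nearest docs first, then drop those at or below the threshold."""
    scored = [(cosine_similarity(question_vec, d["embedding"]), d) for d in docs]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    nearest = scored[:top_k]  # mirrors ORDER BY distance LIMIT top_k
    kept = [(s, d) for s, d in nearest if s > threshold]  # mirrors WHERE similarity > 0.6
    context = "\n\n---\n\n".join(
        f"Document: {d['title']}\n{d['content']}" for _, d in kept
    )
    return {"context": context, "sources_found": len(kept)}


# Toy corpus with hypothetical 3-dimensional embeddings (real ones have 1536).
docs = [
    {"title": "Refunds", "content": "30-day refunds.", "embedding": [1.0, 0.0, 0.0]},
    {"title": "Shipping", "content": "Ships in 2 days.", "embedding": [0.7, 0.7, 0.0]},
    {"title": "Careers", "content": "We are hiring.", "embedding": [0.0, 0.0, 1.0]},
]
result = retrieve_context([1.0, 0.1, 0.0], docs, top_k=3)
print(result["sources_found"])
```

Because the threshold is applied after the top-K cut, asking for three documents here still yields only two sources: the unrelated third document is retrieved and then filtered out.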


Setup

1. Enable pgvector on your Postgres instance

Supabase / Neon / Tembo — enable via the dashboard or:

CREATE EXTENSION IF NOT EXISTS vector;

Self-hosted Postgres — install the pgvector OS package first. The package name includes the Postgres major version, so replace 16 with yours:

# Ubuntu/Debian
sudo apt install postgresql-16-pgvector

# Then in psql:
CREATE EXTENSION IF NOT EXISTS vector;

2. Run the schema

psql "$DATABASE_URL" -f schema.sql

3. Set managed variables

Name            Value
DATABASE_URL    Postgres connection string
OPENAI_API_KEY  OpenAI API key with access to text-embedding-ada-002 and gpt-4o-mini

4. Deploy the config

Upload config.yml via the AirPipe dashboard.

5. Seed sample documents

curl -X POST https://your-airpipe-host/rag/documents/seed

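This loads three AirPipe documentation excerpts with real embeddings so you can try /rag/chat immediately. Seeding takes a few seconds because it makes three calls to the OpenAI embeddings API. It also truncates rag_documents before inserting, so re-running it is safe for the demo but deletes any documents you have already ingested.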


Usage

Ingest a document

curl -X POST https://your-airpipe-host/rag/documents/ingest \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Refund policy",
    "content": "Customers may request a full refund within 30 days of purchase. Contact [email protected] with your order number.",
    "source": "support-docs"
  }'

Ask a question

curl -X POST https://your-airpipe-host/rag/chat \
  -H 'Content-Type: application/json' \
  -d '{
    "question": "How do I get a refund?",
    "session_id": "sess_abc123",
    "top_k": 5
  }'

Response:

{
  "answer": "You can request a full refund within 30 days of purchase by contacting [email protected] with your order number.",
  "sources_found": 1
}

session_id is optional. When provided, every question and answer is stored in rag_chat_history so you can retrieve the conversation later. Omit it for stateless one-shot queries. top_k is also optional and defaults to 5.
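
If you call the endpoint from application code rather than curl, the optional fields can simply be left out of the payload. The sketch below uses only the Python standard library; BASE_URL is the placeholder host from the curl examples, and build_chat_request and ask are hypothetical helper names, not part of the pack.

```python
import json
import urllib.request

BASE_URL = "https://your-airpipe-host"  # placeholder host, replace with your own


def build_chat_request(question, session_id=None, top_k=None):
    """Build a POST request for /rag/chat, omitting optional fields that are unset."""
    payload = {"question": question}
    if session_id is not None:
        payload["session_id"] = session_id  # enables history storage
    if top_k is not None:
        payload["top_k"] = top_k  # the endpoint falls back to 5 when omitted
    return urllib.request.Request(
        f"{BASE_URL}/rag/chat",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(question, session_id=None, top_k=None):
    """Send the question and return the parsed JSON response (answer, sources_found)."""
    request = build_chat_request(question, session_id, top_k)
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Building the request does not touch the network; only ask() does.
stateless = build_chat_request("How do I get a refund?")
print(stateless.get_method(), stateless.full_url, stateless.data.decode("utf-8"))
```

A stateless call carries only the question; pass session_id="sess_abc123" to have the turn recorded for /rag/history.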

Retrieve history

curl -X POST https://your-airpipe-host/rag/history \
  -H 'Content-Type: application/json' \
  -d '{ "session_id": "sess_abc123" }'

Ingesting your own documents

Replace the seed content with your own docs by calling /rag/documents/ingest for each chunk. Tips for chunking:

  • Aim for 256–512 tokens per chunk. Longer chunks dilute the embedding signal; shorter chunks lose context.
  • Chunk on natural boundaries — paragraphs or headings — not arbitrary character counts.
  • For large documents, overlap adjacent chunks by ~50 tokens so retrieval doesn't miss split sentences.
  • Store a meaningful source field (e.g. "support-docs", "product-manual") so you can filter or attribute later.
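
The chunking tips above can be sketched as a small pre-processing helper that you run before calling /rag/documents/ingest. This is an illustrative sketch, not part of the pack: chunk_text is a hypothetical name, and it approximates tokens with whitespace-separated words, which undercounts real model tokens, so treat the limits as rough.

```python
import re


def chunk_text(text, max_tokens=512, overlap=50):
    """Split text on paragraph boundaries into chunks of at most max_tokens words.

    Each new chunk starts with the last `overlap` words of the previous one.
    Words stand in for tokens here (an approximation).
    """
    if not 0 <= overlap < max_tokens:
        raise ValueError("overlap must be >= 0 and smaller than max_tokens")
    paragraphs = [p.split() for p in re.split(r"\n\s*\n", text) if p.strip()]
    step = max_tokens - overlap
    chunks = []
    current = []
    for words in paragraphs:
        # A paragraph longer than one chunk is cut into step-sized pieces.
        pieces = [words[i:i + step] for i in range(0, len(words), step)]
        for piece in pieces:
            if current and len(current) + len(piece) > max_tokens:
                chunks.append(" ".join(current))
                # Carry the tail of the finished chunk into the next one.
                current = current[-overlap:] if overlap else []
            current = current + piece
    if current:
        chunks.append(" ".join(current))
    return chunks


sample = "a b c d e f\n\ng h i j"
print(chunk_text(sample, max_tokens=6, overlap=2))
```

With a six-word limit and a two-word overlap, the sample yields two chunks, and the second begins with the last two words of the first. Each returned chunk would then be sent as the content of one ingest request.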

Swapping the embedding model

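text-embedding-ada-002 outputs 1536 dimensions. If you switch to a model or setting with a different dimension count (e.g. text-embedding-3-small defaults to 1536 and can be shortened, for example to 512, via the API's dimensions parameter), update the vector(1536) column type. Existing 1536-dimension vectors cannot be cast to the new size, so clear them first. The pack ships without an ANN index, so recreate one only if you added it yourself, ideally after re-ingesting:

UPDATE rag_documents SET embedding = NULL;
DROP INDEX IF EXISTS idx_rag_docs_embedding;
ALTER TABLE rag_documents ALTER COLUMN embedding TYPE vector(512);
-- Optional, for large corpora only (see Scaling retrieval):
CREATE INDEX idx_rag_docs_embedding
ON rag_documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

You will also need to change the model name in every embedding call in config.yml and re-ingest all documents, since embeddings produced by different models are not comparable.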
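
As a rough illustration of keeping the request and the schema in step, the sketch below builds an embeddings request body that asks text-embedding-3-small for shortened vectors and checks the returned length before storing. The helper names and the SCHEMA_DIMENSIONS constant are assumptions for this example; in the pack itself the equivalent change goes into the JSON bodies of the embedding actions.

```python
import json

SCHEMA_DIMENSIONS = 512  # assumed to match the vector(N) column type in Postgres


def embedding_request_body(text, model="text-embedding-3-small",
                           dimensions=SCHEMA_DIMENSIONS):
    """JSON body for POST /v1/embeddings requesting a fixed output size."""
    return json.dumps({"model": model, "input": text, "dimensions": dimensions})


def check_embedding(vector, expected=SCHEMA_DIMENSIONS):
    """Fail early if a vector would not fit the column type."""
    if len(vector) != expected:
        raise ValueError(f"expected {expected} dimensions, got {len(vector)}")
    return vector


body = embedding_request_body("Refund policy text")
print(body)
```

Checking the length before the INSERT turns a dimension mismatch into a clear application error instead of a database cast failure.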


Swapping the chat model

Change "gpt-4o-mini" to "gpt-4o" in the GenerateAnswer action body for higher quality answers at higher cost. The prompt and pipeline are otherwise identical.


Notes

  • The similarity threshold of 0.6 in RetrieveContext is a starting point. Lower it (e.g. 0.5) if your corpus is sparse and you're getting "answer not in context" responses. Raise it (e.g. 0.75) if retrieval is returning irrelevant chunks.
  • Scaling retrieval. The pack uses exact search by default (no ANN index) so the seed/demo and small corpora always return correct results. Once you have a large corpus, add an approximate index and probe several lists at query time:
    CREATE INDEX idx_rag_docs_embedding ON rag_documents
    USING ivfflat (embedding vector_cosine_ops) WITH (lists = <~sqrt(row_count)>);
    -- then per query/session (the default, probes = 1, searches only one list and skips the rest):
    SET ivfflat.probes = <10..lists>;
    For very large corpora (>1M rows), use an HNSW index instead. Don't add a small ivfflat index over a tiny table: with probes = 1 it skips lists and can return zero sources for a relevant question.
  • sources_found = 0 means no documents passed the similarity threshold. Either the question is outside your corpus or the threshold is too high.
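
For the scaling note above, the placeholders can be filled in from a row count. The helper below is only a sketch of this page's rule of thumb (lists near the square root of the row count, probes starting at 10 and never above lists); ivfflat_statements is a hypothetical name, and the numbers are starting points to tune against your own recall and latency measurements.

```python
import math


def ivfflat_statements(row_count, table="rag_documents", column="embedding"):
    """Return SQL for an ivfflat cosine index sized from the row count."""
    lists = max(1, round(math.sqrt(row_count)))
    probes = min(10, lists)  # low end of the suggested 10..lists range
    create_index = (
        f"CREATE INDEX idx_rag_docs_embedding ON {table} "
        f"USING ivfflat ({column} vector_cosine_ops) WITH (lists = {lists});"
    )
    set_probes = f"SET ivfflat.probes = {probes};"
    return [create_index, set_probes]


for statement in ivfflat_statements(250_000):
    print(statement)
```

For 250,000 rows this produces lists = 500 and probes = 10. Raising probes improves recall at the cost of query time.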

Configuration

config.yml

name: RagChatbot
description: >
  Retrieval-Augmented Generation chatbot backed by Postgres + pgvector.
  Ingest your own documents, store embeddings via OpenAI, and answer
  questions using only your data, with no third-party vector DB required.

docs: true

# Required managed variables:
#   OPENAI_API_KEY : OpenAI API key (embeddings + chat completions)
#   DATABASE_URL   : Postgres connection string (pgvector extension must be enabled)
#
# Models used (override in GenerateAnswer / embedding calls as needed):
#   Embeddings : text-embedding-ada-002 (1536 dimensions, must match schema)
#   Chat       : gpt-4o-mini (swap to gpt-4o for higher quality)

global:
  databases:
    main:
      driver: postgres
      conn_string: "a|ap_var::DATABASE_URL|"

interfaces:

  # POST /rag/documents/seed
  # Creates tables (idempotent) and loads three sample support-docs with
  # real embeddings so the /rag/chat endpoint works immediately after seeding.
  # Safe to re-run: truncates rag_documents before inserting.
  rag/documents/seed:
    output: http
    method: POST

    actions:
      - name: CreateTables
        database: main
        hide_data_on_success: true
        query: |
          CREATE EXTENSION IF NOT EXISTS vector;

          CREATE TABLE IF NOT EXISTS rag_documents (
            id BIGSERIAL PRIMARY KEY,
            title TEXT NOT NULL,
            content TEXT NOT NULL,
            source TEXT,
            embedding vector(1536),
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
          );

          -- No ANN index by default: with a small corpus an ivfflat index
          -- (probes = 1) skips lists and returns incomplete results, so the
          -- seed/demo "ask immediately" flow can return zero sources. Exact
          -- search (a sequential scan) is correct and fast up to tens of
          -- thousands of rows. For larger corpora add an index; see the README.
          CREATE INDEX IF NOT EXISTS idx_rag_docs_source ON rag_documents (source);

          CREATE TABLE IF NOT EXISTS rag_chat_history (
            id BIGSERIAL PRIMARY KEY,
            session_id TEXT NOT NULL,
            role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
            content TEXT NOT NULL,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
          );

          CREATE INDEX IF NOT EXISTS idx_rag_chat_session
            ON rag_chat_history (session_id, created_at ASC);

      - name: TruncateDocuments
        run_when_succeeded: [CreateTables]
        database: main
        hide_data_on_success: true
        query: |
          TRUNCATE rag_documents RESTART IDENTITY;

      # Document 1: embed then store
      - name: EmbedDoc1
        run_when_succeeded: [TruncateDocuments]
        http:
          url: https://api.openai.com/v1/embeddings
          method: POST
          headers:
            content-type: application/json
            authorization: "Bearer a|ap_var::OPENAI_API_KEY|"
          body: |
            {
              "model": "text-embedding-ada-002",
              "input": "Getting started with AirPipe: AirPipe is a no-code backend platform that lets you build REST APIs, webhook handlers, and scheduled jobs using YAML configuration files. Connect it to your Postgres database and deploy in minutes with no servers to manage."
            }
        assert:
          http_code_on_error: 502
          error_message: "OpenAI embeddings API error"
          tests:
            - value: status
              is_equal_to: 200

      - name: StoreDoc1
        run_when_succeeded: [EmbedDoc1]
        database: main
        hide_data_on_success: true
        query: |
          INSERT INTO rag_documents (title, content, source, embedding) VALUES (
            'Getting started with AirPipe',
            'AirPipe is a no-code backend platform that lets you build REST APIs, webhook handlers, and scheduled jobs using YAML configuration files. Connect it to your Postgres database and deploy in minutes with no servers to manage.',
            'docs',
            $1::vector
          );
        params:
          - a|EmbedDoc1::body.data|[0].embedding|

      # Document 2
      - name: EmbedDoc2
        run_when_succeeded: [StoreDoc1]
        http:
          url: https://api.openai.com/v1/embeddings
          method: POST
          headers:
            content-type: application/json
            authorization: "Bearer a|ap_var::OPENAI_API_KEY|"
          body: |
            {
              "model": "text-embedding-ada-002",
              "input": "Managed variables in AirPipe: Managed variables are encrypted key-value pairs stored securely in the AirPipe platform. Reference them in your config using the ap_var::VARIABLE_NAME interpolation syntax. They are never logged and are injected at runtime. Use them for API keys, database URLs, and other secrets."
            }
        assert:
          http_code_on_error: 502
          error_message: "OpenAI embeddings API error"
          tests:
            - value: status
              is_equal_to: 200

      - name: StoreDoc2
        run_when_succeeded: [EmbedDoc2]
        database: main
        hide_data_on_success: true
        query: |
          INSERT INTO rag_documents (title, content, source, embedding) VALUES (
            'Managed variables',
            'Managed variables are encrypted key-value pairs stored securely in the AirPipe platform. Reference them in your config using the ap_var::VARIABLE_NAME interpolation syntax. They are never logged and are injected at runtime. Use them for API keys, database URLs, and other secrets.',
            'docs',
            $1::vector
          );
        params:
          - a|EmbedDoc2::body.data|[0].embedding|

      # Document 3
      - name: EmbedDoc3
        run_when_succeeded: [StoreDoc2]
        http:
          url: https://api.openai.com/v1/embeddings
          method: POST
          headers:
            content-type: application/json
            authorization: "Bearer a|ap_var::OPENAI_API_KEY|"
          body: |
            {
              "model": "text-embedding-ada-002",
              "input": "Webhook signature verification in AirPipe: Use the is_valid_hmac assertion to verify webhook signatures from Stripe, GitHub, and other providers. Specify the secret variable, the signed body using interpolation, and the algorithm (sha256). The engine resolves all interpolations before hashing so you can compose multi-part signed payloads inline."
            }
        assert:
          http_code_on_error: 502
          error_message: "OpenAI embeddings API error"
          tests:
            - value: status
              is_equal_to: 200

      - name: StoreDoc3
        run_when_succeeded: [EmbedDoc3]
        database: main
        hide_data_on_success: true
        query: |
          INSERT INTO rag_documents (title, content, source, embedding) VALUES (
            'Webhook signature verification',
            'Use the is_valid_hmac assertion to verify webhook signatures from Stripe, GitHub, and other providers. Specify the secret variable, the signed body using interpolation, and the algorithm (sha256). The engine resolves all interpolations before hashing so you can compose multi-part signed payloads inline.',
            'docs',
            $1::vector
          );
        params:
          - a|EmbedDoc3::body.data|[0].embedding|

      - name: SeedSummary
        run_when_succeeded: [StoreDoc3]
        database: main
        query: |
          SELECT COUNT(*) AS documents_seeded FROM rag_documents;
        post_transforms:
          - extract_value: "[0]"

  # GET /rag/documents
  # List all ingested documents. Embeddings are omitted: they are large and
  # not useful for display.
  rag/documents:
    output: http
    summary: List documents
    description: Returns all ingested documents without their embedding vectors.
    tags: [rag, documents]
    response_example:
      - id: 1
        title: Getting started with AirPipe
        source: docs
        created_at: "2024-01-01T12:00:00Z"

    actions:
      - name: ListDocuments
        database: main
        query: |
          SELECT id, title, source, created_at
          FROM rag_documents
          ORDER BY created_at DESC;

  # POST /rag/documents/ingest
  # Generate an embedding for the provided content and store the document.
  # Body: { "title": "...", "content": "...", "source": "..." }
  rag/documents/ingest:
    output: http
    method: POST
    summary: Ingest document
    description: >
      Generate an OpenAI embedding for the provided content and store the
      document in Postgres for retrieval. The source field is optional and
      useful for filtering (e.g. "support-docs", "product-manual").
    tags: [rag, documents]
    request_example:
      title: Refund policy
      content: "Customers may request a full refund within 30 days of purchase. To initiate a refund, contact [email protected] with your order number."
      source: support-docs

    actions:
      - name: ValidateBody
        input: a|body|
        hide_data_on_success: true
        assert:
          http_code_on_error: 400
          tests:
            - value: title
              is_not_null: true
              is_not_empty: true
              description: "Document title"
            - value: content
              is_not_null: true
              is_not_empty: true
              description: "Document text to embed and store"

      - name: GenerateEmbedding
        run_when_succeeded:
          actions: [ValidateBody]
          http_code_on_error: 400
        http:
          url: https://api.openai.com/v1/embeddings
          method: POST
          headers:
            content-type: application/json
            authorization: "Bearer a|ap_var::OPENAI_API_KEY|"
          body: |
            {
              "model": "text-embedding-ada-002",
              "input": a|double_quote(ValidateBody::content)|
            }
        assert:
          http_code_on_error: 502
          error_message: "Embedding generation failed"
          tests:
            - value: status
              is_equal_to: 200

      - name: StoreDocument
        run_when_succeeded: [GenerateEmbedding]
        database: main
        query: |
          INSERT INTO rag_documents (title, content, source, embedding)
          VALUES ($1, $2, $3, $4::vector)
          RETURNING id, title, source, created_at;
        params:
          - a|ValidateBody::title|
          - a|ValidateBody::content|
          - a|ValidateBody::source|
          - a|GenerateEmbedding::body.data|[0].embedding|
        post_transforms:
          - extract_value: "[0]"

      - name: TrackIngest
        run_when_succeeded: [StoreDocument]
        hide_data_on_success: true
        emit_metric:
          name: app_rag_documents_ingested_total
          type: counter
          labels:
            source: a|ValidateBody::source|

  # POST /rag/documents/delete
  # Remove a document by ID.
  # Body: { "id": 1 }
  rag/documents/delete:
    output: http
    method: POST
    summary: Delete document
    description: Permanently remove an ingested document by ID.
    tags: [rag, documents]
    request_example:
      id: 1

    actions:
      - name: ValidateBody
        input: a|body|
        hide_data_on_success: true
        assert:
          http_code_on_error: 400
          tests:
            - value: id
              is_not_null: true
              description: "Document ID"

      - name: DeleteDocument
        run_when_succeeded:
          actions: [ValidateBody]
          http_code_on_error: 400
        database: main
        query: |
          DELETE FROM rag_documents WHERE id = $1 RETURNING id, title;
        params:
          - a|ValidateBody::id|
        assert:
          http_code_on_error: 404
          error_message: "Document not found"
          tests:
            - value: count()
              is_equal_to: 1
        post_transforms:
          - extract_value: "[0]"

  # POST /rag/chat
  # Answer a question using retrieval-augmented generation.
  # Body: { "question": "...", "session_id": "...", "top_k": 5 }
  #
  # Pipeline:
  #   1. Embed the question with text-embedding-ada-002
  #   2. Find the top_k most similar documents using cosine distance
  #   3. Build a grounded context string from retrieved docs
  #   4. Call gpt-4o-mini with the context as the system prompt
  #   5. Store Q+A in rag_chat_history (when session_id is provided)
  #   6. Return a clean { answer, sources_found } response
  rag/chat:
    output: http
    method: POST
    summary: Chat
    description: >
      Embed the question, retrieve the most semantically similar documents,
      and generate a grounded answer using GPT. Only uses context from your
      ingested documents; the model is instructed not to guess beyond them.
    tags: [rag, chat]
    request_example:
      question: "How do I verify a Stripe webhook signature?"
      session_id: sess_abc123
      top_k: 5
    response_example:
      answer: "Use the is_valid_hmac assertion in your AirPipe config..."
      sources_found: 2
    notes: |
      `session_id` stores each turn in `rag_chat_history`. Omit it for
      stateless one-shot queries. `top_k` defaults to 5 if not provided.

    actions:
      - name: ValidateBody
        input: a|body|
        hide_data_on_success: true
        assert:
          http_code_on_error: 400
          tests:
            - value: question
              is_not_null: true
              is_not_empty: true
              description: "The question to answer"

      - name: EmbedQuestion
        run_when_succeeded:
          actions: [ValidateBody]
          http_code_on_error: 400
        http:
          url: https://api.openai.com/v1/embeddings
          method: POST
          headers:
            content-type: application/json
            authorization: "Bearer a|ap_var::OPENAI_API_KEY|"
          body: |
            {
              "model": "text-embedding-ada-002",
              "input": a|double_quote(ValidateBody::question)|
            }
        assert:
          http_code_on_error: 502
          error_message: "Failed to embed question"
          tests:
            - value: status
              is_equal_to: 200

      # Retrieve the top_k most similar documents and aggregate them into a
      # single context string. Only documents with cosine similarity > 0.6
      # are included to avoid injecting irrelevant noise into the prompt.
      - name: RetrieveContext
        run_when_succeeded: [EmbedQuestion]
        database: main
        query: |
          SELECT
            COALESCE(
              string_agg(
                'Document: ' || title || E'\n' || content,
                E'\n\n---\n\n'
                ORDER BY similarity DESC
              ),
              ''
            ) AS context,
            COUNT(*) AS sources_found
          FROM (
            SELECT
              title,
              content,
              1 - (embedding <=> $1::vector) AS similarity
            FROM rag_documents
            ORDER BY embedding <=> $1::vector
            LIMIT COALESCE($2::int, 5)
          ) ranked
          WHERE similarity > 0.6;
        params:
          - a|EmbedQuestion::body.data|[0].embedding|
          - a|ValidateBody::top_k|
        post_transforms:
          - extract_value: "[0]"

      - name: GenerateAnswer
        run_when_succeeded: [RetrieveContext]
        http:
          url: https://api.openai.com/v1/chat/completions
          method: POST
          headers:
            content-type: application/json
            authorization: "Bearer a|ap_var::OPENAI_API_KEY|"
          body: |
            {
              "model": "gpt-4o-mini",
              "messages": [
                {
                  "role": "system",
                  "content": "You are a helpful assistant. Answer the user's question using ONLY the context provided below. If the answer is not covered by the context, say so clearly; do not guess or draw on outside knowledge.\n\nContext:\na|RetrieveContext::context->json_escape|"
                },
                {
                  "role": "user",
                  "content": a|double_quote(ValidateBody::question)|
                }
              ]
            }
        assert:
          http_code_on_error: 502
          error_message: "Answer generation failed"
          tests:
            - value: status
              is_equal_to: 200

      # Extract the answer text from the OpenAI response using a SQL JSON
      # operator so the final response is clean rather than the full API envelope.
      - name: ParseAnswer
        run_when_succeeded: [GenerateAnswer]
        database: main
        hide_data_on_success: true
        query: |
          SELECT ($1::jsonb)->'choices'->0->'message'->>'content' AS answer;
        params:
          - a|GenerateAnswer::body|
        post_transforms:
          - extract_value: "[0]"

      # Store both sides of the turn when a session_id is provided.
      - name: StoreHistory
        run_when_succeeded: [ParseAnswer]
        run_on_assertion:
          tests:
            - action: ValidateBody
              value: session_id
              is_not_null: true
        database: main
        hide_data_on_success: true
        query: |
          INSERT INTO rag_chat_history (session_id, role, content)
          VALUES
            ($1, 'user', $2),
            ($1, 'assistant', $3);
        params:
          - a|ValidateBody::session_id|
          - a|ValidateBody::question|
          - a|ParseAnswer::answer|

      - name: TrackQuery
        run_when_succeeded: [ParseAnswer]
        hide_data_on_success: true
        emit_metric:
          name: app_rag_queries_total
          type: counter

      - name: BuildResponse
        run_when_succeeded: [ParseAnswer]
        database: main
        query: |
          SELECT $1::text AS answer, $2::int AS sources_found;
        params:
          - a|ParseAnswer::answer|
          - a|RetrieveContext::sources_found|
        post_transforms:
          - extract_value: "[0]"

  # POST /rag/history
  # Retrieve conversation history for a session.
  # Body: { "session_id": "sess_abc123" }
  rag/history:
    output: http
    method: POST
    summary: Chat history
    description: Retrieve all messages for a session ID, oldest first.
    tags: [rag, chat]
    request_example:
      session_id: sess_abc123
    response_example:
      - id: 1
        role: user
        content: "How do I verify a Stripe webhook signature?"
        created_at: "2024-01-01T12:00:00Z"
      - id: 2
        role: assistant
        content: "Use the is_valid_hmac assertion..."
        created_at: "2024-01-01T12:00:01Z"

    actions:
      - name: ValidateBody
        input: a|body|
        hide_data_on_success: true
        assert:
          http_code_on_error: 400
          tests:
            - value: session_id
              is_not_null: true
              description: "Session ID"

      - name: GetHistory
        run_when_succeeded:
          actions: [ValidateBody]
          http_code_on_error: 400
        database: main
        query: |
          SELECT id, role, content, created_at
          FROM rag_chat_history
          WHERE session_id = $1
          ORDER BY created_at ASC;
        params:
          - a|ValidateBody::session_id|
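
For readers tracing the rag/chat actions, the ParseAnswer step pulls choices[0].message.content out of the chat completions response. The same extraction is sketched below for anyone post-processing the raw OpenAI envelope outside the pipeline; parse_answer is a hypothetical helper, and the sample envelope is trimmed to the fields that matter here.

```python
import json


def parse_answer(response_body):
    """Return choices[0].message.content, or None when the path is missing."""
    data = json.loads(response_body) if isinstance(response_body, str) else response_body
    choices = data.get("choices") or []
    if not choices:
        return None
    return choices[0].get("message", {}).get("content")


# Trimmed example envelope; real responses carry more fields.
envelope = json.dumps({
    "choices": [
        {"message": {"role": "assistant", "content": "Refunds are available for 30 days."}}
    ]
})
print(parse_answer(envelope))
```

Like the SQL JSON operators in the config, the helper returns an empty result (None here, NULL in SQL) instead of raising when the expected path is absent.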

schema.sql

-- pgvector must be enabled on your Postgres instance before running this.
-- Most managed providers (Supabase, Neon, Tembo) enable it via:
-- CREATE EXTENSION IF NOT EXISTS vector;
-- For self-hosted Postgres, install the pgvector package first.

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS rag_documents (
  id BIGSERIAL PRIMARY KEY,
  title TEXT NOT NULL,
  content TEXT NOT NULL,
  source TEXT,
  embedding vector(1536), -- text-embedding-ada-002 dimensions
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- No ANN index by default. Exact (sequential-scan) cosine search is correct and
-- fast up to tens of thousands of rows, and — unlike a small ivfflat index with
-- probes = 1 — never silently drops relevant rows. Once you have a large corpus,
-- add an approximate index and tune it (see README "Scaling retrieval"):
-- CREATE INDEX idx_rag_docs_embedding ON rag_documents
-- USING ivfflat (embedding vector_cosine_ops) WITH (lists = <~sqrt(rows)>);
-- -- and at query time: SET ivfflat.probes = <10..lists>;
-- or use an HNSW index for very large corpora.

CREATE INDEX IF NOT EXISTS idx_rag_docs_source ON rag_documents (source);
CREATE INDEX IF NOT EXISTS idx_rag_docs_created_at ON rag_documents (created_at DESC);

CREATE TABLE IF NOT EXISTS rag_chat_history (
  id BIGSERIAL PRIMARY KEY,
  session_id TEXT NOT NULL,
  role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
  content TEXT NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_rag_chat_session
  ON rag_chat_history (session_id, created_at ASC);