n8n Database Integration Patterns: PostgreSQL, MongoDB, and Beyond
Database integrations are the backbone of most automation workflows. This guide covers production-ready patterns for integrating n8n with PostgreSQL, MongoDB, Redis, and other databases.
PostgreSQL Integration Patterns
Basic Connection Setup
// PostgreSQL node configuration
{
"operation": "executeQuery",
"query": "SELECT * FROM users WHERE created_at > $1",
"parameters": "={{ [$json.since_date] }}",
"options": {
"queryTimeout": 30000,
"connectionTimeout": 10000
}
}
Parameterized Queries (Preventing SQL Injection)
Always use parameterized queries to prevent SQL injection:
// Safe: Parameterized query
{
"operation": "executeQuery",
"query": `
INSERT INTO audit_log (user_id, action, resource, timestamp)
VALUES ($1, $2, $3, $4)
RETURNING id
`,
"parameters": "={{ [$json.userId, $json.action, $json.resource, new Date().toISOString()] }}"
}
// UNSAFE: String interpolation - NEVER DO THIS
{
"query": `SELECT * FROM users WHERE email = '${$json.email}'` // SQL Injection vulnerability!
}
Batch Insert Pattern
For inserting multiple records efficiently:
// Function node to prepare batch insert
const records = $input.all();
const values = [];
const params = [];
records.forEach((item, index) => {
const offset = index * 4;
values.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4})`);
params.push(
item.json.name,
item.json.email,
item.json.role,
new Date().toISOString()
);
});
return [{
json: {
query: `
INSERT INTO users (name, email, role, created_at)
VALUES ${values.join(', ')}
ON CONFLICT (email) DO UPDATE SET
name = EXCLUDED.name,
role = EXCLUDED.role,
updated_at = NOW()
RETURNING *
`,
parameters: params
}
}];
Transaction Pattern
Implementing transactions in n8n. Keep in mind that BEGIN, the statements, and COMMIT/ROLLBACK only form a real transaction if they all execute on the same database connection, which separate node executions may not guarantee. When strict atomicity matters, prefer sending the whole transaction in a single executeQuery call or moving the logic into a database function. The multi-node pattern below shows the overall flow:
// Start transaction
{
"operation": "executeQuery",
"query": "BEGIN"
}
// Execute operations
{
"operation": "executeQuery",
"query": "INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING id",
"parameters": "={{ [$json.userId, $json.total] }}"
}
{
"operation": "executeQuery",
"query": "INSERT INTO order_items (order_id, product_id, quantity) VALUES ($1, $2, $3)",
"parameters": "={{ [$json.orderId, $json.productId, $json.quantity] }}"
}
// Commit or rollback based on success
// In error handler:
{
"operation": "executeQuery",
"query": "ROLLBACK"
}
// On success:
{
"operation": "executeQuery",
"query": "COMMIT"
}
Connection Pooling with Function Node
Note: Function/Code node examples that require external modules such as pg, mongodb, or ioredis assume your n8n instance permits them (typically via the NODE_FUNCTION_ALLOW_EXTERNAL environment variable) and that the packages are installed.
// Function node for connection pool management
const { Pool } = require('pg');
// Connection pool configuration
const poolConfig = {
host: $env.PG_HOST,
port: $env.PG_PORT,
database: $env.PG_DATABASE,
user: $env.PG_USER,
password: $env.PG_PASSWORD,
max: 20, // Maximum pool size
idleTimeoutMillis: 30000, // Close idle connections after 30s
connectionTimeoutMillis: 10000,
maxUses: 7500 // Close connection after 7500 queries
};
// Get pool from workflow static data or create new
let pool = $getWorkflowStaticData('global').pgPool;
if (!pool) {
pool = new Pool(poolConfig);
pool.on('error', (err) => {
console.error('Unexpected error on idle client', err);
});
$getWorkflowStaticData('global').pgPool = pool;
}
// Execute query with automatic connection management
const client = await pool.connect();
try {
const result = await client.query(
'SELECT * FROM users WHERE status = $1 LIMIT $2',
[$json.status, $json.limit || 100]
);
return result.rows.map(row => ({ json: row }));
} finally {
client.release();
}
MongoDB Integration Patterns
Basic CRUD Operations
// Insert document
{
"operation": "insert",
"collection": "users",
"document": "={{ $json }}"
}
// Find with query
{
"operation": "find",
"collection": "users",
"query": {
"status": "active",
"created_at": { "$gte": "={{ $json.since }}" }
},
"options": {
"limit": 100,
"sort": { "created_at": -1 }
}
}
// Update with operators
{
"operation": "update",
"collection": "users",
"query": { "_id": "={{ $json.userId }}" },
"update": {
"$set": {
"last_login": "={{ new Date().toISOString() }}",
"login_count": "={{ $json.loginCount + 1 }}"
},
"$push": {
"login_history": {
"timestamp": "={{ new Date().toISOString() }}",
"ip": "={{ $json.ipAddress }}"
}
}
}
}
Aggregation Pipeline
// Function node for complex aggregation
const pipeline = [
// Match stage
{
$match: {
created_at: {
$gte: new Date($json.startDate),
$lte: new Date($json.endDate)
},
status: 'completed'
}
},
// Group stage
{
$group: {
_id: {
year: { $year: '$created_at' },
month: { $month: '$created_at' },
day: { $dayOfMonth: '$created_at' }
},
total_orders: { $sum: 1 },
total_revenue: { $sum: '$amount' },
avg_order_value: { $avg: '$amount' },
unique_customers: { $addToSet: '$customer_id' }
}
},
// Project stage
{
$project: {
_id: 0,
date: {
$dateFromParts: {
year: '$_id.year',
month: '$_id.month',
day: '$_id.day'
}
},
total_orders: 1,
total_revenue: { $round: ['$total_revenue', 2] },
avg_order_value: { $round: ['$avg_order_value', 2] },
unique_customers: { $size: '$unique_customers' }
}
},
// Sort by date
{ $sort: { date: 1 } }
];
return [{ json: { pipeline } }];
Bulk Write Operations
// Function node for bulk operations
const operations = $input.all().map(item => {
const data = item.json;
switch (data.operation) {
case 'insert':
return {
insertOne: {
document: data.document
}
};
case 'update':
return {
updateOne: {
filter: { _id: data.id },
update: { $set: data.updates },
upsert: data.upsert || false
}
};
case 'delete':
return {
deleteOne: {
filter: { _id: data.id }
}
};
default:
throw new Error(`Unknown operation: ${data.operation}`);
}
});
return [{
json: {
operations,
options: {
ordered: false // Continue on error
}
}
}];
Change Streams for Real-time Updates
// Function node for change stream listener
const { MongoClient } = require('mongodb');
const client = new MongoClient($env.MONGODB_URI);
await client.connect();
const collection = client.db('mydb').collection('orders');
const changeStream = collection.watch([
{
$match: {
'operationType': { $in: ['insert', 'update'] },
'fullDocument.status': 'pending'
}
}
], { fullDocument: 'updateLookup' }); // ensure fullDocument is populated for update events
// Process change events
const changes = [];
const timeout = setTimeout(() => {
changeStream.close();
}, 30000); // 30 second timeout
changeStream.on('change', (change) => {
changes.push({
operation: change.operationType,
document: change.fullDocument,
timestamp: change.clusterTime
});
if (changes.length >= 10) { // Batch size
clearTimeout(timeout);
changeStream.close();
}
});
await new Promise(resolve => {
changeStream.on('close', resolve);
});
await client.close();
return changes.map(c => ({ json: c }));
Redis Integration Patterns
Caching Pattern
// Function node for cache-aside pattern
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
const cacheKey = `user:${$json.userId}:profile`;
const cacheTTL = 3600; // 1 hour
// Try to get from cache
let data = await redis.get(cacheKey);
if (data) {
// Cache hit
return [{
json: {
source: 'cache',
data: JSON.parse(data)
}
}];
}
// Cache miss - data will be fetched by next node
// Return flag to indicate cache miss
return [{
json: {
source: 'database',
cacheKey,
cacheTTL,
shouldCache: true
}
}];
// After database fetch, cache the result:
// await redis.setex(cacheKey, cacheTTL, JSON.stringify(data));
Rate Limiting with Redis
// Function node for sliding window rate limit
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
const userId = $json.userId;
const windowMs = 60000; // 1 minute window
const maxRequests = 100;
const key = `ratelimit:${userId}`;
const now = Date.now();
const windowStart = now - windowMs;
// Use Redis transaction for atomic operations
const multi = redis.multi();
multi.zremrangebyscore(key, '-inf', windowStart); // Remove old entries
multi.zadd(key, now, `${now}-${Math.random()}`); // Add current request
multi.zcard(key); // Count requests in window
multi.expire(key, Math.ceil(windowMs / 1000)); // Set TTL
const results = await multi.exec();
const requestCount = results[2][1];
if (requestCount > maxRequests) {
// The oldest entry still in the window determines when the next slot frees up
const [, oldestScore] = await redis.zrange(key, 0, 0, 'WITHSCORES');
return [{
json: {
allowed: false,
remaining: 0,
retryAfter: Math.ceil((Number(oldestScore) + windowMs - now) / 1000)
}
}];
}
return [{
json: {
allowed: true,
remaining: maxRequests - requestCount,
limit: maxRequests
}
}];
Distributed Lock Pattern
// Function node for distributed locking
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
const crypto = require('crypto');
const lockKey = `lock:${$json.resourceId}`;
const lockValue = crypto.randomUUID();
const lockTTL = 30; // 30 seconds
// Try to acquire lock
const acquired = await redis.set(
lockKey,
lockValue,
'EX', lockTTL,
'NX'
);
if (!acquired) {
return [{
json: {
lockAcquired: false,
error: 'Resource is locked by another process'
}
}];
}
// Return lock info for later release
return [{
json: {
lockAcquired: true,
lockKey,
lockValue,
expiresIn: lockTTL
}
}];
// To release lock (in finally block):
// Use Lua script for atomic check-and-delete
const releaseLockScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
// await redis.eval(releaseLockScript, 1, lockKey, lockValue);
Pub/Sub Pattern
// Publisher function node
const Redis = require('ioredis');
const publisher = new Redis($env.REDIS_URL);
const channel = `events:${$json.eventType}`;
const message = JSON.stringify({
id: $json.eventId,
type: $json.eventType,
data: $json.data,
timestamp: new Date().toISOString()
});
await publisher.publish(channel, message);
await publisher.quit();
return [{
json: {
published: true,
channel,
messageSize: message.length
}
}];
// Subscriber (separate workflow with webhook trigger)
const Redis = require('ioredis');
const subscriber = new Redis($env.REDIS_URL);
const messages = [];
const channels = ['events:order', 'events:payment', 'events:shipment'];
await subscriber.subscribe(...channels);
subscriber.on('message', (channel, message) => {
messages.push({
channel,
message: JSON.parse(message),
receivedAt: new Date().toISOString()
});
});
// Collect messages for 5 seconds
await new Promise(resolve => setTimeout(resolve, 5000));
await subscriber.quit();
return messages.map(m => ({ json: m }));
Multi-Database Patterns
Data Synchronization Workflow
// Sync data from PostgreSQL to MongoDB
// Step 1: Fetch from PostgreSQL
{
"operation": "executeQuery",
"query": `
SELECT * FROM products
WHERE updated_at > $1
ORDER BY updated_at
LIMIT 1000
`,
"parameters": "={{ [$json.lastSyncTimestamp] }}"
}
// Step 2: Transform data (Function node)
const products = $input.all();
// Build bulkWrite operations so Step 3 can read them from $json.operations
const operations = products.map(item => ({
updateOne: {
filter: { postgres_id: item.json.id },
update: {
$set: {
postgres_id: item.json.id,
name: item.json.name,
description: item.json.description,
price: parseFloat(item.json.price),
category: item.json.category,
metadata: {
source: 'postgresql',
synced_at: new Date().toISOString(),
source_updated_at: item.json.updated_at
}
}
},
upsert: true
}
}));
return [{ json: { operations } }];
// Step 3: Bulk upsert to MongoDB
{
"operation": "bulkWrite",
"collection": "products",
"operations": "={{ $json.operations }}"
}
Cross-Database Transaction Pattern
// Saga pattern for cross-database consistency
// Function node implementing compensating transactions
class SagaOrchestrator {
constructor() {
this.steps = [];
this.completedSteps = [];
}
addStep(name, execute, compensate) {
this.steps.push({ name, execute, compensate });
}
async run(context) {
try {
for (const step of this.steps) {
const result = await step.execute(context);
this.completedSteps.push({
name: step.name,
compensate: step.compensate,
result
});
context = { ...context, [step.name]: result };
}
return { success: true, context };
} catch (error) {
// Compensate in reverse order
for (const completed of this.completedSteps.reverse()) {
try {
await completed.compensate(context, completed.result);
} catch (compensateError) {
console.error(`Compensation failed for ${completed.name}:`, compensateError);
}
}
return { success: false, error: error.message };
}
}
}
// Usage
const saga = new SagaOrchestrator();
saga.addStep(
'createOrder',
async (ctx) => {
// Insert into PostgreSQL
return { orderId: 'order_123' };
},
async (ctx, result) => {
// Delete order from PostgreSQL
}
);
saga.addStep(
'reserveInventory',
async (ctx) => {
// Update inventory in MongoDB
return { reserved: true };
},
async (ctx, result) => {
// Release inventory in MongoDB
}
);
saga.addStep(
'processPayment',
async (ctx) => {
// Charge payment via API
return { paymentId: 'pay_456' };
},
async (ctx, result) => {
// Refund payment
}
);
const result = await saga.run($json);
return [{ json: result }];
Error Handling Patterns
Retry with Exponential Backoff
// Function node for database retry logic
async function executeWithRetry(operation, maxRetries = 3, baseDelay = 1000) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error;
// Check if error is retryable
const retryable = [
'ECONNRESET',
'ETIMEDOUT',
'ECONNREFUSED',
'connection_failure',
'deadlock_detected'
].some(code =>
error.code === code ||
error.message.includes(code)
);
if (!retryable || attempt === maxRetries) {
throw error;
}
// Exponential backoff with jitter
const delay = baseDelay * Math.pow(2, attempt - 1);
const jitter = Math.random() * 1000;
console.log(`Retry ${attempt}/${maxRetries} after ${delay + jitter}ms`);
await new Promise(resolve => setTimeout(resolve, delay + jitter));
}
}
throw lastError;
}
// Usage
const result = await executeWithRetry(async () => {
// Your database operation here
return await performQuery($json.query);
});
return [{ json: result }];
Connection Health Check
// Function node for database health monitoring
const checks = {
postgres: async () => {
const { Pool } = require('pg');
const pool = new Pool({ connectionString: $env.PG_URL });
try {
const start = Date.now();
await pool.query('SELECT 1');
return { healthy: true, latency: Date.now() - start };
} catch (error) {
return { healthy: false, error: error.message };
} finally {
await pool.end();
}
},
mongodb: async () => {
const { MongoClient } = require('mongodb');
const client = new MongoClient($env.MONGODB_URI);
try {
await client.connect();
const start = Date.now();
await client.db().command({ ping: 1 });
return { healthy: true, latency: Date.now() - start };
} catch (error) {
return { healthy: false, error: error.message };
} finally {
await client.close();
}
},
redis: async () => {
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
try {
const start = Date.now();
await redis.ping();
return { healthy: true, latency: Date.now() - start };
} catch (error) {
return { healthy: false, error: error.message };
} finally {
redis.disconnect();
}
}
};
const results = {};
for (const [name, check] of Object.entries(checks)) {
results[name] = await check();
}
const allHealthy = Object.values(results).every(r => r.healthy);
return [{
json: {
timestamp: new Date().toISOString(),
status: allHealthy ? 'healthy' : 'degraded',
databases: results
}
}];
Best Practices Summary
Security
- Always use parameterized queries - Never concatenate user input (identifier allowlisting is sketched after this list)
- Store credentials in environment variables - Never hardcode
- Use connection pooling - Limit max connections
- Implement timeouts - Prevent hanging connections
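Parameterized queries can only bind values, not identifiers such as table or column names. When an identifier has to be dynamic, a common safeguard is to validate it against an allowlist before building the query. A minimal Function-node sketch, where the table and column names are illustrative assumptions:
// Function node: validate dynamic identifiers against an allowlist (sketch)
// Table and column names below are illustrative assumptions
const ALLOWED_TABLES = ['users', 'orders', 'audit_log'];
const ALLOWED_SORT_COLUMNS = ['created_at', 'updated_at', 'name'];

const table = $json.table;
const sortColumn = $json.sortColumn || 'created_at';

if (!ALLOWED_TABLES.includes(table) || !ALLOWED_SORT_COLUMNS.includes(sortColumn)) {
  throw new Error(`Rejected identifier: ${table} / ${sortColumn}`);
}

// Identifiers come from the allowlist; user-supplied values still go through parameters
return [{
  json: {
    query: `SELECT * FROM ${table} WHERE status = $1 ORDER BY ${sortColumn} LIMIT $2`,
    parameters: [$json.status, $json.limit || 100]
  }
}];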
Performance
- Use indexes strategically - Index frequently queried fields
- Batch operations - Reduce round trips
- Implement caching - Cache frequently accessed data
- Monitor query performance - Log slow queries (a timing sketch follows this list)
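To surface slow queries, one option is to wrap database calls in a small timing helper and log anything above a threshold. A sketch reusing the pooled pg client pattern from earlier; the 500 ms threshold is an arbitrary choice for illustration:
// Function node: log queries that exceed a latency threshold (sketch)
const { Pool } = require('pg');

const SLOW_QUERY_MS = 500; // illustrative threshold

const staticData = $getWorkflowStaticData('global');
if (!staticData.pgPool) {
  staticData.pgPool = new Pool({ connectionString: $env.PG_URL });
}
const pool = staticData.pgPool;

async function timedQuery(text, params) {
  const start = Date.now();
  try {
    return await pool.query(text, params);
  } finally {
    const elapsed = Date.now() - start;
    if (elapsed > SLOW_QUERY_MS) {
      console.warn(`Slow query (${elapsed}ms): ${text}`);
    }
  }
}

const result = await timedQuery('SELECT * FROM users WHERE status = $1', [$json.status]);
return result.rows.map(row => ({ json: row }));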
Reliability
- Implement retry logic - Handle transient failures
- Use transactions - Ensure data consistency
- Health checks - Monitor database connectivity
- Graceful degradation - Handle database outages (a cache-fallback sketch follows this list)
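Graceful degradation can mean serving the last known good value from a cache when the primary database is unreachable. A sketch combining the pg and ioredis clients used throughout this guide; the key name and one-hour TTL are illustrative:
// Function node: fall back to a cached copy when PostgreSQL is unreachable (sketch)
const { Pool } = require('pg');
const Redis = require('ioredis');

const redis = new Redis($env.REDIS_URL);
const pool = new Pool({ connectionString: $env.PG_URL, connectionTimeoutMillis: 5000 });
const cacheKey = `fallback:user:${$json.userId}`; // illustrative key

try {
  const result = await pool.query('SELECT * FROM users WHERE id = $1', [$json.userId]);
  const user = result.rows[0] || null;
  if (user) {
    // Refresh the fallback copy on every successful read
    await redis.setex(cacheKey, 3600, JSON.stringify(user));
  }
  return [{ json: { source: 'database', degraded: false, user } }];
} catch (error) {
  // Database outage: serve stale data rather than failing the workflow
  const cached = await redis.get(cacheKey);
  if (cached) {
    return [{ json: { source: 'cache', degraded: true, user: JSON.parse(cached) } }];
  }
  throw error; // no fallback available
} finally {
  await pool.end();
  redis.disconnect();
}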
By following these patterns, you can build robust, scalable database integrations in your n8n workflows that handle real-world production requirements.