n8n Automation

n8n Database Integration Patterns: PostgreSQL, MongoDB, and Beyond

DeviDevs Team
12 min read
#n8n#database#PostgreSQL#MongoDB#automation

n8n Database Integration Patterns: PostgreSQL, MongoDB, and Beyond

Database integrations are the backbone of most automation workflows. This guide covers production-ready patterns for integrating n8n with PostgreSQL, MongoDB, Redis, and other databases.

PostgreSQL Integration Patterns

Basic Connection Setup

// PostgreSQL node configuration
{
  "operation": "executeQuery",
  "query": "SELECT * FROM users WHERE created_at > $1",
  "parameters": "={{ [$json.since_date] }}",
  "options": {
    "queryTimeout": 30000,
    "connectionTimeout": 10000
  }
}

Parameterized Queries (Preventing SQL Injection)

Always use parameterized queries to prevent SQL injection:

// Safe: Parameterized query
{
  "operation": "executeQuery",
  "query": `
    INSERT INTO audit_log (user_id, action, resource, timestamp)
    VALUES ($1, $2, $3, $4)
    RETURNING id
  `,
  "parameters": "={{ [$json.userId, $json.action, $json.resource, new Date().toISOString()] }}"
}
 
// UNSAFE: String interpolation - NEVER DO THIS
{
  "query": `SELECT * FROM users WHERE email = '${$json.email}'`  // SQL Injection vulnerability!
}

Batch Insert Pattern

For inserting multiple records efficiently:

// Function node to prepare batch insert
const records = $input.all();
const values = [];
const params = [];
 
records.forEach((item, index) => {
  const offset = index * 4;
  values.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4})`);
  params.push(
    item.json.name,
    item.json.email,
    item.json.role,
    new Date().toISOString()
  );
});
 
return [{
  json: {
    query: `
      INSERT INTO users (name, email, role, created_at)
      VALUES ${values.join(', ')}
      ON CONFLICT (email) DO UPDATE SET
        name = EXCLUDED.name,
        role = EXCLUDED.role,
        updated_at = NOW()
      RETURNING *
    `,
    parameters: params
  }
}];

Transaction Pattern

Implementing transactions in n8n:

// Start transaction
{
  "operation": "executeQuery",
  "query": "BEGIN"
}
 
// Execute operations
{
  "operation": "executeQuery",
  "query": "INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING id",
  "parameters": "={{ [$json.userId, $json.total] }}"
}
 
{
  "operation": "executeQuery",
  "query": "INSERT INTO order_items (order_id, product_id, quantity) VALUES ($1, $2, $3)",
  "parameters": "={{ [$json.orderId, $json.productId, $json.quantity] }}"
}
 
// Commit or rollback based on success
// In error handler:
{
  "operation": "executeQuery",
  "query": "ROLLBACK"
}
 
// On success:
{
  "operation": "executeQuery",
  "query": "COMMIT"
}

Connection Pooling with Function Node

// Function node for connection pool management
const { Pool } = require('pg');
 
// Connection pool configuration
const poolConfig = {
  host: $env.PG_HOST,
  port: $env.PG_PORT,
  database: $env.PG_DATABASE,
  user: $env.PG_USER,
  password: $env.PG_PASSWORD,
  max: 20,                    // Maximum pool size
  idleTimeoutMillis: 30000,   // Close idle connections after 30s
  connectionTimeoutMillis: 10000,
  maxUses: 7500               // Close connection after 7500 queries
};
 
// Get pool from workflow static data or create new
let pool = $getWorkflowStaticData('global').pgPool;
 
if (!pool) {
  pool = new Pool(poolConfig);
 
  pool.on('error', (err) => {
    console.error('Unexpected error on idle client', err);
  });
 
  $getWorkflowStaticData('global').pgPool = pool;
}
 
// Execute query with automatic connection management
const client = await pool.connect();
 
try {
  const result = await client.query(
    'SELECT * FROM users WHERE status = $1 LIMIT $2',
    [$json.status, $json.limit || 100]
  );
 
  return result.rows.map(row => ({ json: row }));
} finally {
  client.release();
}

MongoDB Integration Patterns

Basic CRUD Operations

// Insert document
{
  "operation": "insert",
  "collection": "users",
  "document": "={{ $json }}"
}
 
// Find with query
{
  "operation": "find",
  "collection": "users",
  "query": {
    "status": "active",
    "created_at": { "$gte": "={{ $json.since }}" }
  },
  "options": {
    "limit": 100,
    "sort": { "created_at": -1 }
  }
}
 
// Update with operators
{
  "operation": "update",
  "collection": "users",
  "query": { "_id": "={{ $json.userId }}" },
  "update": {
    "$set": {
      "last_login": "={{ new Date().toISOString() }}",
      "login_count": "={{ $json.loginCount + 1 }}"
    },
    "$push": {
      "login_history": {
        "timestamp": "={{ new Date().toISOString() }}",
        "ip": "={{ $json.ipAddress }}"
      }
    }
  }
}

Aggregation Pipeline

// Function node for complex aggregation
const pipeline = [
  // Match stage
  {
    $match: {
      created_at: {
        $gte: new Date($json.startDate),
        $lte: new Date($json.endDate)
      },
      status: 'completed'
    }
  },
  // Group stage
  {
    $group: {
      _id: {
        year: { $year: '$created_at' },
        month: { $month: '$created_at' },
        day: { $dayOfMonth: '$created_at' }
      },
      total_orders: { $sum: 1 },
      total_revenue: { $sum: '$amount' },
      avg_order_value: { $avg: '$amount' },
      unique_customers: { $addToSet: '$customer_id' }
    }
  },
  // Project stage
  {
    $project: {
      _id: 0,
      date: {
        $dateFromParts: {
          year: '$_id.year',
          month: '$_id.month',
          day: '$_id.day'
        }
      },
      total_orders: 1,
      total_revenue: { $round: ['$total_revenue', 2] },
      avg_order_value: { $round: ['$avg_order_value', 2] },
      unique_customers: { $size: '$unique_customers' }
    }
  },
  // Sort by date
  { $sort: { date: 1 } }
];
 
return [{ json: { pipeline } }];

Bulk Write Operations

// Function node for bulk operations
const operations = $input.all().map(item => {
  const data = item.json;
 
  switch (data.operation) {
    case 'insert':
      return {
        insertOne: {
          document: data.document
        }
      };
    case 'update':
      return {
        updateOne: {
          filter: { _id: data.id },
          update: { $set: data.updates },
          upsert: data.upsert || false
        }
      };
    case 'delete':
      return {
        deleteOne: {
          filter: { _id: data.id }
        }
      };
    default:
      throw new Error(`Unknown operation: ${data.operation}`);
  }
});
 
return [{
  json: {
    operations,
    options: {
      ordered: false  // Continue on error
    }
  }
}];

Change Streams for Real-time Updates

// Function node for change stream listener
const { MongoClient } = require('mongodb');
 
const client = new MongoClient($env.MONGODB_URI);
await client.connect();
 
const collection = client.db('mydb').collection('orders');
const changeStream = collection.watch([
  {
    $match: {
      'operationType': { $in: ['insert', 'update'] },
      'fullDocument.status': 'pending'
    }
  }
]);
 
// Process change events
const changes = [];
const timeout = setTimeout(() => {
  changeStream.close();
}, 30000);  // 30 second timeout
 
changeStream.on('change', (change) => {
  changes.push({
    operation: change.operationType,
    document: change.fullDocument,
    timestamp: change.clusterTime
  });
 
  if (changes.length >= 10) {  // Batch size
    clearTimeout(timeout);
    changeStream.close();
  }
});
 
await new Promise(resolve => {
  changeStream.on('close', resolve);
});
 
await client.close();
 
return changes.map(c => ({ json: c }));

Redis Integration Patterns

Caching Pattern

// Function node for cache-aside pattern
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
 
const cacheKey = `user:${$json.userId}:profile`;
const cacheTTL = 3600;  // 1 hour
 
// Try to get from cache
let data = await redis.get(cacheKey);
 
if (data) {
  // Cache hit
  return [{
    json: {
      source: 'cache',
      data: JSON.parse(data)
    }
  }];
}
 
// Cache miss - data will be fetched by next node
// Return flag to indicate cache miss
return [{
  json: {
    source: 'database',
    cacheKey,
    cacheTTL,
    shouldCache: true
  }
}];
 
// After database fetch, cache the result:
// await redis.setex(cacheKey, cacheTTL, JSON.stringify(data));

Rate Limiting with Redis

// Function node for sliding window rate limit
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
 
const userId = $json.userId;
const windowMs = 60000;  // 1 minute window
const maxRequests = 100;
 
const key = `ratelimit:${userId}`;
const now = Date.now();
const windowStart = now - windowMs;
 
// Use Redis transaction for atomic operations
const multi = redis.multi();
multi.zremrangebyscore(key, '-inf', windowStart);  // Remove old entries
multi.zadd(key, now, `${now}-${Math.random()}`);   // Add current request
multi.zcard(key);                                   // Count requests in window
multi.expire(key, Math.ceil(windowMs / 1000));     // Set TTL
 
const results = await multi.exec();
const requestCount = results[2][1];
 
if (requestCount > maxRequests) {
  return [{
    json: {
      allowed: false,
      remaining: 0,
      retryAfter: Math.ceil((windowStart + windowMs - now) / 1000)
    }
  }];
}
 
return [{
  json: {
    allowed: true,
    remaining: maxRequests - requestCount,
    limit: maxRequests
  }
}];

Distributed Lock Pattern

// Function node for distributed locking
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
const crypto = require('crypto');
 
const lockKey = `lock:${$json.resourceId}`;
const lockValue = crypto.randomUUID();
const lockTTL = 30;  // 30 seconds
 
// Try to acquire lock
const acquired = await redis.set(
  lockKey,
  lockValue,
  'EX', lockTTL,
  'NX'
);
 
if (!acquired) {
  return [{
    json: {
      lockAcquired: false,
      error: 'Resource is locked by another process'
    }
  }];
}
 
// Return lock info for later release
return [{
  json: {
    lockAcquired: true,
    lockKey,
    lockValue,
    expiresIn: lockTTL
  }
}];
 
// To release lock (in finally block):
// Use Lua script for atomic check-and-delete
const releaseLockScript = `
  if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
  else
    return 0
  end
`;
// await redis.eval(releaseLockScript, 1, lockKey, lockValue);

Pub/Sub Pattern

// Publisher function node
const Redis = require('ioredis');
const publisher = new Redis($env.REDIS_URL);
 
const channel = `events:${$json.eventType}`;
const message = JSON.stringify({
  id: $json.eventId,
  type: $json.eventType,
  data: $json.data,
  timestamp: new Date().toISOString()
});
 
await publisher.publish(channel, message);
await publisher.quit();
 
return [{
  json: {
    published: true,
    channel,
    messageSize: message.length
  }
}];
 
// Subscriber (separate workflow with webhook trigger)
const Redis = require('ioredis');
const subscriber = new Redis($env.REDIS_URL);
 
const messages = [];
const channels = ['events:order', 'events:payment', 'events:shipment'];
 
subscriber.subscribe(...channels);
 
subscriber.on('message', (channel, message) => {
  messages.push({
    channel,
    message: JSON.parse(message),
    receivedAt: new Date().toISOString()
  });
});
 
// Collect messages for 5 seconds
await new Promise(resolve => setTimeout(resolve, 5000));
await subscriber.quit();
 
return messages.map(m => ({ json: m }));

Multi-Database Patterns

Data Synchronization Workflow

// Sync data from PostgreSQL to MongoDB
// Step 1: Fetch from PostgreSQL
{
  "operation": "executeQuery",
  "query": `
    SELECT * FROM products
    WHERE updated_at > $1
    ORDER BY updated_at
    LIMIT 1000
  `,
  "parameters": "={{ [$json.lastSyncTimestamp] }}"
}
 
// Step 2: Transform data (Function node)
const products = $input.all();
 
return products.map(item => ({
  json: {
    filter: { postgres_id: item.json.id },
    update: {
      $set: {
        postgres_id: item.json.id,
        name: item.json.name,
        description: item.json.description,
        price: parseFloat(item.json.price),
        category: item.json.category,
        metadata: {
          source: 'postgresql',
          synced_at: new Date().toISOString(),
          source_updated_at: item.json.updated_at
        }
      }
    },
    upsert: true
  }
}));
 
// Step 3: Bulk upsert to MongoDB
{
  "operation": "bulkWrite",
  "collection": "products",
  "operations": "={{ $json.operations }}"
}

Cross-Database Transaction Pattern

// Saga pattern for cross-database consistency
// Function node implementing compensating transactions
 
class SagaOrchestrator {
  constructor() {
    this.steps = [];
    this.completedSteps = [];
  }
 
  addStep(name, execute, compensate) {
    this.steps.push({ name, execute, compensate });
  }
 
  async run(context) {
    try {
      for (const step of this.steps) {
        const result = await step.execute(context);
        this.completedSteps.push({
          name: step.name,
          compensate: step.compensate,
          result
        });
        context = { ...context, [step.name]: result };
      }
      return { success: true, context };
    } catch (error) {
      // Compensate in reverse order
      for (const completed of this.completedSteps.reverse()) {
        try {
          await completed.compensate(context, completed.result);
        } catch (compensateError) {
          console.error(`Compensation failed for ${completed.name}:`, compensateError);
        }
      }
      return { success: false, error: error.message };
    }
  }
}
 
// Usage
const saga = new SagaOrchestrator();
 
saga.addStep(
  'createOrder',
  async (ctx) => {
    // Insert into PostgreSQL
    return { orderId: 'order_123' };
  },
  async (ctx, result) => {
    // Delete order from PostgreSQL
  }
);
 
saga.addStep(
  'reserveInventory',
  async (ctx) => {
    // Update inventory in MongoDB
    return { reserved: true };
  },
  async (ctx, result) => {
    // Release inventory in MongoDB
  }
);
 
saga.addStep(
  'processPayment',
  async (ctx) => {
    // Charge payment via API
    return { paymentId: 'pay_456' };
  },
  async (ctx, result) => {
    // Refund payment
  }
);
 
const result = await saga.run($json);
return [{ json: result }];

Error Handling Patterns

Retry with Exponential Backoff

// Function node for database retry logic
async function executeWithRetry(operation, maxRetries = 3, baseDelay = 1000) {
  let lastError;
 
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
 
      // Check if error is retryable
      const retryable = [
        'ECONNRESET',
        'ETIMEDOUT',
        'ECONNREFUSED',
        'connection_failure',
        'deadlock_detected'
      ].some(code =>
        error.code === code ||
        error.message.includes(code)
      );
 
      if (!retryable || attempt === maxRetries) {
        throw error;
      }
 
      // Exponential backoff with jitter
      const delay = baseDelay * Math.pow(2, attempt - 1);
      const jitter = Math.random() * 1000;
 
      console.log(`Retry ${attempt}/${maxRetries} after ${delay + jitter}ms`);
      await new Promise(resolve => setTimeout(resolve, delay + jitter));
    }
  }
 
  throw lastError;
}
 
// Usage
const result = await executeWithRetry(async () => {
  // Your database operation here
  return await performQuery($json.query);
});
 
return [{ json: result }];

Connection Health Check

// Function node for database health monitoring
const checks = {
  postgres: async () => {
    const { Pool } = require('pg');
    const pool = new Pool({ connectionString: $env.PG_URL });
 
    try {
      const result = await pool.query('SELECT 1');
      return { healthy: true, latency: result.latency };
    } catch (error) {
      return { healthy: false, error: error.message };
    } finally {
      await pool.end();
    }
  },
 
  mongodb: async () => {
    const { MongoClient } = require('mongodb');
    const client = new MongoClient($env.MONGODB_URI);
 
    try {
      await client.connect();
      const start = Date.now();
      await client.db().command({ ping: 1 });
      return { healthy: true, latency: Date.now() - start };
    } catch (error) {
      return { healthy: false, error: error.message };
    } finally {
      await client.close();
    }
  },
 
  redis: async () => {
    const Redis = require('ioredis');
    const redis = new Redis($env.REDIS_URL);
 
    try {
      const start = Date.now();
      await redis.ping();
      return { healthy: true, latency: Date.now() - start };
    } catch (error) {
      return { healthy: false, error: error.message };
    } finally {
      redis.disconnect();
    }
  }
};
 
const results = {};
for (const [name, check] of Object.entries(checks)) {
  results[name] = await check();
}
 
const allHealthy = Object.values(results).every(r => r.healthy);
 
return [{
  json: {
    timestamp: new Date().toISOString(),
    status: allHealthy ? 'healthy' : 'degraded',
    databases: results
  }
}];

Best Practices Summary

Security

  1. Always use parameterized queries - Never concatenate user input
  2. Store credentials in environment variables - Never hardcode
  3. Use connection pooling - Limit max connections
  4. Implement timeouts - Prevent hanging connections

Performance

  1. Use indexes strategically - Index frequently queried fields
  2. Batch operations - Reduce round trips
  3. Implement caching - Cache frequently accessed data
  4. Monitor query performance - Log slow queries

Reliability

  1. Implement retry logic - Handle transient failures
  2. Use transactions - Ensure data consistency
  3. Health checks - Monitor database connectivity
  4. Graceful degradation - Handle database outages

By following these patterns, you can build robust, scalable database integrations in your n8n workflows that handle real-world production requirements.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.