n8n Automation

Database Integration Patterns in n8n: PostgreSQL, MongoDB, and Beyond

Petru Constantin
12 min read
#n8n#database#PostgreSQL#MongoDB#automation


Database integrations are the backbone of most automation workflows. This guide covers production-ready patterns for integrating n8n with PostgreSQL, MongoDB, Redis, and other databases.

PostgreSQL Integration Patterns

Basic Connection Setup

// PostgreSQL node configuration
{
  "operation": "executeQuery",
  "query": "SELECT * FROM users WHERE created_at > $1",
  "parameters": "={{ [$json.since_date] }}",
  "options": {
    "queryTimeout": 30000,
    "connectionTimeout": 10000
  }
}

Parameterized Queries (SQL Injection Prevention)

Always use parameterized queries to prevent SQL injection:

// Safe: parameterized query
{
  "operation": "executeQuery",
  "query": `
    INSERT INTO audit_log (user_id, action, resource, timestamp)
    VALUES ($1, $2, $3, $4)
    RETURNING id
  `,
  "parameters": "={{ [$json.userId, $json.action, $json.resource, new Date().toISOString()] }}"
}
 
// UNSAFE: string interpolation - NEVER do this
{
  "query": `SELECT * FROM users WHERE email = '${$json.email}'`  // SQL injection vulnerability!
}

Batch Insert Pattern

To insert multiple records efficiently:

// Function node to prepare batch insert
const records = $input.all();
const values = [];
const params = [];
 
records.forEach((item, index) => {
  const offset = index * 4;
  values.push(`($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4})`);
  params.push(
    item.json.name,
    item.json.email,
    item.json.role,
    new Date().toISOString()
  );
});
 
return [{
  json: {
    query: `
      INSERT INTO users (name, email, role, created_at)
      VALUES ${values.join(', ')}
      ON CONFLICT (email) DO UPDATE SET
        name = EXCLUDED.name,
        role = EXCLUDED.role,
        updated_at = NOW()
      RETURNING *
    `,
    parameters: params
  }
}];

Transaction Pattern

Implementing transactions in n8n:

// Start transaction
{
  "operation": "executeQuery",
  "query": "BEGIN"
}
 
// Execute operations
{
  "operation": "executeQuery",
  "query": "INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING id",
  "parameters": "={{ [$json.userId, $json.total] }}"
}
 
{
  "operation": "executeQuery",
  "query": "INSERT INTO order_items (order_id, product_id, quantity) VALUES ($1, $2, $3)",
  "parameters": "={{ [$json.orderId, $json.productId, $json.quantity] }}"
}
 
// Commit or roll back based on success
// In error handler:
{
  "operation": "executeQuery",
  "query": "ROLLBACK"
}
 
// On success:
{
  "operation": "executeQuery",
  "query": "COMMIT"
}
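One caveat: BEGIN/COMMIT only take effect when every statement runs on the same connection, and separate PostgreSQL node executions may be served by different pooled connections. A minimal sketch of a safer approach runs the whole transaction on one client inside a single Function node (the `withTransaction` helper is illustrative, not an n8n built-in):

```javascript
// Run a unit of work on a single client inside BEGIN/COMMIT,
// rolling back automatically if any step throws.
async function withTransaction(client, work) {
  await client.query('BEGIN');
  try {
    const result = await work(client);
    await client.query('COMMIT');
    return result;
  } catch (error) {
    await client.query('ROLLBACK');
    throw error;
  }
}

// Usage inside a Function node (client obtained from a pg Pool):
// const order = await withTransaction(client, async (c) => {
//   const { rows } = await c.query(
//     'INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING id',
//     [$json.userId, $json.total]
//   );
//   await c.query(
//     'INSERT INTO order_items (order_id, product_id, quantity) VALUES ($1, $2, $3)',
//     [rows[0].id, $json.productId, $json.quantity]
//   );
//   return rows[0];
// });
```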

Connection Pooling with a Function Node

// Function node for connection pool management
const { Pool } = require('pg');
 
// Connection pool configuration
const poolConfig = {
  host: $env.PG_HOST,
  port: $env.PG_PORT,
  database: $env.PG_DATABASE,
  user: $env.PG_USER,
  password: $env.PG_PASSWORD,
  max: 20,                    // Maximum pool size
  idleTimeoutMillis: 30000,   // Close idle connections after 30s
  connectionTimeoutMillis: 10000,
  maxUses: 7500               // Recycle a connection after 7,500 queries
};
 
// Get pool from workflow static data or create new
let pool = $getWorkflowStaticData('global').pgPool;
 
if (!pool) {
  pool = new Pool(poolConfig);
 
  pool.on('error', (err) => {
    console.error('Unexpected error on idle client', err);
  });
 
  $getWorkflowStaticData('global').pgPool = pool;
}
 
// Execute query with automatic connection management
const client = await pool.connect();
 
try {
  const result = await client.query(
    'SELECT * FROM users WHERE status = $1 LIMIT $2',
    [$json.status, $json.limit || 100]
  );
 
  return result.rows.map(row => ({ json: row }));
} finally {
  client.release();
}

MongoDB Integration Patterns

Basic CRUD Operations

// Insert document
{
  "operation": "insert",
  "collection": "users",
  "document": "={{ $json }}"
}
 
// Find with query
{
  "operation": "find",
  "collection": "users",
  "query": {
    "status": "active",
    "created_at": { "$gte": "={{ $json.since }}" }
  },
  "options": {
    "limit": 100,
    "sort": { "created_at": -1 }
  }
}
 
// Update with operators
{
  "operation": "update",
  "collection": "users",
  "query": { "_id": "={{ $json.userId }}" },
  "update": {
    "$set": {
      "last_login": "={{ new Date().toISOString() }}"
    },
    "$inc": { "login_count": 1 },  // atomic increment (avoids a read-modify-write race)
    "$push": {
      "login_history": {
        "timestamp": "={{ new Date().toISOString() }}",
        "ip": "={{ $json.ipAddress }}"
      }
    }
  }
}

Aggregation Pipeline

// Function node for complex aggregation
const pipeline = [
  // Match stage
  {
    $match: {
      created_at: {
        $gte: new Date($json.startDate),
        $lte: new Date($json.endDate)
      },
      status: 'completed'
    }
  },
  // Group stage
  {
    $group: {
      _id: {
        year: { $year: '$created_at' },
        month: { $month: '$created_at' },
        day: { $dayOfMonth: '$created_at' }
      },
      total_orders: { $sum: 1 },
      total_revenue: { $sum: '$amount' },
      avg_order_value: { $avg: '$amount' },
      unique_customers: { $addToSet: '$customer_id' }
    }
  },
  // Project stage
  {
    $project: {
      _id: 0,
      date: {
        $dateFromParts: {
          year: '$_id.year',
          month: '$_id.month',
          day: '$_id.day'
        }
      },
      total_orders: 1,
      total_revenue: { $round: ['$total_revenue', 2] },
      avg_order_value: { $round: ['$avg_order_value', 2] },
      unique_customers: { $size: '$unique_customers' }
    }
  },
  // Sort by date
  { $sort: { date: 1 } }
];
 
return [{ json: { pipeline } }];

Bulk Write Operations

// Function node for bulk operations
const operations = $input.all().map(item => {
  const data = item.json;
 
  switch (data.operation) {
    case 'insert':
      return {
        insertOne: {
          document: data.document
        }
      };
    case 'update':
      return {
        updateOne: {
          filter: { _id: data.id },
          update: { $set: data.updates },
          upsert: data.upsert || false
        }
      };
    case 'delete':
      return {
        deleteOne: {
          filter: { _id: data.id }
        }
      };
    default:
      throw new Error(`Unknown operation: ${data.operation}`);
  }
});
 
return [{
  json: {
    operations,
    options: {
      ordered: false  // Continue on error
    }
  }
}];

Change Streams for Real-Time Updates

// Function node for change stream listener
const { MongoClient } = require('mongodb');
 
const client = new MongoClient($env.MONGODB_URI);
await client.connect();
 
const collection = client.db('mydb').collection('orders');
const changeStream = collection.watch([
  {
    $match: {
      'operationType': { $in: ['insert', 'update'] },
      'fullDocument.status': 'pending'
    }
  }
]);
 
// Process change events
const changes = [];
const timeout = setTimeout(() => {
  changeStream.close();
}, 30000);  // 30-second timeout
 
changeStream.on('change', (change) => {
  changes.push({
    operation: change.operationType,
    document: change.fullDocument,
    timestamp: change.clusterTime
  });
 
  if (changes.length >= 10) {  // Batch size
    clearTimeout(timeout);
    changeStream.close();
  }
});
 
await new Promise(resolve => {
  changeStream.on('close', resolve);
});
 
await client.close();
 
return changes.map(c => ({ json: c }));

Redis Integration Patterns

Caching Pattern

// Function node for cache-aside pattern
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
 
const cacheKey = `user:${$json.userId}:profile`;
const cacheTTL = 3600;  // 1 hour
 
// Try the cache first
let data = await redis.get(cacheKey);
 
if (data) {
  // Cache hit
  return [{
    json: {
      source: 'cache',
      data: JSON.parse(data)
    }
  }];
}
 
// Cache miss - the next node will fetch the data
// Return a flag indicating the cache miss
return [{
  json: {
    source: 'database',
    cacheKey,
    cacheTTL,
    shouldCache: true
  }
}];
 
// After fetching from the database, cache the result:
// await redis.setex(cacheKey, cacheTTL, JSON.stringify(data));
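The two halves of cache-aside above can be folded into one helper; a sketch where `loadFn` stands in for whatever the downstream node fetches:

```javascript
// Cache-aside in one place: return the cached value on a hit,
// otherwise load from the source, cache it with a TTL, and return it.
async function getOrLoad(redis, key, ttlSeconds, loadFn) {
  const cached = await redis.get(key);
  if (cached !== null) {
    return { source: 'cache', data: JSON.parse(cached) };
  }
  const fresh = await loadFn();
  await redis.setex(key, ttlSeconds, JSON.stringify(fresh));
  return { source: 'database', data: fresh };
}
```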

Rate Limiting with Redis

// Function node for sliding window rate limit
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
 
const userId = $json.userId;
const windowMs = 60000;  // 1-minute window
const maxRequests = 100;
 
const key = `ratelimit:${userId}`;
const now = Date.now();
const windowStart = now - windowMs;
 
// Use a Redis transaction for atomic operations
const multi = redis.multi();
multi.zremrangebyscore(key, '-inf', windowStart);  // Drop entries outside the window
multi.zadd(key, now, `${now}-${Math.random()}`);   // Record the current request
multi.zcard(key);                                   // Count requests in the window
multi.expire(key, Math.ceil(windowMs / 1000));     // Set the TTL
 
const results = await multi.exec();
const requestCount = results[2][1];
 
if (requestCount > maxRequests) {
  return [{
    json: {
      allowed: false,
      remaining: 0,
      retryAfter: Math.ceil((windowStart + windowMs - now) / 1000)
    }
  }];
}
 
return [{
  json: {
    allowed: true,
    remaining: maxRequests - requestCount,
    limit: maxRequests
  }
}];
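The same sliding-window arithmetic, isolated from Redis as a plain function over a list of request timestamps (useful for unit testing the limit logic):

```javascript
// Sliding-window rate check: keep only timestamps inside the window,
// record the new request, and compare the count against the limit.
function checkWindow(timestamps, now, windowMs, maxRequests) {
  const windowStart = now - windowMs;
  const inWindow = timestamps.filter(t => t > windowStart);
  inWindow.push(now);
  return {
    allowed: inWindow.length <= maxRequests,
    remaining: Math.max(0, maxRequests - inWindow.length),
    timestamps: inWindow
  };
}
```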

Distributed Lock Pattern

// Function node for distributed locking
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
const crypto = require('crypto');
 
const lockKey = `lock:${$json.resourceId}`;
const lockValue = crypto.randomUUID();
const lockTTL = 30;  // 30 seconds
 
// Try to acquire the lock
const acquired = await redis.set(
  lockKey,
  lockValue,
  'EX', lockTTL,
  'NX'
);
 
if (!acquired) {
  return [{
    json: {
      lockAcquired: false,
      error: 'Resource is locked by another process'
    }
  }];
}
 
// Return lock info for releasing it later
return [{
  json: {
    lockAcquired: true,
    lockKey,
    lockValue,
    expiresIn: lockTTL
  }
}];
 
// To release the lock (in a finally block):
// Use a Lua script for an atomic check-and-delete
const releaseLockScript = `
  if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
  else
    return 0
  end
`;
// await redis.eval(releaseLockScript, 1, lockKey, lockValue);

Pub/Sub Pattern

// Publisher function node
const Redis = require('ioredis');
const publisher = new Redis($env.REDIS_URL);
 
const channel = `events:${$json.eventType}`;
const message = JSON.stringify({
  id: $json.eventId,
  type: $json.eventType,
  data: $json.data,
  timestamp: new Date().toISOString()
});
 
await publisher.publish(channel, message);
await publisher.quit();
 
return [{
  json: {
    published: true,
    channel,
    messageSize: message.length
  }
}];
 
// Subscriber (separate workflow with a webhook trigger)
const Redis = require('ioredis');
const subscriber = new Redis($env.REDIS_URL);
 
const messages = [];
const channels = ['events:order', 'events:payment', 'events:shipment'];
 
subscriber.subscribe(...channels);
 
subscriber.on('message', (channel, message) => {
  messages.push({
    channel,
    message: JSON.parse(message),
    receivedAt: new Date().toISOString()
  });
});
 
// Collect messages for 5 seconds
await new Promise(resolve => setTimeout(resolve, 5000));
await subscriber.quit();
 
return messages.map(m => ({ json: m }));

Multi-Database Patterns

Data Synchronization Workflow

// Sync data from PostgreSQL into MongoDB
// Step 1: Fetch from PostgreSQL
{
  "operation": "executeQuery",
  "query": `
    SELECT * FROM products
    WHERE updated_at > $1
    ORDER BY updated_at
    LIMIT 1000
  `,
  "parameters": "={{ [$json.lastSyncTimestamp] }}"
}
 
// Step 2: Transform the data (Function node)
const products = $input.all();

// Build one bulkWrite operation per row so Step 3 can pass
// them to MongoDB as $json.operations
const operations = products.map(item => ({
  updateOne: {
    filter: { postgres_id: item.json.id },
    update: {
      $set: {
        postgres_id: item.json.id,
        name: item.json.name,
        description: item.json.description,
        price: parseFloat(item.json.price),
        category: item.json.category,
        metadata: {
          source: 'postgresql',
          synced_at: new Date().toISOString(),
          source_updated_at: item.json.updated_at
        }
      }
    },
    upsert: true
  }
}));

return [{ json: { operations } }];
 
// Step 3: Bulk upsert into MongoDB
{
  "operation": "bulkWrite",
  "collection": "products",
  "operations": "={{ $json.operations }}"
}
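The row-to-operation mapping in Step 2 can also live in a plain, unit-testable function; a sketch with a trimmed field set (`postgres_id`, `name`, `price` mirror the sync above):

```javascript
// Map one PostgreSQL row to a MongoDB bulkWrite upsert operation.
function toUpsertOp(row) {
  return {
    updateOne: {
      filter: { postgres_id: row.id },
      update: {
        $set: {
          postgres_id: row.id,
          name: row.name,
          price: parseFloat(row.price)
        }
      },
      upsert: true
    }
  };
}
```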

Cross-Database Transaction Pattern

// Saga pattern for consistency across databases
// Function node implementing compensating transactions
 
class SagaOrchestrator {
  constructor() {
    this.steps = [];
    this.completedSteps = [];
  }
 
  addStep(name, execute, compensate) {
    this.steps.push({ name, execute, compensate });
  }
 
  async run(context) {
    try {
      for (const step of this.steps) {
        const result = await step.execute(context);
        this.completedSteps.push({
          name: step.name,
          compensate: step.compensate,
          result
        });
        context = { ...context, [step.name]: result };
      }
      return { success: true, context };
    } catch (error) {
      // Compensate in reverse order
      for (const completed of this.completedSteps.reverse()) {
        try {
          await completed.compensate(context, completed.result);
        } catch (compensateError) {
          console.error(`Compensation failed for ${completed.name}:`, compensateError);
        }
      }
      return { success: false, error: error.message };
    }
  }
}
 
// Usage
const saga = new SagaOrchestrator();
 
saga.addStep(
  'createOrder',
  async (ctx) => {
    // Insert into PostgreSQL
    return { orderId: 'order_123' };
  },
  async (ctx, result) => {
    // Delete the order from PostgreSQL
  }
);
 
saga.addStep(
  'reserveInventory',
  async (ctx) => {
    // Update inventory in MongoDB
    return { reserved: true };
  },
  async (ctx, result) => {
    // Release the inventory in MongoDB
  }
);
 
saga.addStep(
  'processPayment',
  async (ctx) => {
    // Process the payment via API
    return { paymentId: 'pay_456' };
  },
  async (ctx, result) => {
    // Refund the payment
  }
);
 
const result = await saga.run($json);
return [{ json: result }];

Error Handling Patterns

Retry with Exponential Backoff

// Function node for database retry logic
async function executeWithRetry(operation, maxRetries = 3, baseDelay = 1000) {
  let lastError;
 
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
 
      // Check whether the error is retryable
      const retryable = [
        'ECONNRESET',
        'ETIMEDOUT',
        'ECONNREFUSED',
        'connection_failure',
        'deadlock_detected'
      ].some(code =>
        error.code === code ||
        error.message.includes(code)
      );
 
      if (!retryable || attempt === maxRetries) {
        throw error;
      }
 
      // Exponential backoff with jitter
      const delay = baseDelay * Math.pow(2, attempt - 1);
      const jitter = Math.random() * 1000;
 
      console.log(`Retry ${attempt}/${maxRetries} after ${delay + jitter}ms`);
      await new Promise(resolve => setTimeout(resolve, delay + jitter));
    }
  }
 
  throw lastError;
}
 
// Usage
const result = await executeWithRetry(async () => {
  // Your database operation here
  return await performQuery($json.query);
});
 
return [{ json: result }];
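The delay schedule used by `executeWithRetry` grows as baseDelay * 2^(attempt - 1) before jitter; isolated as a helper:

```javascript
// Exponential backoff delay (before jitter): 1000ms, 2000ms, 4000ms, ...
function backoffDelay(attempt, baseDelay = 1000) {
  return baseDelay * Math.pow(2, attempt - 1);
}

// backoffDelay(1) -> 1000, backoffDelay(2) -> 2000, backoffDelay(3) -> 4000
```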

Connection Health Checks

// Function node for database health monitoring
const checks = {
  postgres: async () => {
    const { Pool } = require('pg');
    const pool = new Pool({ connectionString: $env.PG_URL });
 
    try {
      const start = Date.now();
      await pool.query('SELECT 1');
      return { healthy: true, latency: Date.now() - start };
    } catch (error) {
      return { healthy: false, error: error.message };
    } finally {
      await pool.end();
    }
  },
 
  mongodb: async () => {
    const { MongoClient } = require('mongodb');
    const client = new MongoClient($env.MONGODB_URI);
 
    try {
      await client.connect();
      const start = Date.now();
      await client.db().command({ ping: 1 });
      return { healthy: true, latency: Date.now() - start };
    } catch (error) {
      return { healthy: false, error: error.message };
    } finally {
      await client.close();
    }
  },
 
  redis: async () => {
    const Redis = require('ioredis');
    const redis = new Redis($env.REDIS_URL);
 
    try {
      const start = Date.now();
      await redis.ping();
      return { healthy: true, latency: Date.now() - start };
    } catch (error) {
      return { healthy: false, error: error.message };
    } finally {
      redis.disconnect();
    }
  }
};
 
const results = {};
for (const [name, check] of Object.entries(checks)) {
  results[name] = await check();
}
 
const allHealthy = Object.values(results).every(r => r.healthy);
 
return [{
  json: {
    timestamp: new Date().toISOString(),
    status: allHealthy ? 'healthy' : 'degraded',
    databases: results
  }
}];

Best Practices Summary

Security

  1. Always use parameterized queries - Never concatenate user input
  2. Store credentials in environment variables - Never hardcode them
  3. Use connection pooling - Cap the maximum number of connections
  4. Implement timeouts - Prevent hung connections

Performance

  1. Use indexes strategically - Index frequently queried fields
  2. Batch operations - Reduce round trips
  3. Implement caching - Cache frequently accessed data
  4. Monitor query performance - Log slow queries

Reliability

  1. Implement retry logic - Handle transient errors
  2. Use transactions - Ensure data consistency
  3. Health checks - Monitor database connectivity
  4. Graceful degradation - Handle database unavailability

By following these patterns, you can build robust, scalable database integrations in your n8n workflows that stand up to real-world production demands.
