n8n Automation

Workflow-uri de sincronizare a datelor cu n8n

Petru Constantin
--10 min lectura
#n8n#data-sync#automation#integration#etl

Sincronizarea datelor intre sisteme este esentiala pentru consistenta operationala. Acest ghid acopera construirea unor workflow-uri robuste de sincronizare cu n8n, care gestioneaza conflicte, actualizari incrementale si sincronizare multi-directionala.

Sincronizare bidirectionala CRM

Sincronizeaza datele intre CRM si alte sisteme:

// Bidirectional CRM <-> Database Sync Workflow
 
// 1. Webhook Trigger for Real-time Sync
const webhookTrigger = {
  name: "CRM Webhook",
  type: "n8n-nodes-base.webhook",
  parameters: {
    path: "crm-sync",
    httpMethod: "POST",
    responseMode: "onReceived"
  }
};
 
// 2. Determine Sync Direction and Type
function determineSyncAction(items) {
  const payload = items[0].json;
 
  const syncAction = {
    source: payload.source || 'crm',
    event_type: payload.event_type,
    entity_type: payload.entity_type,
    entity_id: payload.entity_id,
    timestamp: payload.timestamp || new Date().toISOString(),
    data: payload.data,
    action: 'unknown'
  };
 
  // Map event types to actions
  const actionMap = {
    'contact.created': 'create',
    'contact.updated': 'update',
    'contact.deleted': 'delete',
    'deal.created': 'create',
    'deal.updated': 'update',
    'deal.stage_changed': 'update',
    'company.created': 'create',
    'company.updated': 'update'
  };
 
  syncAction.action = actionMap[payload.event_type] || 'unknown';
 
  return [{ json: syncAction }];
}
 
// 3. Conflict Detection Node
const conflictDetection = {
  name: "Check for Conflicts",
  type: "n8n-nodes-base.function",
  parameters: {
    functionCode: `
      const syncAction = items[0].json;
      const entityId = syncAction.entity_id;
      const sourceTimestamp = new Date(syncAction.timestamp);
 
      // Get last sync state from database
      const lastSync = await $getWorkflowStaticData('global');
      const syncState = lastSync.syncState || {};
      const entityState = syncState[entityId] || {};
 
      // Check for conflict
      let hasConflict = false;
      let conflictType = null;
 
      if (entityState.lastModified) {
        const lastModified = new Date(entityState.lastModified);
        const timeDiff = Math.abs(sourceTimestamp - lastModified);
 
        // Conflict if modified within 5 minutes from different source
        if (timeDiff < 5 * 60 * 1000 && entityState.lastSource !== syncAction.source) {
          hasConflict = true;
          conflictType = 'concurrent_modification';
        }
      }
 
      // Check for version mismatch
      if (syncAction.data.version && entityState.version) {
        if (syncAction.data.version < entityState.version) {
          hasConflict = true;
          conflictType = 'version_mismatch';
        }
      }
 
      return [{
        json: {
          ...syncAction,
          hasConflict,
          conflictType,
          previousState: entityState
        }
      }];
    `
  }
};
 
// 4. Conflict Resolution Strategy
function resolveConflict(items) {
  const item = items[0].json;
 
  if (!item.hasConflict) {
    return [{ json: { ...item, resolution: 'no_conflict', proceed: true } }];
  }
 
  // Resolution strategies
  const strategies = {
    // Last write wins
    'last_write_wins': () => {
      const sourceTime = new Date(item.timestamp);
      const prevTime = new Date(item.previousState.lastModified);
      return sourceTime > prevTime;
    },
 
    // Source priority (CRM wins for contacts)
    'source_priority': () => {
      const priorities = { crm: 1, erp: 2, database: 3 };
      return priorities[item.source] <= priorities[item.previousState.lastSource];
    },
 
    // Merge non-conflicting fields
    'field_merge': () => {
      const mergedData = { ...item.previousState.data };
      for (const [key, value] of Object.entries(item.data)) {
        // Only update fields that changed from original
        if (item.previousState.data[key] !== value) {
          mergedData[key] = value;
        }
      }
      item.data = mergedData;
      return true;
    }
  };
 
  // Default strategy based on entity type
  let strategy = 'last_write_wins';
  if (item.entity_type === 'contact') {
    strategy = 'source_priority';
  } else if (item.entity_type === 'deal') {
    strategy = 'field_merge';
  }
 
  const proceed = strategies[strategy]();
 
  return [{
    json: {
      ...item,
      resolution: proceed ? `resolved_${strategy}` : 'blocked',
      proceed: proceed
    }
  }];
}
 
// 5. Sync to Target Systems
const syncToDatabase = {
  name: "Sync to Database",
  type: "n8n-nodes-base.postgres",
  parameters: {
    operation: "upsert",
    table: "={{$json.entity_type}}s",
    columns: "external_id, data, source, updated_at, sync_version",
    updateColumns: "data, source, updated_at, sync_version"
  }
};
 
const syncBackToCRM = {
  name: "Sync Back to CRM",
  type: "n8n-nodes-base.hubspot",
  parameters: {
    resource: "contact",
    operation: "update",
    contactId: "={{$json.entity_id}}",
    additionalFields: {
      properties: "={{$json.data}}"
    }
  }
};
 
// 6. Update Sync State
function updateSyncState(items) {
  const item = items[0].json;
 
  // Update workflow static data
  const staticData = $getWorkflowStaticData('global');
  if (!staticData.syncState) {
    staticData.syncState = {};
  }
 
  staticData.syncState[item.entity_id] = {
    lastModified: item.timestamp,
    lastSource: item.source,
    version: (item.data.version || 0) + 1,
    data: item.data
  };
 
  return [{
    json: {
      ...item,
      syncCompleted: true,
      newVersion: staticData.syncState[item.entity_id].version
    }
  }];
}

Sincronizare incrementala a datelor

Gestioneaza seturi mari de date cu sincronizare incrementala:

// Incremental Sync Workflow for Large Datasets
 
// 1. Scheduled Incremental Sync
const scheduledSync = {
  name: "Scheduled Sync",
  type: "n8n-nodes-base.scheduleTrigger",
  parameters: {
    rule: {
      interval: [{ field: "minutes", minutesInterval: 15 }]
    }
  }
};
 
// 2. Get Last Sync Checkpoint
const getCheckpoint = {
  name: "Get Checkpoint",
  type: "n8n-nodes-base.postgres",
  parameters: {
    operation: "executeQuery",
    query: `
      SELECT
        last_sync_time,
        last_sync_id,
        sync_cursor,
        records_synced
      FROM sync_checkpoints
      WHERE sync_name = 'crm_to_warehouse'
      ORDER BY last_sync_time DESC
      LIMIT 1
    `
  }
};
 
// 3. Fetch Changed Records
const fetchChangedRecords = {
  name: "Fetch Changes from CRM",
  type: "n8n-nodes-base.httpRequest",
  parameters: {
    method: "GET",
    url: "={{$env.CRM_API_URL}}/contacts",
    authentication: "oAuth2",
    queryParameters: {
      modified_after: "={{$json.last_sync_time || '1970-01-01T00:00:00Z'}}",
      cursor: "={{$json.sync_cursor || ''}}",
      limit: "100",
      order_by: "modified_at"
    }
  }
};
 
// 4. Transform and Validate Data
function transformForWarehouse(items) {
  const response = items[0].json;
  const records = response.data || [];
 
  const transformed = records.map(record => {
    // Data transformation
    const transformed = {
      id: record.id,
      external_id: record.external_id,
      email: record.email?.toLowerCase().trim(),
      first_name: record.first_name?.trim(),
      last_name: record.last_name?.trim(),
      full_name: `${record.first_name || ''} ${record.last_name || ''}`.trim(),
      company: record.company_name,
      phone: normalizePhone(record.phone),
      created_at: record.created_at,
      updated_at: record.updated_at,
      source_system: 'crm',
      sync_timestamp: new Date().toISOString()
    };
 
    // Validation
    const validationErrors = [];
    if (!transformed.email || !isValidEmail(transformed.email)) {
      validationErrors.push('Invalid email');
    }
    if (!transformed.id) {
      validationErrors.push('Missing ID');
    }
 
    return {
      json: {
        ...transformed,
        _valid: validationErrors.length === 0,
        _errors: validationErrors,
        _original: record
      }
    };
  });
 
  // Add pagination info
  transformed.push({
    json: {
      _metadata: true,
      hasMore: response.has_more,
      nextCursor: response.next_cursor,
      totalInBatch: records.length
    }
  });
 
  return transformed;
}
 
function normalizePhone(phone) {
  if (!phone) return null;
  return phone.replace(/[^\d+]/g, '');
}
 
function isValidEmail(email) {
  return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
}
 
// 5. Split Valid and Invalid Records
const splitRecords = {
  name: "Split Valid/Invalid",
  type: "n8n-nodes-base.if",
  parameters: {
    conditions: {
      boolean: [
        {
          value1: "={{$json._valid}}",
          value2: true
        }
      ]
    }
  }
};
 
// 6. Batch Insert to Warehouse
const batchInsert = {
  name: "Insert to Warehouse",
  type: "n8n-nodes-base.postgres",
  parameters: {
    operation: "executeQuery",
    query: `
      INSERT INTO contacts_staging (
        id, external_id, email, first_name, last_name,
        full_name, company, phone, created_at, updated_at,
        source_system, sync_timestamp
      )
      VALUES
      {{$json.map(r => \`(
        '\${r.id}', '\${r.external_id}', '\${r.email}',
        '\${r.first_name}', '\${r.last_name}', '\${r.full_name}',
        '\${r.company}', '\${r.phone}', '\${r.created_at}',
        '\${r.updated_at}', '\${r.source_system}', '\${r.sync_timestamp}'
      )\`).join(',')}}
      ON CONFLICT (id) DO UPDATE SET
        email = EXCLUDED.email,
        first_name = EXCLUDED.first_name,
        last_name = EXCLUDED.last_name,
        full_name = EXCLUDED.full_name,
        company = EXCLUDED.company,
        phone = EXCLUDED.phone,
        updated_at = EXCLUDED.updated_at,
        sync_timestamp = EXCLUDED.sync_timestamp
    `
  }
};
 
// 7. Log Invalid Records for Review
const logInvalidRecords = {
  name: "Log Invalid Records",
  type: "n8n-nodes-base.postgres",
  parameters: {
    operation: "insert",
    table: "sync_errors",
    columns: "sync_name, record_id, error_type, error_details, raw_data, created_at"
  }
};
 
// 8. Update Checkpoint
function updateCheckpoint(items) {
  const validRecords = items.filter(i => !i.json._metadata && i.json._valid);
  const metadata = items.find(i => i.json._metadata)?.json || {};
 
  const checkpoint = {
    sync_name: 'crm_to_warehouse',
    last_sync_time: new Date().toISOString(),
    last_sync_id: validRecords[validRecords.length - 1]?.json.id,
    sync_cursor: metadata.nextCursor || null,
    records_synced: validRecords.length,
    has_more: metadata.hasMore || false
  };
 
  return [{ json: checkpoint }];
}
 
// 9. Continue if More Records
const checkMoreRecords = {
  name: "Check for More",
  type: "n8n-nodes-base.if",
  parameters: {
    conditions: {
      boolean: [
        {
          value1: "={{$json.has_more}}",
          value2: true
        }
      ]
    }
  }
};
 
// Loop back to fetch more if needed
const loopBack = {
  name: "Loop to Fetch More",
  type: "n8n-nodes-base.executeWorkflow",
  parameters: {
    workflowId: "={{$workflow.id}}"
  }
};

Consistenta datelor in mai multe sisteme

Mentine consistenta datelor in mai multe sisteme simultan:

// Multi-System Consistency Workflow
 
// 1. Transaction Manager
class DistributedSyncTransaction {
  constructor(transactionId) {
    this.transactionId = transactionId;
    this.participants = [];
    this.status = 'pending';
    this.rollbackActions = [];
  }
 
  addParticipant(system, action, rollback) {
    this.participants.push({ system, action, rollback, status: 'pending' });
  }
 
  async execute() {
    // Two-phase commit pattern
    try {
      // Phase 1: Prepare all participants
      for (const participant of this.participants) {
        participant.status = 'preparing';
        await participant.action.prepare();
        participant.status = 'prepared';
      }
 
      // Phase 2: Commit all participants
      for (const participant of this.participants) {
        participant.status = 'committing';
        await participant.action.commit();
        this.rollbackActions.push(participant.rollback);
        participant.status = 'committed';
      }
 
      this.status = 'committed';
      return { success: true, transactionId: this.transactionId };
 
    } catch (error) {
      // Rollback all committed actions
      await this.rollback();
      this.status = 'rolled_back';
      return { success: false, error: error.message, transactionId: this.transactionId };
    }
  }
 
  async rollback() {
    for (const rollback of this.rollbackActions.reverse()) {
      try {
        await rollback();
      } catch (e) {
        console.error(`Rollback failed: ${e.message}`);
      }
    }
  }
}
 
// 2. Sync Orchestrator Function
async function orchestrateMultiSystemSync(items) {
  const syncData = items[0].json;
  const transactionId = `txn_${Date.now()}`;
  const transaction = new DistributedSyncTransaction(transactionId);
 
  // Define participants based on data type
  const systems = {
    crm: {
      async prepare() {
        // Validate CRM can accept update
        const response = await fetch(`${CRM_API}/validate`, {
          method: 'POST',
          body: JSON.stringify(syncData)
        });
        if (!response.ok) throw new Error('CRM validation failed');
      },
      async commit() {
        return fetch(`${CRM_API}/contacts/${syncData.id}`, {
          method: 'PUT',
          body: JSON.stringify(syncData)
        });
      }
    },
    erp: {
      async prepare() {
        // Check ERP availability
        const health = await fetch(`${ERP_API}/health`);
        if (!health.ok) throw new Error('ERP unavailable');
      },
      async commit() {
        return fetch(`${ERP_API}/customers/${syncData.external_id}`, {
          method: 'PATCH',
          body: JSON.stringify({
            name: syncData.full_name,
            email: syncData.email,
            phone: syncData.phone
          })
        });
      }
    },
    warehouse: {
      async prepare() {
        // Verify warehouse schema
        return true;
      },
      async commit() {
        return fetch(`${WAREHOUSE_API}/upsert`, {
          method: 'POST',
          body: JSON.stringify({
            table: 'contacts',
            data: syncData
          })
        });
      }
    }
  };
 
  // Add participants
  for (const [systemName, actions] of Object.entries(systems)) {
    if (syncData.targetSystems?.includes(systemName) || !syncData.targetSystems) {
      transaction.addParticipant(
        systemName,
        actions,
        async () => {
          // Rollback action
          await fetch(`${systems[systemName].apiUrl}/rollback`, {
            method: 'POST',
            body: JSON.stringify({ transactionId, entityId: syncData.id })
          });
        }
      );
    }
  }
 
  const result = await transaction.execute();
 
  return [{
    json: {
      ...syncData,
      transactionId,
      syncResult: result,
      participantStatuses: transaction.participants.map(p => ({
        system: p.system,
        status: p.status
      }))
    }
  }];
}
 
// 3. Consistency Checker Workflow
const consistencyChecker = {
  name: "Consistency Check Trigger",
  type: "n8n-nodes-base.scheduleTrigger",
  parameters: {
    rule: {
      interval: [{ field: "hours", hoursInterval: 6 }]
    }
  }
};
 
// 4. Compare Data Across Systems
async function compareSystemData(items) {
  const checkConfig = items[0].json;
  const inconsistencies = [];
 
  // Sample records to check
  const sampleIds = checkConfig.sampleIds || await getSampleIds(100);
 
  for (const id of sampleIds) {
    const records = {};
 
    // Fetch from each system
    for (const system of ['crm', 'erp', 'warehouse']) {
      try {
        records[system] = await fetchRecord(system, id);
      } catch (e) {
        records[system] = { error: e.message };
      }
    }
 
    // Compare critical fields
    const criticalFields = ['email', 'phone', 'company'];
    const fieldInconsistencies = [];
 
    for (const field of criticalFields) {
      const values = Object.entries(records)
        .filter(([_, r]) => !r.error)
        .map(([sys, r]) => ({ system: sys, value: r[field] }));
 
      const uniqueValues = [...new Set(values.map(v => v.value))];
 
      if (uniqueValues.length > 1) {
        fieldInconsistencies.push({
          field,
          values: values,
          recommendation: determineCorrectValue(values, field)
        });
      }
    }
 
    if (fieldInconsistencies.length > 0) {
      inconsistencies.push({
        entityId: id,
        inconsistencies: fieldInconsistencies,
        records
      });
    }
  }
 
  return [{
    json: {
      checkTime: new Date().toISOString(),
      samplesChecked: sampleIds.length,
      inconsistenciesFound: inconsistencies.length,
      inconsistencies,
      healthScore: ((sampleIds.length - inconsistencies.length) / sampleIds.length * 100).toFixed(2)
    }
  }];
}
 
function determineCorrectValue(values, field) {
  // Priority-based resolution
  const priorities = { crm: 1, erp: 2, warehouse: 3 };
  const sorted = values.sort((a, b) => priorities[a.system] - priorities[b.system]);
  return {
    recommendedValue: sorted[0].value,
    sourceSystem: sorted[0].system,
    reason: 'Based on system priority (CRM is source of truth)'
  };
}
 
// 5. Auto-Repair Inconsistencies
async function repairInconsistencies(items) {
  const report = items[0].json;
  const repairs = [];
 
  for (const issue of report.inconsistencies) {
    for (const fieldIssue of issue.inconsistencies) {
      const correctValue = fieldIssue.recommendation.recommendedValue;
      const sourceSystem = fieldIssue.recommendation.sourceSystem;
 
      // Update systems that have incorrect values
      for (const systemValue of fieldIssue.values) {
        if (systemValue.value !== correctValue && systemValue.system !== sourceSystem) {
          try {
            await updateSystemRecord(
              systemValue.system,
              issue.entityId,
              { [fieldIssue.field]: correctValue }
            );
 
            repairs.push({
              entityId: issue.entityId,
              field: fieldIssue.field,
              system: systemValue.system,
              oldValue: systemValue.value,
              newValue: correctValue,
              status: 'repaired'
            });
          } catch (e) {
            repairs.push({
              entityId: issue.entityId,
              field: fieldIssue.field,
              system: systemValue.system,
              status: 'failed',
              error: e.message
            });
          }
        }
      }
    }
  }
 
  return [{
    json: {
      repairTime: new Date().toISOString(),
      totalRepairs: repairs.length,
      successful: repairs.filter(r => r.status === 'repaired').length,
      failed: repairs.filter(r => r.status === 'failed').length,
      repairs
    }
  }];
}

Concluzie

Sincronizarea robusta a datelor necesita gestionarea atenta a conflictelor, actualizari incrementale si verificari de consistenta intre sisteme. Implementeaza sincronizare bidirectionala cu strategii de rezolvare a conflictelor, foloseste sincronizarea incrementala pentru seturi mari de date si mentine verificari de consistenta in toate sistemele. Flexibilitatea n8n permite construirea unor workflow-uri complexe de sincronizare care pastreaza datele consistente pe toate platformele.

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.