n8n Automation

Data Synchronization Workflows with n8n

DeviDevs Team
10 min read
#n8n #data-sync #automation #integration #etl

Data synchronization between systems is critical for operational consistency. This guide shows how to build robust n8n sync workflows that handle conflicts, incremental updates, and multi-directional sync.

Bidirectional CRM Sync

Sync data between a CRM and other systems:

// Bidirectional CRM <-> Database Sync Workflow
 
// 1. Webhook Trigger for Real-time Sync
// Node definition for the workflow's entry point: a webhook node that accepts
// POST requests on the "crm-sync" path.
const webhookTrigger = {
  name: "CRM Webhook",
  type: "n8n-nodes-base.webhook",
  parameters: {
    path: "crm-sync",
    httpMethod: "POST",
    // NOTE(review): "onReceived" presumably acknowledges the caller as soon as
    // the request arrives, before downstream nodes finish — confirm against
    // the n8n Webhook node documentation.
    responseMode: "onReceived"
  }
};
 
// 2. Determine Sync Direction and Type
/**
 * Normalizes an incoming sync payload into a sync-action descriptor.
 *
 * Only the first item is read. `source` defaults to 'crm' and `timestamp`
 * defaults to the current time when the payload omits them.
 *
 * @param {Array<{json: Object}>} items - Incoming items; items[0].json is the payload.
 * @returns {Array<{json: Object}>} One item carrying source, event_type,
 *   entity_type, entity_id, timestamp, data and the derived `action`
 *   ('create' | 'update' | 'delete' | 'unknown').
 */
function determineSyncAction(items) {
  const payload = items[0].json;

  // Map event types to actions. A Map (rather than a plain object) is used so
  // that event types such as "toString" or "constructor" cannot resolve to
  // members inherited from Object.prototype.
  const actionMap = new Map([
    ['contact.created', 'create'],
    ['contact.updated', 'update'],
    ['contact.deleted', 'delete'],
    ['deal.created', 'create'],
    ['deal.updated', 'update'],
    ['deal.stage_changed', 'update'],
    ['company.created', 'create'],
    ['company.updated', 'update']
  ]);

  const syncAction = {
    source: payload.source || 'crm',
    event_type: payload.event_type,
    entity_type: payload.entity_type,
    entity_id: payload.entity_id,
    timestamp: payload.timestamp || new Date().toISOString(),
    data: payload.data,
    action: actionMap.get(payload.event_type) ?? 'unknown'
  };

  return [{ json: syncAction }];
}
 
// 3. Conflict Detection Node
// Function-node definition that annotates the incoming sync action with
// conflict information (hasConflict, conflictType, previousState).
//
// The embedded code compares the action against the per-entity state kept in
// the workflow's global static data under `syncState[entity_id]`:
//   - 'concurrent_modification': the stored lastModified is within 5 minutes
//     of the incoming timestamp and was written by a different source.
//   - 'version_mismatch': the incoming data.version is lower than the stored
//     version (this check runs last, so it wins when both apply).
const conflictDetection = {
  name: "Check for Conflicts",
  type: "n8n-nodes-base.function",
  parameters: {
    functionCode: `
      const syncAction = items[0].json;
      const entityId = syncAction.entity_id;
      const sourceTimestamp = new Date(syncAction.timestamp);

      // Get last sync state from workflow static data
      const staticData = $getWorkflowStaticData('global');
      const syncState = staticData.syncState || {};
      const entityState = syncState[entityId] || {};

      // Check for conflict
      let hasConflict = false;
      let conflictType = null;

      if (entityState.lastModified) {
        const lastModified = new Date(entityState.lastModified);
        const timeDiff = Math.abs(sourceTimestamp - lastModified);

        // Conflict if modified within 5 minutes from different source
        if (timeDiff < 5 * 60 * 1000 && entityState.lastSource !== syncAction.source) {
          hasConflict = true;
          conflictType = 'concurrent_modification';
        }
      }

      // Check for version mismatch. Payloads without a data object
      // (e.g. delete events) simply skip this check instead of throwing.
      const incomingVersion = syncAction.data?.version;
      if (incomingVersion && entityState.version) {
        if (incomingVersion < entityState.version) {
          hasConflict = true;
          conflictType = 'version_mismatch';
        }
      }

      return [{
        json: {
          ...syncAction,
          hasConflict,
          conflictType,
          previousState: entityState
        }
      }];
    `
  }
};
 
// 4. Conflict Resolution Strategy
/**
 * Decides whether a (possibly conflicting) sync action may proceed.
 *
 * Strategy is chosen by entity type: contacts use source priority, deals use
 * a field-level merge, everything else uses last-write-wins.
 *
 * The input item is never mutated; for a field merge the merged data is placed
 * on the returned item only.
 *
 * @param {Array<{json: Object}>} items - items[0].json is a sync action
 *   annotated with hasConflict / previousState.
 * @returns {Array<{json: Object}>} The action plus `resolution`
 *   ('no_conflict', 'resolved_<strategy>' or 'blocked') and boolean `proceed`.
 */
function resolveConflict(items) {
  const item = items[0].json;

  if (!item.hasConflict) {
    return [{ json: { ...item, resolution: 'no_conflict', proceed: true } }];
  }

  const previousState = item.previousState || {};
  const resolved = { ...item };

  // Resolution strategies; each returns true when the incoming change wins.
  const strategies = {
    // Last write wins
    'last_write_wins': () => {
      const sourceTime = new Date(item.timestamp);
      const prevTime = new Date(previousState.lastModified);
      return sourceTime > prevTime;
    },

    // Source priority (CRM wins for contacts). Unknown sources compare as
    // false, i.e. the change is blocked.
    'source_priority': () => {
      const priorities = { crm: 1, erp: 2, database: 3 };
      return priorities[item.source] <= priorities[previousState.lastSource];
    },

    // Merge: start from the previously synced fields and overlay the incoming
    // ones. Missing data on either side is treated as "no fields".
    'field_merge': () => {
      resolved.data = { ...previousState.data, ...item.data };
      return true;
    }
  };

  // Default strategy based on entity type
  let strategy = 'last_write_wins';
  if (item.entity_type === 'contact') {
    strategy = 'source_priority';
  } else if (item.entity_type === 'deal') {
    strategy = 'field_merge';
  }

  const proceed = strategies[strategy]();

  return [{
    json: {
      ...resolved,
      resolution: proceed ? `resolved_${strategy}` : 'blocked',
      proceed: proceed
    }
  }];
}
 
// 5. Sync to Target Systems
// Postgres node definition that upserts the synced entity into the table for
// its entity type.
//
// The table name is derived from $json.entity_type. Naively appending "s"
// yields "companys" for companies, so that case is pluralized explicitly;
// "contact" and "deal" still map to "contacts" and "deals".
// NOTE(review): entity_type originates from the webhook payload; consider
// validating it against a fixed list before it is used as a table name.
const syncToDatabase = {
  name: "Sync to Database",
  type: "n8n-nodes-base.postgres",
  parameters: {
    operation: "upsert",
    table: "={{$json.entity_type === 'company' ? 'companies' : $json.entity_type + 's'}}",
    columns: "external_id, data, source, updated_at, sync_version",
    updateColumns: "data, source, updated_at, sync_version"
  }
};
 
// HubSpot node definition that writes the resolved data back to the CRM.
// It updates the contact whose id is $json.entity_id, passing $json.data as
// the properties to set.
// NOTE(review): resource is hard-coded to "contact" even though the event map
// elsewhere in this workflow also covers deals and companies — confirm that
// only contact events are routed to this node.
const syncBackToCRM = {
  name: "Sync Back to CRM",
  type: "n8n-nodes-base.hubspot",
  parameters: {
    resource: "contact",
    operation: "update",
    contactId: "={{$json.entity_id}}",
    additionalFields: {
      properties: "={{$json.data}}"
    }
  }
};
 
// 6. Update Sync State
/**
 * Records the outcome of a completed sync in the workflow's global static
 * data, keyed by entity id, so later runs can detect conflicts.
 *
 * Stored per entity: lastModified (item.timestamp), lastSource (item.source),
 * version (incoming data.version + 1, or 1 when absent) and the synced data.
 *
 * @param {Array<{json: Object}>} items - items[0].json is the synced action.
 * @returns {Array<{json: Object}>} The action plus `syncCompleted: true` and
 *   `newVersion`.
 */
function updateSyncState(items) {
  const item = items[0].json;

  // Update workflow static data
  const staticData = $getWorkflowStaticData('global');
  if (!staticData.syncState) {
    staticData.syncState = {};
  }

  // Actions without a data payload (e.g. deletes) start from version 0, and
  // Number() keeps a string version such as "3" from turning into "31".
  const previousVersion = Number(item.data?.version) || 0;

  const entityState = {
    lastModified: item.timestamp,
    lastSource: item.source,
    version: previousVersion + 1,
    data: item.data
  };
  staticData.syncState[item.entity_id] = entityState;

  return [{
    json: {
      ...item,
      syncCompleted: true,
      newVersion: entityState.version
    }
  }];
}

Incremental Data Sync

Handle large datasets with incremental sync:

// Incremental Sync Workflow for Large Datasets
 
// 1. Scheduled Incremental Sync
// Schedule-trigger node definition that starts the incremental sync.
// The single interval rule is expressed in minutes with a value of 15.
const scheduledSync = {
  name: "Scheduled Sync",
  type: "n8n-nodes-base.scheduleTrigger",
  parameters: {
    rule: {
      interval: [{ field: "minutes", minutesInterval: 15 }]
    }
  }
};
 
// 2. Get Last Sync Checkpoint
// Postgres node definition that loads the latest checkpoint for this sync.
// The query reads sync_checkpoints rows whose sync_name is 'crm_to_warehouse'
// and keeps only the one with the newest last_sync_time (ORDER BY ... DESC,
// LIMIT 1), exposing last_sync_time, last_sync_id, sync_cursor and
// records_synced to the next node.
const getCheckpoint = {
  name: "Get Checkpoint",
  type: "n8n-nodes-base.postgres",
  parameters: {
    operation: "executeQuery",
    query: `
      SELECT
        last_sync_time,
        last_sync_id,
        sync_cursor,
        records_synced
      FROM sync_checkpoints
      WHERE sync_name = 'crm_to_warehouse'
      ORDER BY last_sync_time DESC
      LIMIT 1
    `
  }
};
 
// 3. Fetch Changed Records
// HTTP-request node definition that pulls one page of changed contacts from
// the CRM API (base URL taken from the CRM_API_URL environment variable).
const fetchChangedRecords = {
  name: "Fetch Changes from CRM",
  type: "n8n-nodes-base.httpRequest",
  parameters: {
    method: "GET",
    url: "={{$env.CRM_API_URL}}/contacts",
    authentication: "oAuth2",
    queryParameters: {
      // Falls back to the Unix epoch when no checkpoint time is available,
      // which requests every record.
      modified_after: "={{$json.last_sync_time || '1970-01-01T00:00:00Z'}}",
      // Empty cursor when the checkpoint carries none.
      cursor: "={{$json.sync_cursor || ''}}",
      limit: "100",
      order_by: "modified_at"
    }
  }
};
 
// 4. Transform and Validate Data
/**
 * Maps one page of CRM contact records to warehouse rows and validates them.
 *
 * Each output item carries the transformed row plus `_valid`, `_errors` and
 * `_original`. A final extra item flagged `_metadata: true` carries the
 * pagination info (hasMore, nextCursor, totalInBatch).
 *
 * @param {Array<{json: Object}>} items - items[0].json is the CRM response
 *   ({ data, has_more, next_cursor }). A missing or non-array `data` is
 *   treated as an empty page.
 * @returns {Array<{json: Object}>} One item per record, then the metadata item.
 */
function transformForWarehouse(items) {
  const response = items[0].json;
  const records = Array.isArray(response.data) ? response.data : [];

  const output = records.map(record => {
    // Trim once so first_name, last_name and full_name agree; building
    // full_name from the raw values would keep their inner padding.
    const firstName = record.first_name?.trim();
    const lastName = record.last_name?.trim();

    // Data transformation
    const row = {
      id: record.id,
      external_id: record.external_id,
      email: record.email?.toLowerCase().trim(),
      first_name: firstName,
      last_name: lastName,
      full_name: [firstName, lastName].filter(Boolean).join(' '),
      company: record.company_name,
      phone: normalizePhone(record.phone),
      created_at: record.created_at,
      updated_at: record.updated_at,
      source_system: 'crm',
      sync_timestamp: new Date().toISOString()
    };

    // Validation
    const validationErrors = [];
    if (!row.email || !isValidEmail(row.email)) {
      validationErrors.push('Invalid email');
    }
    if (!row.id) {
      validationErrors.push('Missing ID');
    }

    return {
      json: {
        ...row,
        _valid: validationErrors.length === 0,
        _errors: validationErrors,
        _original: record
      }
    };
  });

  // Add pagination info
  output.push({
    json: {
      _metadata: true,
      hasMore: response.has_more,
      nextCursor: response.next_cursor,
      totalInBatch: records.length
    }
  });

  return output;
}
 
/**
 * Strips everything except digits and "+" from a phone value.
 *
 * @param {string|number|null|undefined} phone - Raw phone value; numbers are
 *   accepted because APIs sometimes return phone fields as numeric JSON.
 * @returns {string|null} The cleaned string, or null for a falsy input.
 */
function normalizePhone(phone) {
  if (!phone) return null;
  // String() first: calling .replace directly on a number would throw.
  return String(phone).replace(/[^\d+]/g, '');
}
 
/**
 * Loose shape check for an e-mail address: some non-space, non-"@" text,
 * an "@", more such text, a dot, and more such text.
 *
 * @param {string} email - Candidate address.
 * @returns {boolean} True when the value matches the pattern.
 */
function isValidEmail(email) {
  const emailShape = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
  const matches = emailShape.test(email);
  return matches;
}
 
// 5. Split Valid and Invalid Records
// IF node definition that routes items on their validation flag: the single
// boolean condition compares $json._valid with true.
// NOTE(review): items that carry no _valid field (for example an item holding
// only _metadata pagination info) will not equal true — confirm which branch
// such items should take and whether they need to be filtered out first.
const splitRecords = {
  name: "Split Valid/Invalid",
  type: "n8n-nodes-base.if",
  parameters: {
    conditions: {
      boolean: [
        {
          value1: "={{$json._valid}}",
          value2: true
        }
      ]
    }
  }
};
 
// 6. Batch Insert to Warehouse
// Postgres node definition that upserts a batch of rows into contacts_staging
// with one multi-row INSERT ... ON CONFLICT (id) DO UPDATE statement.
//
// The VALUES list is rendered by the inline expression. Because the row
// values come from an external CRM, each value is emitted safely:
//   - null/undefined becomes SQL NULL (not the text 'null' / 'undefined');
//   - anything else becomes a quoted literal with embedded single quotes
//     doubled, so a name like O'Brien can neither break nor extend the query.
// NOTE(review): quote doubling relies on PostgreSQL's default
// standard_conforming_strings = on; prefer the node's bound query parameters
// if the installed node version supports them.
// NOTE(review): the expression calls $json.map, i.e. it assumes $json is an
// array of row objects — confirm against the upstream node. An empty array
// renders an empty VALUES list, which is not valid SQL.
const batchInsert = {
  name: "Insert to Warehouse",
  type: "n8n-nodes-base.postgres",
  parameters: {
    operation: "executeQuery",
    query: `
      INSERT INTO contacts_staging (
        id, external_id, email, first_name, last_name,
        full_name, company, phone, created_at, updated_at,
        source_system, sync_timestamp
      )
      VALUES
      {{$json.map(r => '(' + [
        r.id, r.external_id, r.email,
        r.first_name, r.last_name, r.full_name,
        r.company, r.phone, r.created_at,
        r.updated_at, r.source_system, r.sync_timestamp
      ].map(v => v == null ? 'NULL' : "'" + String(v).replace(/'/g, "''") + "'").join(', ') + ')').join(',')}}
      ON CONFLICT (id) DO UPDATE SET
        email = EXCLUDED.email,
        first_name = EXCLUDED.first_name,
        last_name = EXCLUDED.last_name,
        full_name = EXCLUDED.full_name,
        company = EXCLUDED.company,
        phone = EXCLUDED.phone,
        updated_at = EXCLUDED.updated_at,
        sync_timestamp = EXCLUDED.sync_timestamp
    `
  }
};
 
// 7. Log Invalid Records for Review
// Postgres node definition that inserts a row into sync_errors for records
// that failed validation, using the listed columns.
// NOTE(review): no mapping from the incoming item fields to these columns is
// shown here — presumably the items are reshaped by an earlier node; confirm.
const logInvalidRecords = {
  name: "Log Invalid Records",
  type: "n8n-nodes-base.postgres",
  parameters: {
    operation: "insert",
    table: "sync_errors",
    columns: "sync_name, record_id, error_type, error_details, raw_data, created_at"
  }
};
 
// 8. Update Checkpoint
/**
 * Builds the checkpoint row describing the batch that was just processed.
 *
 * `last_sync_time` is the newest `updated_at` seen among the processed records
 * (valid or not), so the next run resumes from the data's own high-water mark.
 * Taking it from the wall clock instead would skip records changed while the
 * batch was in flight or when the two clocks disagree. The current time is
 * used only as a fallback when no record carries a parsable `updated_at`.
 *
 * @param {Array<{json: Object}>} items - Transformed records plus, optionally,
 *   one item flagged `_metadata` carrying hasMore / nextCursor.
 * @returns {Array<{json: Object}>} One item: sync_name, last_sync_time,
 *   last_sync_id (id of the last valid record), sync_cursor, records_synced
 *   (count of valid records) and has_more.
 */
function updateCheckpoint(items) {
  const records = items.filter(i => !i.json._metadata);
  const validRecords = records.filter(i => i.json._valid);
  const metadata = items.find(i => i.json._metadata)?.json || {};

  // Newest parsable updated_at in the batch, as epoch milliseconds.
  const latestChange = records.reduce((latest, { json }) => {
    const changedAt = Date.parse(json.updated_at);
    return Number.isNaN(changedAt) || changedAt <= latest ? latest : changedAt;
  }, -Infinity);

  const checkpoint = {
    sync_name: 'crm_to_warehouse',
    last_sync_time: Number.isFinite(latestChange)
      ? new Date(latestChange).toISOString()
      : new Date().toISOString(),
    last_sync_id: validRecords[validRecords.length - 1]?.json.id,
    sync_cursor: metadata.nextCursor || null,
    records_synced: validRecords.length,
    has_more: metadata.hasMore || false
  };

  return [{ json: checkpoint }];
}
 
// 9. Continue if More Records
// IF node definition that decides whether another page must be fetched: the
// single boolean condition compares $json.has_more with true.
const checkMoreRecords = {
  name: "Check for More",
  type: "n8n-nodes-base.if",
  parameters: {
    conditions: {
      boolean: [
        {
          value1: "={{$json.has_more}}",
          value2: true
        }
      ]
    }
  }
};
 
// Loop back to fetch more if needed
// Execute-workflow node definition whose target is this workflow's own id
// ($workflow.id), i.e. the workflow re-invokes itself to fetch the next page.
// NOTE(review): each page presumably adds one nested execution — confirm that
// the nesting depth is acceptable for the largest expected backlog.
const loopBack = {
  name: "Loop to Fetch More",
  type: "n8n-nodes-base.executeWorkflow",
  parameters: {
    workflowId: "={{$workflow.id}}"
  }
};

Multi-System Data Consistency

Maintain consistency across multiple systems:

// Multi-System Consistency Workflow
 
// 1. Transaction Manager
/**
 * Coordinates a change across several systems using a prepare/commit sequence
 * with best-effort compensation.
 *
 * Every participant is prepared first; only if all prepares succeed are the
 * participants committed, in registration order. If any step throws, the
 * rollback callbacks of the participants that had already committed are run
 * in reverse commit order.
 *
 * Participant statuses: 'pending' -> 'preparing' -> 'prepared' ->
 * 'committing' -> 'committed', or 'failed' for the participant whose step threw.
 */
class DistributedSyncTransaction {
  /**
   * @param {string} transactionId - Identifier echoed back in results.
   */
  constructor(transactionId) {
    this.transactionId = transactionId;
    this.participants = [];
    this.status = 'pending';
    this.rollbackActions = [];
  }

  /**
   * Registers a participant.
   *
   * @param {string} system - Participant name.
   * @param {{prepare: Function, commit: Function}} action - Async phase handlers.
   * @param {Function} rollback - Async compensation run if a later step fails.
   */
  addParticipant(system, action, rollback) {
    this.participants.push({ system, action, rollback, status: 'pending' });
  }

  /**
   * Runs both phases.
   *
   * @returns {Promise<{success: boolean, transactionId: string, error?: string}>}
   *   Never rejects; failures are reported through `success: false`.
   */
  async execute() {
    // Tracks the participant being worked on so a failure can be attributed.
    let current = null;

    try {
      // Phase 1: Prepare all participants
      for (const participant of this.participants) {
        current = participant;
        participant.status = 'preparing';
        await participant.action.prepare();
        participant.status = 'prepared';
      }

      // Phase 2: Commit all participants
      for (const participant of this.participants) {
        current = participant;
        participant.status = 'committing';
        await participant.action.commit();
        // Registered only after a successful commit, so a failed commit is
        // not compensated.
        this.rollbackActions.push(participant.rollback);
        participant.status = 'committed';
      }

      this.status = 'committed';
      return { success: true, transactionId: this.transactionId };

    } catch (error) {
      // Do not leave the failing participant in a transient status.
      if (current) {
        current.status = 'failed';
      }

      // Rollback all committed actions
      await this.rollback();
      this.status = 'rolled_back';
      return {
        success: false,
        // Non-Error throws (e.g. strings) have no .message.
        error: error?.message ?? String(error),
        transactionId: this.transactionId
      };
    }
  }

  /**
   * Runs the registered rollback callbacks, most recent commit first.
   *
   * The list is drained before running, so calling rollback() again is a
   * no-op instead of re-running the compensations in the wrong order. A
   * failing callback is logged and does not stop the remaining ones.
   *
   * @returns {Promise<void>}
   */
  async rollback() {
    const pending = this.rollbackActions.splice(0).reverse();
    for (const undo of pending) {
      try {
        await undo();
      } catch (e) {
        console.error(`Rollback failed: ${e.message}`);
      }
    }
  }
}
 
// 2. Sync Orchestrator Function
/**
 * Pushes one record to the CRM, ERP and warehouse inside a
 * DistributedSyncTransaction and reports the per-system outcome.
 *
 * Systems are limited to `syncData.targetSystems` when that list is present;
 * otherwise all three participate. A non-2xx response from any commit is
 * treated as a failure so the transaction compensates the systems already
 * committed.
 *
 * Relies on CRM_API, ERP_API and WAREHOUSE_API being defined in the
 * surrounding scope.
 *
 * @param {Array<{json: Object}>} items - items[0].json is the record to sync.
 * @returns {Promise<Array<{json: Object}>>} The record plus transactionId,
 *   syncResult and participantStatuses.
 */
async function orchestrateMultiSystemSync(items) {
  const syncData = items[0].json;
  const transactionId = `txn_${Date.now()}`;
  const transaction = new DistributedSyncTransaction(transactionId);

  // Builds fetch options for a JSON body; without the header fetch labels a
  // string body as text/plain.
  const jsonRequest = (method, payload) => ({
    method,
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload)
  });

  // fetch only rejects on network errors, so HTTP failures must be raised
  // explicitly for the transaction to notice them.
  const ensureOk = (response, what) => {
    if (!response.ok) {
      throw new Error(`${what} failed with HTTP ${response.status}`);
    }
    return response;
  };

  // Define participants based on data type. rollbackUrl is a function so each
  // base URL is only read for systems that actually take part.
  const systems = {
    crm: {
      rollbackUrl: () => `${CRM_API}/rollback`,
      async prepare() {
        // Validate CRM can accept update
        const response = await fetch(`${CRM_API}/validate`, jsonRequest('POST', syncData));
        if (!response.ok) throw new Error('CRM validation failed');
      },
      async commit() {
        const response = await fetch(
          `${CRM_API}/contacts/${encodeURIComponent(syncData.id)}`,
          jsonRequest('PUT', syncData)
        );
        return ensureOk(response, 'CRM commit');
      }
    },
    erp: {
      rollbackUrl: () => `${ERP_API}/rollback`,
      async prepare() {
        // Check ERP availability
        const health = await fetch(`${ERP_API}/health`);
        if (!health.ok) throw new Error('ERP unavailable');
      },
      async commit() {
        const response = await fetch(
          `${ERP_API}/customers/${encodeURIComponent(syncData.external_id)}`,
          jsonRequest('PATCH', {
            name: syncData.full_name,
            email: syncData.email,
            phone: syncData.phone
          })
        );
        return ensureOk(response, 'ERP commit');
      }
    },
    warehouse: {
      rollbackUrl: () => `${WAREHOUSE_API}/rollback`,
      async prepare() {
        // Verify warehouse schema
        return true;
      },
      async commit() {
        const response = await fetch(
          `${WAREHOUSE_API}/upsert`,
          jsonRequest('POST', { table: 'contacts', data: syncData })
        );
        return ensureOk(response, 'Warehouse commit');
      }
    }
  };

  // Add participants
  for (const [systemName, actions] of Object.entries(systems)) {
    if (syncData.targetSystems?.includes(systemName) || !syncData.targetSystems) {
      transaction.addParticipant(
        systemName,
        actions,
        async () => {
          // Rollback action: throws on HTTP failure so the caller can log it.
          const response = await fetch(
            actions.rollbackUrl(),
            jsonRequest('POST', { transactionId, entityId: syncData.id })
          );
          ensureOk(response, `${systemName} rollback`);
        }
      );
    }
  }

  const result = await transaction.execute();

  return [{
    json: {
      ...syncData,
      transactionId,
      syncResult: result,
      participantStatuses: transaction.participants.map(p => ({
        system: p.system,
        status: p.status
      }))
    }
  }];
}
 
// 3. Consistency Checker Workflow
// Schedule-trigger node definition that starts the consistency check.
// The single interval rule is expressed in hours with a value of 6.
const consistencyChecker = {
  name: "Consistency Check Trigger",
  type: "n8n-nodes-base.scheduleTrigger",
  parameters: {
    rule: {
      interval: [{ field: "hours", hoursInterval: 6 }]
    }
  }
};
 
// 4. Compare Data Across Systems
/**
 * Compares a sample of entities across the CRM, ERP and warehouse and reports
 * the fields whose values disagree.
 *
 * For each sampled id the record is fetched from every system; a system whose
 * fetch throws is recorded as `{ error }` and, like a system that returns no
 * record, is left out of the comparison. The fields email, phone and company
 * are then compared across the remaining systems.
 *
 * Relies on getSampleIds, fetchRecord and determineCorrectValue from the
 * surrounding scope.
 *
 * @param {Array<{json: Object}>} items - items[0].json may supply `sampleIds`;
 *   otherwise getSampleIds(100) is used.
 * @returns {Promise<Array<{json: Object}>>} One report item: checkTime,
 *   samplesChecked, inconsistenciesFound, inconsistencies and healthScore
 *   (percentage of consistent samples as a two-decimal string).
 */
async function compareSystemData(items) {
  const checkConfig = items[0].json;
  const inconsistencies = [];

  // Sample records to check
  const sampleIds = checkConfig.sampleIds || await getSampleIds(100);

  for (const id of sampleIds) {
    const records = {};

    // Fetch from each system
    for (const system of ['crm', 'erp', 'warehouse']) {
      try {
        records[system] = await fetchRecord(system, id);
      } catch (e) {
        records[system] = { error: e.message };
      }
    }

    // Only systems that returned a usable record take part; the `record &&`
    // guard keeps a null/undefined result from throwing on `.error`.
    const comparable = Object.entries(records)
      .filter(([, record]) => record && !record.error);

    // Compare critical fields
    const criticalFields = ['email', 'phone', 'company'];
    const fieldInconsistencies = [];

    for (const field of criticalFields) {
      const values = comparable.map(([sys, record]) => ({ system: sys, value: record[field] }));

      const uniqueValues = [...new Set(values.map(v => v.value))];

      if (uniqueValues.length > 1) {
        fieldInconsistencies.push({
          field,
          values: values,
          recommendation: determineCorrectValue(values, field)
        });
      }
    }

    if (fieldInconsistencies.length > 0) {
      inconsistencies.push({
        entityId: id,
        inconsistencies: fieldInconsistencies,
        records
      });
    }
  }

  // With nothing sampled there is nothing inconsistent; reporting 100 avoids
  // the 0/0 division that would otherwise yield the string "NaN".
  const healthScore = sampleIds.length === 0
    ? (100).toFixed(2)
    : ((sampleIds.length - inconsistencies.length) / sampleIds.length * 100).toFixed(2);

  return [{
    json: {
      checkTime: new Date().toISOString(),
      samplesChecked: sampleIds.length,
      inconsistenciesFound: inconsistencies.length,
      inconsistencies,
      healthScore
    }
  }];
}
 
/**
 * Picks the value to trust when systems disagree, by fixed system priority
 * (crm, then erp, then warehouse). Systems not in the priority table rank last.
 *
 * The input array is not modified; callers keep their original ordering.
 *
 * @param {Array<{system: string, value: *}>} values - Non-empty list of
 *   per-system values for one field.
 * @param {string} field - Field name (currently not used in the decision).
 * @returns {{recommendedValue: *, sourceSystem: string, reason: string}}
 */
function determineCorrectValue(values, field) {
  // Priority-based resolution
  const priorities = { crm: 1, erp: 2, warehouse: 3 };
  const rank = ({ system }) => {
    const priority = priorities[system];
    return typeof priority === 'number' ? priority : Number.MAX_SAFE_INTEGER;
  };

  // Sort a copy: Array.prototype.sort works in place and would reorder the
  // caller's array.
  const [preferred] = [...values].sort((a, b) => rank(a) - rank(b));

  return {
    recommendedValue: preferred.value,
    sourceSystem: preferred.system,
    reason: 'Based on system priority (CRM is source of truth)'
  };
}
 
// 5. Auto-Repair Inconsistencies
/**
 * Applies the recommended value to every system that disagrees with it.
 *
 * Walks the consistency report entity by entity and field by field. A system
 * is updated only when its value differs from the recommended one and it is
 * not the recommendation's source system. Updates run one at a time; a failed
 * update is recorded and does not stop the rest.
 *
 * Relies on updateSystemRecord from the surrounding scope.
 *
 * @param {Array<{json: Object}>} items - items[0].json is the report whose
 *   `inconsistencies` list is processed.
 * @returns {Promise<Array<{json: Object}>>} One summary item: repairTime,
 *   totalRepairs, successful, failed and the per-update `repairs` log.
 */
async function repairInconsistencies(items) {
  const report = items[0].json;
  const repairs = [];

  for (const issue of report.inconsistencies) {
    for (const fieldIssue of issue.inconsistencies) {
      const { recommendedValue: target, sourceSystem: origin } = fieldIssue.recommendation;

      for (const entry of fieldIssue.values) {
        // Skip systems that already agree, and never write to the source.
        const needsRepair = entry.value !== target && entry.system !== origin;
        if (!needsRepair) {
          continue;
        }

        try {
          await updateSystemRecord(entry.system, issue.entityId, { [fieldIssue.field]: target });

          repairs.push({
            entityId: issue.entityId,
            field: fieldIssue.field,
            system: entry.system,
            oldValue: entry.value,
            newValue: target,
            status: 'repaired'
          });
        } catch (e) {
          repairs.push({
            entityId: issue.entityId,
            field: fieldIssue.field,
            system: entry.system,
            status: 'failed',
            error: e.message
          });
        }
      }
    }
  }

  const countWithStatus = (status) =>
    repairs.reduce((total, repair) => (repair.status === status ? total + 1 : total), 0);

  const summary = {
    repairTime: new Date().toISOString(),
    totalRepairs: repairs.length,
    successful: countWithStatus('repaired'),
    failed: countWithStatus('failed'),
    repairs
  };

  return [{ json: summary }];
}

Conclusion

Robust data synchronization requires careful handling of conflicts, incremental updates, and multi-system consistency. Implement bidirectional sync with conflict resolution strategies, use incremental sync for large datasets, and maintain consistency checks across systems. n8n's flexibility allows building complex sync workflows that keep your data consistent across all platforms.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.