Data synchronization between systems is critical for operational consistency. This guide covers building robust sync workflows with n8n, including conflict handling, incremental updates, and multi-directional sync.
Bidirectional CRM Sync
Sync data between CRM and other systems:
// Bidirectional CRM <-> Database Sync Workflow
// 1. Webhook Trigger for Real-time Sync
const webhookTrigger = {
name: "CRM Webhook",
type: "n8n-nodes-base.webhook",
parameters: {
path: "crm-sync",
httpMethod: "POST",
responseMode: "onReceived"
}
};
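// Example of a payload this webhook might receive (field names match what the
// next node reads; the values themselves are illustrative):
// {
//   "source": "crm",
//   "event_type": "contact.updated",
//   "entity_type": "contact",
//   "entity_id": "12345",
//   "timestamp": "2024-01-15T10:30:00Z",
//   "data": { "email": "jane@example.com", "version": 3 }
// }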
// 2. Determine Sync Direction and Type
function determineSyncAction(items) {
const payload = items[0].json;
const syncAction = {
source: payload.source || 'crm',
event_type: payload.event_type,
entity_type: payload.entity_type,
entity_id: payload.entity_id,
timestamp: payload.timestamp || new Date().toISOString(),
data: payload.data,
action: 'unknown'
};
// Map event types to actions
const actionMap = {
'contact.created': 'create',
'contact.updated': 'update',
'contact.deleted': 'delete',
'deal.created': 'create',
'deal.updated': 'update',
'deal.stage_changed': 'update',
'company.created': 'create',
'company.updated': 'update'
};
syncAction.action = actionMap[payload.event_type] || 'unknown';
return [{ json: syncAction }];
}
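// Quick sanity check (illustrative values): 'contact.updated' maps to 'update',
// while any event_type missing from actionMap falls through to 'unknown', which
// downstream nodes can route to a review branch instead of syncing blindly.
// determineSyncAction([{ json: { source: 'crm', event_type: 'contact.updated', entity_id: '12345', data: {} } }])
//   -> action: 'update'
// determineSyncAction([{ json: { source: 'crm', event_type: 'ticket.created', entity_id: '987', data: {} } }])
//   -> action: 'unknown'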
// 3. Conflict Detection Node
const conflictDetection = {
name: "Check for Conflicts",
type: "n8n-nodes-base.function",
parameters: {
functionCode: `
const syncAction = items[0].json;
const entityId = syncAction.entity_id;
const sourceTimestamp = new Date(syncAction.timestamp);
// Get last sync state from workflow static data
const staticData = $getWorkflowStaticData('global');
const syncState = staticData.syncState || {};
const entityState = syncState[entityId] || {};
// Check for conflict
let hasConflict = false;
let conflictType = null;
if (entityState.lastModified) {
const lastModified = new Date(entityState.lastModified);
const timeDiff = Math.abs(sourceTimestamp - lastModified);
// Conflict if modified within 5 minutes from different source
if (timeDiff < 5 * 60 * 1000 && entityState.lastSource !== syncAction.source) {
hasConflict = true;
conflictType = 'concurrent_modification';
}
}
// Check for version mismatch
if (syncAction.data.version && entityState.version) {
if (syncAction.data.version < entityState.version) {
hasConflict = true;
conflictType = 'version_mismatch';
}
}
return [{
json: {
...syncAction,
hasConflict,
conflictType,
previousState: entityState
}
}];
`
}
};
// 4. Conflict Resolution Strategy
function resolveConflict(items) {
const item = items[0].json;
if (!item.hasConflict) {
return [{ json: { ...item, resolution: 'no_conflict', proceed: true } }];
}
// Resolution strategies
const strategies = {
// Last write wins
'last_write_wins': () => {
const sourceTime = new Date(item.timestamp);
const prevTime = new Date(item.previousState.lastModified);
return sourceTime > prevTime;
},
// Source priority (CRM wins for contacts)
'source_priority': () => {
const priorities = { crm: 1, erp: 2, database: 3 };
return priorities[item.source] <= priorities[item.previousState.lastSource];
},
// Merge non-conflicting fields
'field_merge': () => {
const mergedData = { ...item.previousState.data };
for (const [key, value] of Object.entries(item.data)) {
// Only update fields that changed from original
if (item.previousState.data[key] !== value) {
mergedData[key] = value;
}
}
item.data = mergedData;
return true;
}
};
// Default strategy based on entity type
let strategy = 'last_write_wins';
if (item.entity_type === 'contact') {
strategy = 'source_priority';
} else if (item.entity_type === 'deal') {
strategy = 'field_merge';
}
const proceed = strategies[strategy]();
return [{
json: {
...item,
resolution: proceed ? `resolved_${strategy}` : 'blocked',
proceed: proceed
}
}];
}
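// Example (illustrative data): a conflicting deal update resolved with the
// field_merge strategy. Only fields that differ from the previously synced
// data are overwritten; untouched fields (here, owner) are preserved.
const exampleConflict = [{
  json: {
    entity_type: 'deal',
    source: 'erp',
    timestamp: '2024-01-15T10:31:00Z',
    hasConflict: true,
    conflictType: 'concurrent_modification',
    data: { amount: 5000, stage: 'negotiation' },
    previousState: {
      lastModified: '2024-01-15T10:29:00Z',
      lastSource: 'crm',
      data: { amount: 5000, stage: 'proposal', owner: 'alice' }
    }
  }
}];
// resolveConflict(exampleConflict) -> resolution: 'resolved_field_merge', proceed: true,
// data: { amount: 5000, stage: 'negotiation', owner: 'alice' }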
// 5. Sync to Target Systems
const syncToDatabase = {
name: "Sync to Database",
type: "n8n-nodes-base.postgres",
parameters: {
operation: "upsert",
table: "={{$json.entity_type}}s",
columns: "external_id, data, source, updated_at, sync_version",
updateColumns: "data, source, updated_at, sync_version"
}
};
const syncBackToCRM = {
name: "Sync Back to CRM",
type: "n8n-nodes-base.hubspot",
parameters: {
resource: "contact",
operation: "update",
contactId: "={{$json.entity_id}}",
additionalFields: {
properties: "={{$json.data}}"
}
}
};
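// Optional guard (not shown above; a minimal sketch): when the change
// originated in the CRM, skip the "Sync Back to CRM" branch so the update is
// not echoed back to the CRM and does not re-trigger the webhook in a loop.
const skipEchoToCRM = {
  name: "Originated Outside CRM?",
  type: "n8n-nodes-base.if",
  parameters: {
    conditions: {
      string: [
        {
          value1: "={{$json.source}}",
          operation: "notEqual",
          value2: "crm"
        }
      ]
    }
  }
};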
// 6. Update Sync State
function updateSyncState(items) {
const item = items[0].json;
// Update workflow static data
const staticData = $getWorkflowStaticData('global');
if (!staticData.syncState) {
staticData.syncState = {};
}
staticData.syncState[item.entity_id] = {
lastModified: item.timestamp,
lastSource: item.source,
version: (item.data.version || 0) + 1,
data: item.data
};
return [{
json: {
...item,
syncCompleted: true,
newVersion: staticData.syncState[item.entity_id].version
}
}];
}
Incremental Data Sync
Handle large datasets with incremental sync:
// Incremental Sync Workflow for Large Datasets
// 1. Scheduled Incremental Sync
const scheduledSync = {
name: "Scheduled Sync",
type: "n8n-nodes-base.scheduleTrigger",
parameters: {
rule: {
interval: [{ field: "minutes", minutesInterval: 15 }]
}
}
};
// 2. Get Last Sync Checkpoint
const getCheckpoint = {
name: "Get Checkpoint",
type: "n8n-nodes-base.postgres",
parameters: {
operation: "executeQuery",
query: `
SELECT
last_sync_time,
last_sync_id,
sync_cursor,
records_synced
FROM sync_checkpoints
WHERE sync_name = 'crm_to_warehouse'
ORDER BY last_sync_time DESC
LIMIT 1
`
}
};
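// First-run handling (a minimal sketch): when sync_checkpoints has no row yet,
// the query above returns no rows (enable the node's "Always Output Data"
// option or handle the empty result here), so supply defaults before fetching.
function ensureCheckpointDefaults(items) {
  const checkpoint = items[0]?.json || {};
  return [{
    json: {
      last_sync_time: checkpoint.last_sync_time || '1970-01-01T00:00:00Z',
      last_sync_id: checkpoint.last_sync_id || null,
      sync_cursor: checkpoint.sync_cursor || '',
      records_synced: checkpoint.records_synced || 0
    }
  }];
}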
// 3. Fetch Changed Records
const fetchChangedRecords = {
name: "Fetch Changes from CRM",
type: "n8n-nodes-base.httpRequest",
parameters: {
method: "GET",
url: "={{$env.CRM_API_URL}}/contacts",
authentication: "oAuth2",
queryParameters: {
modified_after: "={{$json.last_sync_time || '1970-01-01T00:00:00Z'}}",
cursor: "={{$json.sync_cursor || ''}}",
limit: "100",
order_by: "modified_at"
}
}
};
// 4. Transform and Validate Data
function transformForWarehouse(items) {
const response = items[0].json;
const records = response.data || [];
const transformed = records.map(record => {
// Data transformation
const contact = {
id: record.id,
external_id: record.external_id,
email: record.email?.toLowerCase().trim(),
first_name: record.first_name?.trim(),
last_name: record.last_name?.trim(),
full_name: `${record.first_name || ''} ${record.last_name || ''}`.trim(),
company: record.company_name,
phone: normalizePhone(record.phone),
created_at: record.created_at,
updated_at: record.updated_at,
source_system: 'crm',
sync_timestamp: new Date().toISOString()
};
// Validation
const validationErrors = [];
if (!contact.email || !isValidEmail(contact.email)) {
validationErrors.push('Invalid email');
}
if (!contact.id) {
validationErrors.push('Missing ID');
}
return {
json: {
...contact,
_valid: validationErrors.length === 0,
_errors: validationErrors,
_original: record
}
};
});
// Add pagination info
transformed.push({
json: {
_metadata: true,
hasMore: response.has_more,
nextCursor: response.next_cursor,
totalInBatch: records.length
}
});
return transformed;
}
function normalizePhone(phone) {
if (!phone) return null;
return phone.replace(/[^\d+]/g, '');
}
function isValidEmail(email) {
return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
}
// 5. Split Valid and Invalid Records
const splitRecords = {
name: "Split Valid/Invalid",
type: "n8n-nodes-base.if",
parameters: {
conditions: {
boolean: [
{
value1: "={{$json._valid}}",
value2: true
}
]
}
}
};
// 6. Batch Insert to Warehouse
const batchInsert = {
name: "Insert to Warehouse",
type: "n8n-nodes-base.postgres",
parameters: {
operation: "executeQuery",
query: `
INSERT INTO contacts_staging (
id, external_id, email, first_name, last_name,
full_name, company, phone, created_at, updated_at,
source_system, sync_timestamp
)
VALUES
-- one tuple per incoming item; $input.all() (available in recent n8n versions)
-- exposes every input item to the expression, since $json refers to a single item.
-- Values are interpolated directly here for brevity; a parameterized sketch follows this node.
{{$input.all().map(i => i.json).map(r => \`(
'\${r.id}', '\${r.external_id}', '\${r.email}',
'\${r.first_name}', '\${r.last_name}', '\${r.full_name}',
'\${r.company}', '\${r.phone}', '\${r.created_at}',
'\${r.updated_at}', '\${r.source_system}', '\${r.sync_timestamp}'
)\`).join(',')}}
ON CONFLICT (id) DO UPDATE SET
email = EXCLUDED.email,
first_name = EXCLUDED.first_name,
last_name = EXCLUDED.last_name,
full_name = EXCLUDED.full_name,
company = EXCLUDED.company,
phone = EXCLUDED.phone,
updated_at = EXCLUDED.updated_at,
sync_timestamp = EXCLUDED.sync_timestamp
`
}
};
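// The template above interpolates values straight into SQL, which breaks on
// quotes in names and is unsafe if the CRM data is not trusted. A safer sketch:
// build a parameterized statement and pass the values separately wherever
// parameterized execution is available (e.g. a Code node using a Postgres client).
function buildParameterizedInsert(rows) {
  const columns = [
    'id', 'external_id', 'email', 'first_name', 'last_name', 'full_name',
    'company', 'phone', 'created_at', 'updated_at', 'source_system', 'sync_timestamp'
  ];
  const values = [];
  const tuples = rows.map((row, rowIndex) => {
    const placeholders = columns.map((col, colIndex) => {
      values.push(row[col] ?? null);
      return `$${rowIndex * columns.length + colIndex + 1}`;
    });
    return `(${placeholders.join(', ')})`;
  });
  return {
    text: `INSERT INTO contacts_staging (${columns.join(', ')}) VALUES ${tuples.join(', ')}
      ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email, updated_at = EXCLUDED.updated_at`,
    values
  };
}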
// 7. Log Invalid Records for Review
const logInvalidRecords = {
name: "Log Invalid Records",
type: "n8n-nodes-base.postgres",
parameters: {
operation: "insert",
table: "sync_errors",
columns: "sync_name, record_id, error_type, error_details, raw_data, created_at"
}
};
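// Shape invalid records into the sync_errors columns declared above
// (a minimal sketch; intended for the "false" branch of the Split Valid/Invalid node).
function formatSyncErrors(items) {
  return items
    .filter(item => !item.json._metadata)
    .map(item => ({
      json: {
        sync_name: 'crm_to_warehouse',
        record_id: item.json.id || null,
        error_type: 'validation',
        error_details: (item.json._errors || []).join('; '),
        raw_data: JSON.stringify(item.json._original || {}),
        created_at: new Date().toISOString()
      }
    }));
}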
// 8. Update Checkpoint
function updateCheckpoint(items) {
const validRecords = items.filter(i => !i.json._metadata && i.json._valid);
const metadata = items.find(i => i.json._metadata)?.json || {};
const checkpoint = {
sync_name: 'crm_to_warehouse',
last_sync_time: new Date().toISOString(),
last_sync_id: validRecords[validRecords.length - 1]?.json.id,
sync_cursor: metadata.nextCursor || null,
records_synced: validRecords.length,
has_more: metadata.hasMore || false
};
return [{ json: checkpoint }];
}
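// Persist the checkpoint computed above (a sketch): append a new row per run,
// which matches the ORDER BY last_sync_time DESC LIMIT 1 read in step 2.
const saveCheckpoint = {
  name: "Save Checkpoint",
  type: "n8n-nodes-base.postgres",
  parameters: {
    operation: "insert",
    table: "sync_checkpoints",
    columns: "sync_name, last_sync_time, last_sync_id, sync_cursor, records_synced"
  }
};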
// 9. Continue if More Records
const checkMoreRecords = {
name: "Check for More",
type: "n8n-nodes-base.if",
parameters: {
conditions: {
boolean: [
{
value1: "={{$json.has_more}}",
value2: true
}
]
}
}
};
// Loop back to fetch more if needed
const loopBack = {
name: "Loop to Fetch More",
type: "n8n-nodes-base.executeWorkflow",
parameters: {
workflowId: "={{$workflow.id}}"
}
};
Multi-System Data Consistency
Maintain consistency across multiple systems:
// Multi-System Consistency Workflow
// 1. Transaction Manager
class DistributedSyncTransaction {
constructor(transactionId) {
this.transactionId = transactionId;
this.participants = [];
this.status = 'pending';
this.rollbackActions = [];
}
addParticipant(system, action, rollback) {
this.participants.push({ system, action, rollback, status: 'pending' });
}
async execute() {
// Two-phase commit pattern
try {
// Phase 1: Prepare all participants
for (const participant of this.participants) {
participant.status = 'preparing';
await participant.action.prepare();
participant.status = 'prepared';
}
// Phase 2: Commit all participants
for (const participant of this.participants) {
participant.status = 'committing';
await participant.action.commit();
this.rollbackActions.push(participant.rollback);
participant.status = 'committed';
}
this.status = 'committed';
return { success: true, transactionId: this.transactionId };
} catch (error) {
// Rollback all committed actions
await this.rollback();
this.status = 'rolled_back';
return { success: false, error: error.message, transactionId: this.transactionId };
}
}
async rollback() {
for (const rollback of this.rollbackActions.reverse()) {
try {
await rollback();
} catch (e) {
console.error(`Rollback failed: ${e.message}`);
}
}
}
}
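// Base URLs used by the orchestrator below (assumed to come from environment
// configuration, mirroring the $env.CRM_API_URL pattern used earlier):
const CRM_API = process.env.CRM_API_URL;
const ERP_API = process.env.ERP_API_URL;
const WAREHOUSE_API = process.env.WAREHOUSE_API_URL;

// Minimal usage sketch of the transaction class with two in-memory participants:
// if any prepare() or commit() throws, previously committed work is rolled back.
async function exampleTwoPhaseCommit() {
  const txn = new DistributedSyncTransaction(`txn_${Date.now()}`);
  const audit = [];
  txn.addParticipant(
    'systemA',
    { prepare: async () => audit.push('A prepared'), commit: async () => audit.push('A committed') },
    async () => audit.push('A rolled back')
  );
  txn.addParticipant(
    'systemB',
    { prepare: async () => audit.push('B prepared'), commit: async () => audit.push('B committed') },
    async () => audit.push('B rolled back')
  );
  return { result: await txn.execute(), audit };
}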
// 2. Sync Orchestrator Function
async function orchestrateMultiSystemSync(items) {
const syncData = items[0].json;
const transactionId = `txn_${Date.now()}`;
const transaction = new DistributedSyncTransaction(transactionId);
// Define participants based on data type
const systems = {
crm: {
apiUrl: CRM_API, // used by the rollback action below
async prepare() {
// Validate CRM can accept update
const response = await fetch(`${CRM_API}/validate`, {
method: 'POST',
body: JSON.stringify(syncData)
});
if (!response.ok) throw new Error('CRM validation failed');
},
async commit() {
return fetch(`${CRM_API}/contacts/${syncData.id}`, {
method: 'PUT',
body: JSON.stringify(syncData)
});
}
},
erp: {
apiUrl: ERP_API, // used by the rollback action below
async prepare() {
// Check ERP availability
const health = await fetch(`${ERP_API}/health`);
if (!health.ok) throw new Error('ERP unavailable');
},
async commit() {
return fetch(`${ERP_API}/customers/${syncData.external_id}`, {
method: 'PATCH',
body: JSON.stringify({
name: syncData.full_name,
email: syncData.email,
phone: syncData.phone
})
});
}
},
warehouse: {
apiUrl: WAREHOUSE_API, // used by the rollback action below
async prepare() {
// Verify warehouse schema
return true;
},
async commit() {
return fetch(`${WAREHOUSE_API}/upsert`, {
method: 'POST',
body: JSON.stringify({
table: 'contacts',
data: syncData
})
});
}
}
};
// Add participants
for (const [systemName, actions] of Object.entries(systems)) {
if (syncData.targetSystems?.includes(systemName) || !syncData.targetSystems) {
transaction.addParticipant(
systemName,
actions,
async () => {
// Rollback action
await fetch(`${systems[systemName].apiUrl}/rollback`, {
method: 'POST',
body: JSON.stringify({ transactionId, entityId: syncData.id })
});
}
);
}
}
const result = await transaction.execute();
return [{
json: {
...syncData,
transactionId,
syncResult: result,
participantStatuses: transaction.participants.map(p => ({
system: p.system,
status: p.status
}))
}
}];
}
// 3. Consistency Checker Workflow
const consistencyChecker = {
name: "Consistency Check Trigger",
type: "n8n-nodes-base.scheduleTrigger",
parameters: {
rule: {
interval: [{ field: "hours", hoursInterval: 6 }]
}
}
};
// 4. Compare Data Across Systems
async function compareSystemData(items) {
const checkConfig = items[0].json;
const inconsistencies = [];
// Sample records to check
const sampleIds = checkConfig.sampleIds || await getSampleIds(100);
for (const id of sampleIds) {
const records = {};
// Fetch from each system
for (const system of ['crm', 'erp', 'warehouse']) {
try {
records[system] = await fetchRecord(system, id);
} catch (e) {
records[system] = { error: e.message };
}
}
// Compare critical fields
const criticalFields = ['email', 'phone', 'company'];
const fieldInconsistencies = [];
for (const field of criticalFields) {
const values = Object.entries(records)
.filter(([_, r]) => !r.error)
.map(([sys, r]) => ({ system: sys, value: r[field] }));
const uniqueValues = [...new Set(values.map(v => v.value))];
if (uniqueValues.length > 1) {
fieldInconsistencies.push({
field,
values: values,
recommendation: determineCorrectValue(values, field)
});
}
}
if (fieldInconsistencies.length > 0) {
inconsistencies.push({
entityId: id,
inconsistencies: fieldInconsistencies,
records
});
}
}
return [{
json: {
checkTime: new Date().toISOString(),
samplesChecked: sampleIds.length,
inconsistenciesFound: inconsistencies.length,
inconsistencies,
healthScore: ((sampleIds.length - inconsistencies.length) / sampleIds.length * 100).toFixed(2)
}
}];
}
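// Sketches of the helpers compareSystemData relies on; the endpoints and the
// sampling route are assumptions, adapt them to your systems' APIs.
async function getSampleIds(limit) {
  // e.g. pull a random sample of recently modified entity IDs from the warehouse
  const response = await fetch(`${WAREHOUSE_API}/contacts/sample?limit=${limit}`);
  if (!response.ok) throw new Error('Failed to fetch sample IDs');
  return response.json();
}
async function fetchRecord(system, id) {
  const endpoints = {
    crm: `${CRM_API}/contacts/${id}`,
    erp: `${ERP_API}/customers/${id}`,
    warehouse: `${WAREHOUSE_API}/contacts/${id}`
  };
  const response = await fetch(endpoints[system]);
  if (!response.ok) throw new Error(`${system} returned ${response.status}`);
  return response.json();
}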
function determineCorrectValue(values, field) {
// Priority-based resolution
const priorities = { crm: 1, erp: 2, warehouse: 3 };
const sorted = values.sort((a, b) => priorities[a.system] - priorities[b.system]);
return {
recommendedValue: sorted[0].value,
sourceSystem: sorted[0].system,
reason: 'Based on system priority (CRM is source of truth)'
};
}
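// Sketch of the write-back helper used by the auto-repair step below; the
// PATCH endpoints are assumptions and should match your systems' update APIs.
async function updateSystemRecord(system, entityId, fields) {
  const endpoints = {
    crm: `${CRM_API}/contacts/${entityId}`,
    erp: `${ERP_API}/customers/${entityId}`,
    warehouse: `${WAREHOUSE_API}/contacts/${entityId}`
  };
  const response = await fetch(endpoints[system], {
    method: 'PATCH',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(fields)
  });
  if (!response.ok) throw new Error(`Update failed for ${system}: ${response.status}`);
  return response.json();
}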
// 5. Auto-Repair Inconsistencies
async function repairInconsistencies(items) {
const report = items[0].json;
const repairs = [];
for (const issue of report.inconsistencies) {
for (const fieldIssue of issue.inconsistencies) {
const correctValue = fieldIssue.recommendation.recommendedValue;
const sourceSystem = fieldIssue.recommendation.sourceSystem;
// Update systems that have incorrect values
for (const systemValue of fieldIssue.values) {
if (systemValue.value !== correctValue && systemValue.system !== sourceSystem) {
try {
await updateSystemRecord(
systemValue.system,
issue.entityId,
{ [fieldIssue.field]: correctValue }
);
repairs.push({
entityId: issue.entityId,
field: fieldIssue.field,
system: systemValue.system,
oldValue: systemValue.value,
newValue: correctValue,
status: 'repaired'
});
} catch (e) {
repairs.push({
entityId: issue.entityId,
field: fieldIssue.field,
system: systemValue.system,
status: 'failed',
error: e.message
});
}
}
}
}
}
return [{
json: {
repairTime: new Date().toISOString(),
totalRepairs: repairs.length,
successful: repairs.filter(r => r.status === 'repaired').length,
failed: repairs.filter(r => r.status === 'failed').length,
repairs
}
}];
}
Conclusion
Robust data synchronization requires careful handling of conflicts, incremental updates, and multi-system consistency. Implement bidirectional sync with conflict resolution strategies, use incremental sync for large datasets, and maintain consistency checks across systems. n8n's flexibility allows building complex sync workflows that keep your data consistent across all platforms.