Sincronizarea datelor între sisteme este esențială pentru consistența operațională. Acest ghid acoperă construirea unor workflow-uri robuste de sincronizare cu n8n, care gestionează conflicte, actualizări incrementale și sincronizare multi-direcțională.
Sincronizare bidirecțională CRM
Sincronizează datele între CRM și alte sisteme:
// Bidirectional CRM <-> Database Sync Workflow
// 1. Webhook Trigger for Real-time Sync
/**
 * Webhook trigger node definition for real-time sync.
 * Declares a POST webhook on the "crm-sync" path with response mode "onReceived".
 */
const webhookTrigger = {
  name: 'CRM Webhook',
  type: 'n8n-nodes-base.webhook',
  parameters: { path: 'crm-sync', httpMethod: 'POST', responseMode: 'onReceived' },
};
// 2. Determine Sync Direction and Type
/**
 * Normalizes an incoming webhook payload into a sync-action descriptor.
 *
 * Only the first item is read. `source` defaults to 'crm' and `timestamp`
 * defaults to the current time when the payload omits them.
 *
 * @param {Array<{json: object}>} items - Incoming items; `items[0].json` is the payload.
 * @returns {Array<{json: object}>} One item with source, event_type, entity_type,
 *   entity_id, timestamp, data and action ('create' | 'update' | 'delete' | 'unknown').
 */
function determineSyncAction(items) {
  // Tolerate an empty item list instead of throwing on items[0].json.
  const payload = items?.[0]?.json ?? {};

  // A Map (not a plain object) so event types such as "constructor" or
  // "toString" cannot resolve to inherited Object.prototype members.
  const actionByEvent = new Map([
    ['contact.created', 'create'],
    ['contact.updated', 'update'],
    ['contact.deleted', 'delete'],
    ['deal.created', 'create'],
    ['deal.updated', 'update'],
    ['deal.stage_changed', 'update'],
    ['company.created', 'create'],
    ['company.updated', 'update'],
  ]);

  return [
    {
      json: {
        source: payload.source || 'crm',
        event_type: payload.event_type,
        entity_type: payload.entity_type,
        entity_id: payload.entity_id,
        timestamp: payload.timestamp || new Date().toISOString(),
        data: payload.data,
        action: actionByEvent.get(payload.event_type) ?? 'unknown',
      },
    },
  ];
}
// 3. Conflict Detection Node
/**
 * Function-node definition whose `functionCode` flags sync conflicts.
 *
 * The embedded code reads the first item, looks up the entity's previous
 * state under `syncState[entity_id]` in the global workflow static data and
 * sets `hasConflict` / `conflictType` when either:
 *  - the entity was last modified by a different source less than five
 *    minutes away from the incoming timestamp ('concurrent_modification'), or
 *  - the incoming `data.version` is lower than the stored version
 *    ('version_mismatch'; this overrides the first type when both apply).
 *
 * The version check uses optional chaining on `data` so payloads without a
 * `data` object no longer throw.
 */
const conflictDetection = {
  name: "Check for Conflicts",
  type: "n8n-nodes-base.function",
  parameters: {
    functionCode: `
const syncAction = items[0].json;
const entityId = syncAction.entity_id;
const sourceTimestamp = new Date(syncAction.timestamp);
// Get last sync state from database
const lastSync = await $getWorkflowStaticData('global');
const syncState = lastSync.syncState || {};
const entityState = syncState[entityId] || {};
// Check for conflict
let hasConflict = false;
let conflictType = null;
if (entityState.lastModified) {
const lastModified = new Date(entityState.lastModified);
const timeDiff = Math.abs(sourceTimestamp - lastModified);
// Conflict if modified within 5 minutes from different source
if (timeDiff < 5 * 60 * 1000 && entityState.lastSource !== syncAction.source) {
hasConflict = true;
conflictType = 'concurrent_modification';
}
}
// Check for version mismatch
if (syncAction.data?.version && entityState.version) {
if (syncAction.data.version < entityState.version) {
hasConflict = true;
conflictType = 'version_mismatch';
}
}
return [{
json: {
...syncAction,
hasConflict,
conflictType,
previousState: entityState
}
}];
`
  }
};
// 4. Conflict Resolution Strategy
/**
 * Decides whether a conflicting sync action may proceed.
 *
 * Items without `hasConflict` pass through with resolution 'no_conflict'.
 * Otherwise a strategy is chosen by `entity_type`:
 *  - 'contact' -> source_priority: proceeds when the incoming source ranks at
 *    least as high as the previous source (crm, then erp, then database).
 *  - 'deal'    -> field_merge: incoming fields are layered over the previous
 *    data; always proceeds.
 *  - anything else -> last_write_wins: proceeds when the incoming timestamp
 *    is later than the previous `lastModified`.
 *
 * The input item is not mutated; merged data is written to the returned copy.
 *
 * @param {Array<{json: object}>} items - Only `items[0].json` is read.
 * @returns {Array<{json: object}>} The item plus `resolution` and `proceed`.
 */
function resolveConflict(items) {
  const item = items[0].json;
  if (!item.hasConflict) {
    return [{ json: { ...item, resolution: 'no_conflict', proceed: true } }];
  }

  // A conflict flag without stored state should not crash the workflow.
  const previous = item.previousState ?? {};
  let mergedData;

  const strategies = {
    // Last write wins
    last_write_wins: () => new Date(item.timestamp) > new Date(previous.lastModified),
    // Source priority (CRM wins for contacts); unknown sources compare as false.
    source_priority: () => {
      const priorities = { crm: 1, erp: 2, database: 3 };
      return priorities[item.source] <= priorities[previous.lastSource];
    },
    // Incoming values override previous ones; untouched previous fields survive.
    // Spreading undefined/null is a no-op, so missing data on either side is fine.
    field_merge: () => {
      mergedData = { ...previous.data, ...item.data };
      return true;
    },
  };

  const strategyByEntity = new Map([
    ['contact', 'source_priority'],
    ['deal', 'field_merge'],
  ]);
  const strategy = strategyByEntity.get(item.entity_type) ?? 'last_write_wins';
  const proceed = strategies[strategy]();

  const resolved = { ...item };
  if (mergedData !== undefined) {
    resolved.data = mergedData;
  }
  resolved.resolution = proceed ? `resolved_${strategy}` : 'blocked';
  resolved.proceed = proceed;
  return [{ json: resolved }];
}
// 5. Sync to Target Systems
/**
 * Postgres node definition that upserts the synced entity.
 * The table name is an expression: the item's entity_type followed by "s".
 */
const syncToDatabase = {
  name: 'Sync to Database',
  type: 'n8n-nodes-base.postgres',
  parameters: {
    operation: 'upsert',
    table: '={{$json.entity_type}}s',
    columns: 'external_id, data, source, updated_at, sync_version',
    updateColumns: 'data, source, updated_at, sync_version',
  },
};
/**
 * HubSpot node definition that writes the item's data back to a contact.
 * The contact id and the properties are both taken from the item via expressions.
 */
const syncBackToCRM = {
  name: 'Sync Back to CRM',
  type: 'n8n-nodes-base.hubspot',
  parameters: {
    resource: 'contact',
    operation: 'update',
    contactId: '={{$json.entity_id}}',
    additionalFields: { properties: '={{$json.data}}' },
  },
};
// 6. Update Sync State
function updateSyncState(items) {
const item = items[0].json;
// Update workflow static data
const staticData = $getWorkflowStaticData('global');
if (!staticData.syncState) {
staticData.syncState = {};
}
staticData.syncState[item.entity_id] = {
lastModified: item.timestamp,
lastSource: item.source,
version: (item.data.version || 0) + 1,
data: item.data
};
return [{
json: {
...item,
syncCompleted: true,
newVersion: staticData.syncState[item.entity_id].version
}
}];
}Sincronizare incrementala a datelor
Gestionează seturi mari de date cu sincronizare incrementală:
// Incremental Sync Workflow for Large Datasets
// 1. Scheduled Incremental Sync
/**
 * Schedule trigger node definition for the incremental sync.
 * The rule holds a single interval entry: every 15 minutes.
 */
const scheduledSync = {
  name: 'Scheduled Sync',
  type: 'n8n-nodes-base.scheduleTrigger',
  parameters: {
    rule: { interval: [{ field: 'minutes', minutesInterval: 15 }] },
  },
};
// 2. Get Last Sync Checkpoint
/**
 * Postgres node definition that reads the most recent checkpoint row for the
 * 'crm_to_warehouse' sync (newest `last_sync_time` first, one row).
 */
const getCheckpoint = {
  name: 'Get Checkpoint',
  type: 'n8n-nodes-base.postgres',
  parameters: {
    operation: 'executeQuery',
    // Query text is kept verbatim, including its line layout.
    query: `
SELECT
last_sync_time,
last_sync_id,
sync_cursor,
records_synced
FROM sync_checkpoints
WHERE sync_name = 'crm_to_warehouse'
ORDER BY last_sync_time DESC
LIMIT 1
`,
  },
};
// 3. Fetch Changed Records
/**
 * HTTP Request node definition that pulls changed contacts from the CRM.
 * Query parameters are expressions over the checkpoint item: records modified
 * after `last_sync_time` (epoch start when unset), resuming from `sync_cursor`,
 * 100 per page, ordered by `modified_at`.
 */
const fetchChangedRecords = {
  name: 'Fetch Changes from CRM',
  type: 'n8n-nodes-base.httpRequest',
  parameters: {
    method: 'GET',
    url: '={{$env.CRM_API_URL}}/contacts',
    authentication: 'oAuth2',
    queryParameters: {
      modified_after: "={{$json.last_sync_time || '1970-01-01T00:00:00Z'}}",
      cursor: "={{$json.sync_cursor || ''}}",
      limit: '100',
      order_by: 'modified_at',
    },
  },
};
// 4. Transform and Validate Data
/**
 * Maps a page of CRM records to warehouse rows and validates each one.
 *
 * Each output item carries the transformed fields plus `_valid`, `_errors`
 * and `_original` (the untouched source record). A final item flagged
 * `_metadata: true` carries the pagination info of the response.
 *
 * Relies on `normalizePhone` and `isValidEmail` being in scope.
 *
 * @param {Array<{json: object}>} items - `items[0].json` is the API response
 *   (`data`, `has_more`, `next_cursor`).
 * @returns {Array<{json: object}>} One item per record followed by the metadata item.
 */
function transformForWarehouse(items) {
  // Empty input or a non-array `data` yields just the metadata item.
  const response = items?.[0]?.json ?? {};
  const records = Array.isArray(response.data) ? response.data : [];
  // One timestamp per batch so all rows of a run share the same value.
  const syncTimestamp = new Date().toISOString();

  const output = records.map((record) => {
    const firstName = record.first_name?.trim();
    const lastName = record.last_name?.trim();

    const row = {
      id: record.id,
      external_id: record.external_id,
      email: record.email?.toLowerCase().trim(),
      first_name: firstName,
      last_name: lastName,
      // Built from the trimmed parts so stray inner whitespace does not leak in.
      full_name: [firstName, lastName].filter(Boolean).join(' '),
      company: record.company_name,
      phone: normalizePhone(record.phone),
      created_at: record.created_at,
      updated_at: record.updated_at,
      source_system: 'crm',
      sync_timestamp: syncTimestamp,
    };

    const validationErrors = [];
    if (!row.email || !isValidEmail(row.email)) {
      validationErrors.push('Invalid email');
    }
    if (!row.id) {
      validationErrors.push('Missing ID');
    }

    return {
      json: {
        ...row,
        _valid: validationErrors.length === 0,
        _errors: validationErrors,
        _original: record,
      },
    };
  });

  // Pagination info travels as a trailing item flagged with _metadata.
  output.push({
    json: {
      _metadata: true,
      hasMore: response.has_more,
      nextCursor: response.next_cursor,
      totalInBatch: records.length,
    },
  });
  return output;
}
/**
 * Strips everything except digits and "+" from a phone value.
 *
 * @param {string|number|null|undefined} phone - Raw phone value; numbers are
 *   converted to text first.
 * @returns {string|null} The cleaned phone text, or null for a falsy input.
 */
function normalizePhone(phone) {
  if (!phone) return null;
  // String() so numeric phone values do not throw on .replace.
  return String(phone).replace(/[^\d+]/g, '');
}
/**
 * Shape check for an e-mail address: some text, "@", some text, ".", some
 * text, with no whitespace or extra "@" in any part.
 *
 * @param {*} email - Value to check (tested against the pattern as text).
 * @returns {boolean} True when the value matches the pattern.
 */
function isValidEmail(email) {
  const emailShape = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
  const match = emailShape.exec(email);
  return match !== null;
}
// 5. Split Valid and Invalid Records
/**
 * IF node definition that branches on the item's `_valid` flag
 * (boolean condition: `$json._valid` compared with true).
 */
const splitRecords = {
  name: 'Split Valid/Invalid',
  type: 'n8n-nodes-base.if',
  parameters: {
    conditions: {
      boolean: [{ value1: '={{$json._valid}}', value2: true }],
    },
  },
};
// 6. Batch Insert to Warehouse
/**
 * Postgres node definition that upserts a contact row into contacts_staging.
 *
 * The values are passed as query parameters ($1..$12) instead of being
 * spliced into the SQL text. String-built SQL breaks on values containing
 * quotes (e.g. "O'Brien"), turns null into the text 'null' and allows SQL
 * injection through CRM-supplied data.
 *
 * `queryParams` lists the item properties bound to $1..$12, in order.
 * NOTE(review): `additionalFields.queryParams` is the parameter name used by
 * the legacy Postgres node; newer node versions use a different option for
 * query parameters. Confirm against the node version in use.
 */
const batchInsert = {
  name: "Insert to Warehouse",
  type: "n8n-nodes-base.postgres",
  parameters: {
    operation: "executeQuery",
    query: `
      INSERT INTO contacts_staging (
        id, external_id, email, first_name, last_name,
        full_name, company, phone, created_at, updated_at,
        source_system, sync_timestamp
      )
      VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
      ON CONFLICT (id) DO UPDATE SET
        email = EXCLUDED.email,
        first_name = EXCLUDED.first_name,
        last_name = EXCLUDED.last_name,
        full_name = EXCLUDED.full_name,
        company = EXCLUDED.company,
        phone = EXCLUDED.phone,
        updated_at = EXCLUDED.updated_at,
        sync_timestamp = EXCLUDED.sync_timestamp
    `,
    additionalFields: {
      queryParams: "id,external_id,email,first_name,last_name,full_name,company,phone,created_at,updated_at,source_system,sync_timestamp"
    }
  }
};
// 7. Log Invalid Records for Review
/**
 * Postgres node definition that inserts rejected records into the
 * sync_errors table using the listed columns.
 */
const logInvalidRecords = {
  name: 'Log Invalid Records',
  type: 'n8n-nodes-base.postgres',
  parameters: {
    operation: 'insert',
    table: 'sync_errors',
    columns: 'sync_name, record_id, error_type, error_details, raw_data, created_at',
  },
};
// 8. Update Checkpoint
/**
 * Builds the checkpoint row for the 'crm_to_warehouse' sync from a processed batch.
 *
 * `last_sync_time` is the latest parsable `updated_at` among the batch's
 * record items (the high-water mark). Using the wall clock instead can skip
 * records that changed between the fetch and the checkpoint, or that sit on
 * pages not fetched yet. The current time is used only when no record
 * carries a parsable `updated_at`.
 *
 * @param {Array<{json: object}>} items - Record items plus an optional item
 *   flagged `_metadata` carrying `hasMore` / `nextCursor`.
 * @returns {Array<{json: object}>} Single item with sync_name, last_sync_time,
 *   last_sync_id, sync_cursor, records_synced and has_more.
 */
function updateCheckpoint(items) {
  const recordItems = items.filter((i) => !i.json._metadata);
  const validRecords = recordItems.filter((i) => i.json._valid);
  const metadata = items.find((i) => i.json._metadata)?.json || {};

  // Invalid records count towards the mark too: they were fetched and handled.
  const watermarkMs = recordItems.reduce((latest, i) => {
    const ms = Date.parse(i.json.updated_at);
    return Number.isNaN(ms) ? latest : Math.max(latest, ms);
  }, -Infinity);
  const lastSyncTime = Number.isFinite(watermarkMs)
    ? new Date(watermarkMs).toISOString()
    : new Date().toISOString();

  const checkpoint = {
    sync_name: 'crm_to_warehouse',
    last_sync_time: lastSyncTime,
    last_sync_id: validRecords[validRecords.length - 1]?.json.id,
    sync_cursor: metadata.nextCursor || null,
    records_synced: validRecords.length,
    has_more: metadata.hasMore || false,
  };
  return [{ json: checkpoint }];
}
// 9. Continue if More Records
/**
 * IF node definition that branches on the checkpoint's `has_more` flag
 * (boolean condition: `$json.has_more` compared with true).
 */
const checkMoreRecords = {
  name: 'Check for More',
  type: 'n8n-nodes-base.if',
  parameters: {
    conditions: {
      boolean: [{ value1: '={{$json.has_more}}', value2: true }],
    },
  },
};
// Loop back to fetch more if needed
const loopBack = {
name: "Loop to Fetch More",
type: "n8n-nodes-base.executeWorkflow",
parameters: {
workflowId: "={{$workflow.id}}"
}
};Consistenta datelor in mai multe sisteme
Menține consistența datelor în mai multe sisteme simultan:
// Multi-System Consistency Workflow
// 1. Transaction Manager
/**
 * Coordinates a prepare-then-commit sequence across several participants and
 * undoes already-committed participants when any step fails.
 *
 * Participant status values: 'pending', 'preparing', 'prepared',
 * 'committing', 'committed', 'failed', 'rolled_back', 'rollback_failed'.
 * Transaction status values: 'pending', 'committed', 'rolled_back'.
 */
class DistributedSyncTransaction {
  /**
   * @param {string} transactionId - Identifier echoed back in the result.
   */
  constructor(transactionId) {
    this.transactionId = transactionId;
    this.participants = [];
    this.status = 'pending';
    this.rollbackActions = [];
  }

  /**
   * Registers a participant.
   *
   * @param {string} system - Name of the participating system.
   * @param {{prepare: Function, commit: Function}} action - Async prepare/commit pair.
   * @param {Function} rollback - Async undo, called only if this participant committed.
   */
  addParticipant(system, action, rollback) {
    this.participants.push({ system, action, rollback, status: 'pending' });
  }

  /**
   * Runs prepare on every participant, then commit on every participant, in
   * registration order. On the first error the failing participant is marked
   * 'failed' and committed participants are rolled back.
   *
   * @returns {Promise<{success: boolean, transactionId: string, error?: string}>}
   */
  async execute() {
    // Tracks the participant being processed so a failure can be attributed to it.
    let active = null;
    try {
      // Phase 1: Prepare all participants
      for (const participant of this.participants) {
        active = participant;
        participant.status = 'preparing';
        await participant.action.prepare();
        participant.status = 'prepared';
      }
      // Phase 2: Commit all participants
      for (const participant of this.participants) {
        active = participant;
        participant.status = 'committing';
        await participant.action.commit();
        this.rollbackActions.push(participant.rollback);
        participant.status = 'committed';
      }
      this.status = 'committed';
      return { success: true, transactionId: this.transactionId };
    } catch (error) {
      if (active) {
        active.status = 'failed';
      }
      await this.rollback();
      this.status = 'rolled_back';
      return {
        success: false,
        // Non-Error throwables have no .message.
        error: error?.message ?? String(error),
        transactionId: this.transactionId,
      };
    }
  }

  /**
   * Runs the recorded rollback actions in reverse commit order and updates the
   * owning participants' status. A failing rollback is logged and does not
   * stop the remaining ones. The recorded actions are cleared, so calling
   * this again is a no-op.
   */
  async rollback() {
    // Copy before reversing: Array.prototype.reverse mutates in place.
    const pending = [...this.rollbackActions].reverse();
    this.rollbackActions = [];

    for (const undo of pending) {
      const owner = [...this.participants]
        .reverse()
        .find((p) => p.rollback === undo && p.status === 'committed');
      try {
        await undo();
        if (owner) owner.status = 'rolled_back';
      } catch (e) {
        console.error(`Rollback failed: ${e.message}`);
        if (owner) owner.status = 'rollback_failed';
      }
    }
  }
}
// 2. Sync Orchestrator Function
/**
 * Pushes one record to the CRM, ERP and warehouse inside a
 * DistributedSyncTransaction and reports the per-system outcome.
 *
 * Systems are included when `targetSystems` is absent or lists them.
 * Relies on `DistributedSyncTransaction`, `CRM_API`, `ERP_API`,
 * `WAREHOUSE_API` and `fetch` being in scope; none are defined in this function.
 *
 * @param {Array<{json: object}>} items - Only `items[0].json` (the record) is read.
 * @returns {Promise<Array<{json: object}>>} The record plus `transactionId`,
 *   `syncResult` and `participantStatuses`.
 */
async function orchestrateMultiSystemSync(items) {
  const syncData = items[0].json;
  const transactionId = `txn_${Date.now()}`;
  const transaction = new DistributedSyncTransaction(transactionId);

  // Sends a JSON body and fails loudly on a non-2xx response, so that a
  // rejected write is not recorded as a successful commit.
  const sendJson = async (url, method, body, failureMessage) => {
    const response = await fetch(url, {
      method,
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
    });
    if (!response.ok) throw new Error(failureMessage);
    return response;
  };

  // Define participants based on data type.
  // `apiUrl` is a getter so a base URL is only read for systems actually used.
  const systems = {
    crm: {
      get apiUrl() {
        return CRM_API;
      },
      async prepare() {
        // Validate CRM can accept update
        await sendJson(`${CRM_API}/validate`, 'POST', syncData, 'CRM validation failed');
      },
      async commit() {
        return sendJson(`${CRM_API}/contacts/${syncData.id}`, 'PUT', syncData, 'CRM commit failed');
      },
    },
    erp: {
      get apiUrl() {
        return ERP_API;
      },
      async prepare() {
        // Check ERP availability
        const health = await fetch(`${ERP_API}/health`);
        if (!health.ok) throw new Error('ERP unavailable');
      },
      async commit() {
        return sendJson(
          `${ERP_API}/customers/${syncData.external_id}`,
          'PATCH',
          { name: syncData.full_name, email: syncData.email, phone: syncData.phone },
          'ERP commit failed'
        );
      },
    },
    warehouse: {
      get apiUrl() {
        return WAREHOUSE_API;
      },
      async prepare() {
        // Verify warehouse schema
        return true;
      },
      async commit() {
        return sendJson(
          `${WAREHOUSE_API}/upsert`,
          'POST',
          { table: 'contacts', data: syncData },
          'Warehouse commit failed'
        );
      },
    },
  };

  // Add participants
  for (const [systemName, actions] of Object.entries(systems)) {
    if (syncData.targetSystems?.includes(systemName) || !syncData.targetSystems) {
      transaction.addParticipant(systemName, actions, async () => {
        // Rollback action: posted to the same system's base URL.
        await sendJson(
          `${actions.apiUrl}/rollback`,
          'POST',
          { transactionId, entityId: syncData.id },
          `${systemName} rollback failed`
        );
      });
    }
  }

  const result = await transaction.execute();
  return [
    {
      json: {
        ...syncData,
        transactionId,
        syncResult: result,
        participantStatuses: transaction.participants.map((p) => ({
          system: p.system,
          status: p.status,
        })),
      },
    },
  ];
}
// 3. Consistency Checker Workflow
/**
 * Schedule trigger node definition for the consistency check.
 * The rule holds a single interval entry: every 6 hours.
 */
const consistencyChecker = {
  name: 'Consistency Check Trigger',
  type: 'n8n-nodes-base.scheduleTrigger',
  parameters: {
    rule: { interval: [{ field: 'hours', hoursInterval: 6 }] },
  },
};
// 4. Compare Data Across Systems
/**
 * Compares the email, phone and company of sampled entities across the CRM,
 * ERP and warehouse and reports every field whose values differ.
 *
 * Systems whose fetch failed or returned no record are left out of the
 * comparison for that entity. Relies on `fetchRecord`, `getSampleIds` and
 * `determineCorrectValue` being in scope.
 *
 * @param {Array<{json: object}>} items - `items[0].json.sampleIds` optionally
 *   lists the ids to check; otherwise `getSampleIds(100)` is used.
 * @returns {Promise<Array<{json: object}>>} Report with checkTime,
 *   samplesChecked, inconsistenciesFound, inconsistencies and healthScore
 *   (percentage of consistent samples as a two-decimal string; "100.00" when
 *   there were no samples).
 */
async function compareSystemData(items) {
  const checkConfig = items[0].json;
  const inconsistencies = [];
  const systemsToCheck = ['crm', 'erp', 'warehouse'];
  const criticalFields = ['email', 'phone', 'company'];

  // Sample records to check
  const sampleIds = checkConfig.sampleIds || (await getSampleIds(100));

  for (const id of sampleIds) {
    const records = {};
    // Fetch from each system
    for (const system of systemsToCheck) {
      try {
        records[system] = await fetchRecord(system, id);
      } catch (e) {
        records[system] = { error: e.message };
      }
    }

    // Only systems that returned a usable record take part in the comparison;
    // the null check keeps a missing record from throwing on `.error`.
    const usable = Object.entries(records).filter(([, r]) => r && !r.error);

    // Compare critical fields
    const fieldInconsistencies = [];
    for (const field of criticalFields) {
      const values = usable.map(([sys, r]) => ({ system: sys, value: r[field] }));
      const distinct = new Set(values.map((v) => v.value));
      if (distinct.size > 1) {
        fieldInconsistencies.push({
          field,
          values,
          recommendation: determineCorrectValue(values, field),
        });
      }
    }

    if (fieldInconsistencies.length > 0) {
      inconsistencies.push({
        entityId: id,
        inconsistencies: fieldInconsistencies,
        records,
      });
    }
  }

  // Avoid 0/0 -> "NaN" when nothing was sampled.
  const consistentShare =
    sampleIds.length === 0
      ? 100
      : ((sampleIds.length - inconsistencies.length) / sampleIds.length) * 100;

  return [
    {
      json: {
        checkTime: new Date().toISOString(),
        samplesChecked: sampleIds.length,
        inconsistenciesFound: inconsistencies.length,
        inconsistencies,
        healthScore: consistentShare.toFixed(2),
      },
    },
  ];
}
/**
 * Picks the value to trust among per-system values by system priority
 * (crm first, then erp, then warehouse; unknown systems rank last).
 *
 * The input array is not reordered.
 *
 * @param {Array<{system: string, value: *}>} values - Candidate values per system.
 * @param {string} field - Field being resolved (not used by the ranking).
 * @returns {{recommendedValue: *, sourceSystem: (string|undefined), reason: string}}
 */
function determineCorrectValue(values, field) {
  // Priority-based resolution
  const priorities = { crm: 1, erp: 2, warehouse: 3 };
  const rankOf = (system) =>
    Object.prototype.hasOwnProperty.call(priorities, system)
      ? priorities[system]
      : Number.MAX_SAFE_INTEGER;

  // Sort a copy: the caller keeps using `values` in its report.
  const [preferred] = [...values].sort((a, b) => rankOf(a.system) - rankOf(b.system));

  if (preferred === undefined) {
    return {
      recommendedValue: undefined,
      sourceSystem: undefined,
      reason: 'No values available to compare',
    };
  }
  return {
    recommendedValue: preferred.value,
    sourceSystem: preferred.system,
    reason: 'Based on system priority (CRM is source of truth)',
  };
}
// 5. Auto-Repair Inconsistencies
/**
 * Applies the recommended value of each reported inconsistency to every
 * system that holds a different value, skipping the system the
 * recommendation came from.
 *
 * Each attempt is recorded with status 'repaired' or 'failed'; a failure on
 * one system does not stop the remaining repairs. Relies on
 * `updateSystemRecord` being in scope.
 *
 * @param {Array<{json: object}>} items - `items[0].json` is a consistency
 *   report with an `inconsistencies` list; a missing list means nothing to repair.
 * @returns {Promise<Array<{json: object}>>} Summary with repairTime,
 *   totalRepairs, successful, failed and the individual repairs.
 */
async function repairInconsistencies(items) {
  const report = items?.[0]?.json ?? {};
  const repairs = [];

  for (const issue of report.inconsistencies ?? []) {
    for (const fieldIssue of issue.inconsistencies ?? []) {
      const { recommendedValue: correctValue, sourceSystem } = fieldIssue.recommendation;

      // Update systems that have incorrect values
      for (const systemValue of fieldIssue.values) {
        if (systemValue.value === correctValue || systemValue.system === sourceSystem) {
          continue;
        }

        // Failed attempts keep the old/new values too, so they can be retried.
        const attempt = {
          entityId: issue.entityId,
          field: fieldIssue.field,
          system: systemValue.system,
          oldValue: systemValue.value,
          newValue: correctValue,
        };
        try {
          await updateSystemRecord(systemValue.system, issue.entityId, {
            [fieldIssue.field]: correctValue,
          });
          repairs.push({ ...attempt, status: 'repaired' });
        } catch (e) {
          repairs.push({ ...attempt, status: 'failed', error: e.message });
        }
      }
    }
  }

  return [
    {
      json: {
        repairTime: new Date().toISOString(),
        totalRepairs: repairs.length,
        successful: repairs.filter((r) => r.status === 'repaired').length,
        failed: repairs.filter((r) => r.status === 'failed').length,
        repairs,
      },
    },
  ];
}
Concluzie
Sincronizarea robustă a datelor necesită gestionarea atentă a conflictelor, actualizări incrementale și verificări de consistență între sisteme. Implementează sincronizare bidirecțională cu strategii de rezolvare a conflictelor, folosește sincronizarea incrementală pentru seturi mari de date și menține verificări de consistență în toate sistemele. Flexibilitatea n8n permite construirea unor workflow-uri complexe de sincronizare care păstrează datele consistente pe toate platformele.