n8n Data Transformation and ETL Workflows: Building Data Pipelines

Petru Constantin
18 min read
#n8n#ETL#data transformation#data pipelines#automation

Data transformation and ETL (Extract, Transform, Load) are critical for data-driven operations. This guide shows how to build comprehensive data pipelines with n8n.

Data Extraction Patterns

Multi-Source Data Extraction

// n8n Function Node - Data Extraction Controller
const extractionConfig = $input.first().json.config;
 
// Define extraction sources
const sources = {
  database: (config) => {
    // Extracted via PostgreSQL/MySQL node
    return {
      type: 'database',
      query: config.query,
      connection: config.connectionName
    };
  },
 
  api: (config) => {
    // Extracted via HTTP Request node
    return {
      type: 'api',
      endpoint: config.endpoint,
      method: config.method || 'GET',
      headers: config.headers || {},
      pagination: config.pagination
    };
  },
 
  file: (config) => {
    // Extracted via Read Binary File / FTP node
    return {
      type: 'file',
      path: config.path,
      format: config.format, // csv, json, xlsx
      encoding: config.encoding || 'utf-8'
    };
  },
 
  spreadsheet: (config) => {
    // Extracted via Google Sheets / Excel node
    return {
      type: 'spreadsheet',
      sheetId: config.sheetId,
      range: config.range,
      hasHeaders: config.hasHeaders !== false
    };
  }
};
 
// Generate extraction tasks based on config
const extractionTasks = extractionConfig.sources.map(source => ({
  sourceId: source.id,
  sourceType: source.type,
  config: sources[source.type](source.config),
  priority: source.priority || 5,
  retryPolicy: source.retryPolicy || { maxRetries: 3, backoff: 'exponential' }
}));
 
// Sort by priority (highest first)
extractionTasks.sort((a, b) => b.priority - a.priority);
 
return extractionTasks.map(task => ({ json: task }));
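The controller above assumes a config document listing the sources to pull from. A shape it could consume looks like this (all names and values are illustrative):

```javascript
// Illustrative extraction config: two sources with different priorities.
// The controller sorts descending, so the API source (priority 10) runs first.
const exampleExtractionConfig = {
  sources: [
    {
      id: 'crm_contacts',
      type: 'api',
      priority: 10,
      config: {
        endpoint: 'https://api.example.com/contacts',
        method: 'GET',
        pagination: { type: 'cursor' }
      }
    },
    {
      id: 'orders_table',
      type: 'database',
      priority: 5,
      retryPolicy: { maxRetries: 5, backoff: 'exponential' },
      config: {
        query: 'SELECT * FROM orders',
        connectionName: 'warehouse'
      }
    }
  ]
};
```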

Paginated API Extraction

// n8n Function Node - Paginated API Extractor
const apiConfig = $input.first().json;
const existingData = $('Previous Page Data').all().map(i => i.json);
 
// Pagination state
const pagination = {
  currentPage: apiConfig.currentPage || 1,
  pageSize: apiConfig.pageSize || 100,
  totalPages: apiConfig.totalPages || null,
  cursor: apiConfig.cursor || null,
  hasMore: true
};
 
// Determine pagination type
const paginationType = apiConfig.paginationType || 'offset';
 
// Build request parameters
let requestParams = {};
 
switch (paginationType) {
  case 'offset':
    requestParams = {
      offset: (pagination.currentPage - 1) * pagination.pageSize,
      limit: pagination.pageSize
    };
    break;
 
  case 'page':
    requestParams = {
      page: pagination.currentPage,
      per_page: pagination.pageSize
    };
    break;
 
  case 'cursor':
    requestParams = pagination.cursor
      ? { cursor: pagination.cursor, limit: pagination.pageSize }
      : { limit: pagination.pageSize };
    break;
 
  case 'link':
    // For APIs that return next page URL in headers/body
    requestParams = {
      nextUrl: apiConfig.nextUrl || apiConfig.baseUrl
    };
    break;
}
 
// Merge with any custom parameters
requestParams = {
  ...requestParams,
  ...apiConfig.additionalParams
};
 
return {
  json: {
    requestConfig: {
      url: apiConfig.baseUrl,
      method: 'GET',
      params: requestParams,
      headers: apiConfig.headers
    },
    pagination,
    paginationType,
    extractedCount: existingData.length,
    batchId: apiConfig.batchId || `batch_${Date.now()}`
  }
};

Incremental Extraction

// n8n Function Node - Incremental Extraction Manager
const config = $input.first().json;
const lastSync = $('Get Last Sync State').first().json;
 
// Determine extraction window
const now = new Date();
const lastSyncTime = lastSync?.lastSyncTime
  ? new Date(lastSync.lastSyncTime)
  : new Date(0); // Full extraction if no previous sync
 
// Build incremental query (values are interpolated here for illustration;
// prefer the database node's query-parameter options in production to avoid SQL injection)
const buildIncrementalQuery = (table, timestampColumn, since) => {
  const whereClause = since
    ? `WHERE ${timestampColumn} > '${since.toISOString()}'`
    : '';
 
  return `
    SELECT *
    FROM ${table}
    ${whereClause}
    ORDER BY ${timestampColumn} ASC
    LIMIT 10000
  `;
};
 
// Build change detection query (for tables without timestamps)
const buildChangeDetectionQuery = (table, primaryKey, checksumColumns) => {
  return `
    SELECT
      ${primaryKey},
      MD5(CONCAT(${checksumColumns.join(', ')})) as row_checksum
    FROM ${table}
  `;
};
 
// Extraction strategy
let extractionStrategy;
if (config.incrementalColumn) {
  extractionStrategy = {
    type: 'incremental',
    query: buildIncrementalQuery(
      config.table,
      config.incrementalColumn,
      lastSyncTime
    ),
    watermark: {
      column: config.incrementalColumn,
      lastValue: lastSync?.lastValue
    }
  };
} else if (config.checksumColumns) {
  extractionStrategy = {
    type: 'change_detection',
    query: buildChangeDetectionQuery(
      config.table,
      config.primaryKey,
      config.checksumColumns
    ),
    previousChecksums: lastSync?.checksums || {}
  };
} else {
  extractionStrategy = {
    type: 'full',
    query: `SELECT * FROM ${config.table}`
  };
}
 
return {
  json: {
    strategy: extractionStrategy,
    syncWindow: {
      start: lastSyncTime.toISOString(),
      end: now.toISOString()
    },
    table: config.table,
    batchId: `sync_${config.table}_${Date.now()}`
  }
};
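After a successful load, the sync state must be advanced so the next run extracts only newer rows. A sketch of computing the new watermark from the extracted batch, assuming the incremental column holds ISO timestamps (which sort lexicographically):

```javascript
// Compute the next watermark: the largest timestamp seen in this batch,
// falling back to the previous watermark when the batch is empty.
function nextWatermark(rows, incrementalColumn, previousWatermark) {
  const timestamps = rows
    .map(row => row[incrementalColumn])
    .filter(Boolean)
    .sort();
  return timestamps.length ? timestamps[timestamps.length - 1] : previousWatermark;
}
```

Persist the result (for example in workflow static data or a dedicated sync-state table) only after the load step succeeds; otherwise a failed run would silently skip rows on the next sync.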

Data Transformation

Schema Transformation

// n8n Function Node - Schema Transformer
const sourceData = $input.all().map(i => i.json);
const schemaMapping = $('Schema Mapping Config').first().json;
 
// Schema transformation functions
const transformers = {
  // Type conversions
  toString: (value) => String(value ?? ''),
  toNumber: (value) => {
    const num = Number(value);
    return isNaN(num) ? null : num;
  },
  toBoolean: (value) => {
    if (typeof value === 'boolean') return value;
    if (typeof value === 'string') {
      return ['true', '1', 'yes', 'y'].includes(value.toLowerCase());
    }
    return Boolean(value);
  },
  toDate: (value, format) => {
    if (!value) return null;
    const date = new Date(value);
    return isNaN(date.getTime()) ? null : date.toISOString();
  },
  toArray: (value, delimiter = ',') => {
    if (Array.isArray(value)) return value;
    if (typeof value === 'string') return value.split(delimiter).map(s => s.trim());
    return [value];
  },
 
  // String transformations
  trim: (value) => String(value ?? '').trim(),
  uppercase: (value) => String(value ?? '').toUpperCase(),
  lowercase: (value) => String(value ?? '').toLowerCase(),
  capitalize: (value) => {
    const str = String(value ?? '');
    return str.charAt(0).toUpperCase() + str.slice(1).toLowerCase();
  },
 
  // Data cleaning
  removeHtml: (value) => String(value ?? '').replace(/<[^>]*>/g, ''),
  normalizeWhitespace: (value) => String(value ?? '').replace(/\s+/g, ' ').trim(),
  extractNumbers: (value) => String(value ?? '').replace(/[^\d.-]/g, ''),
 
  // Lookup transformations
  lookup: (value, lookupTable) => lookupTable[value] ?? value,
  default: (value, defaultValue) => value ?? defaultValue,
 
  // Concatenation
  concat: (values, separator = ' ') => values.filter(v => v != null).join(separator),
 
  // Custom expressions
  expression: (value, expr, context) => {
    // Caution: new Function executes arbitrary code; only evaluate trusted expressions
    try {
      const func = new Function('value', 'context', `return ${expr}`);
      return func(value, context);
    } catch (e) {
      return null;
    }
  }
};
 
// Apply schema mapping
const transformedData = sourceData.map(record => {
  const transformed = {};
 
  for (const [targetField, mapping] of Object.entries(schemaMapping.fields)) {
    let value;
 
    // Get source value(s)
    if (mapping.sourceFields) {
      // Multiple source fields
      value = mapping.sourceFields.map(sf => record[sf]);
    } else if (mapping.sourceField) {
      value = record[mapping.sourceField];
    } else if (mapping.constant) {
      value = mapping.constant;
    }
 
    // Apply transformations
    if (mapping.transformations) {
      for (const transform of mapping.transformations) {
        const transformFn = transformers[transform.type];
        if (transformFn) {
          value = Array.isArray(value) && transform.type !== 'concat'
            ? value.map(v => transformFn(v, transform.params, record))
            : transformFn(value, transform.params, record);
        }
      }
    }
 
    // Apply validation
    if (mapping.validation) {
      const isValid = validateField(value, mapping.validation);
      if (!isValid && mapping.validation.onInvalid === 'null') {
        value = null;
      } else if (!isValid && mapping.validation.onInvalid === 'skip') {
        continue;
      }
    }
 
    transformed[targetField] = value;
  }
 
  return transformed;
});
 
function validateField(value, rules) {
  if (rules.required && (value == null || value === '')) return false;
  if (rules.type === 'email' && value && !isValidEmail(value)) return false;
  if (rules.type === 'phone' && value && !isValidPhone(value)) return false;
  if (rules.minLength && String(value).length < rules.minLength) return false;
  if (rules.maxLength && String(value).length > rules.maxLength) return false;
  if (rules.pattern && !new RegExp(rules.pattern).test(value)) return false;
  return true;
}
 
function isValidEmail(email) {
  return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
}
 
function isValidPhone(phone) {
  return /^\+?[\d\s-()]{10,}$/.test(phone);
}
 
return transformedData.map(record => ({ json: record }));
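A mapping document the transformer above could consume (field names are illustrative):

```javascript
// Illustrative schema mapping: concatenates two name fields, normalizes an
// email with a validation fallback, parses a date, and injects a constant.
const exampleSchemaMapping = {
  fields: {
    fullName: {
      sourceFields: ['first_name', 'last_name'],
      transformations: [{ type: 'concat', params: ' ' }]
    },
    email: {
      sourceField: 'email_address',
      transformations: [{ type: 'trim' }, { type: 'lowercase' }],
      validation: { type: 'email', onInvalid: 'null' }
    },
    signupDate: {
      sourceField: 'created',
      transformations: [{ type: 'toDate' }]
    },
    source: { constant: 'crm_import' }
  }
};
```

Note that with sourceFields the transformer receives an array of values, which is why concat is excluded from the element-wise mapping in the code above.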

Data Aggregation

// n8n Function Node - Data Aggregator
const data = $input.all().map(i => i.json);
const aggregationConfig = $('Aggregation Config').first().json;
 
// Aggregation functions
const aggregators = {
  sum: (values) => values.reduce((a, b) => a + (Number(b) || 0), 0),
  avg: (values) => {
    const nums = values.filter(v => !isNaN(Number(v))).map(Number);
    return nums.length ? nums.reduce((a, b) => a + b, 0) / nums.length : null;
  },
  min: (values) => {
    const nums = values.filter(v => !isNaN(Number(v))).map(Number);
    return nums.length ? Math.min(...nums) : null; // avoid Infinity on empty input
  },
  max: (values) => {
    const nums = values.filter(v => !isNaN(Number(v))).map(Number);
    return nums.length ? Math.max(...nums) : null; // avoid -Infinity on empty input
  },
  count: (values) => values.length,
  countDistinct: (values) => new Set(values).size,
  first: (values) => values[0],
  last: (values) => values[values.length - 1],
  concat: (values, separator = ', ') => [...new Set(values)].join(separator),
  mode: (values) => {
    const freq = {};
    values.forEach(v => freq[v] = (freq[v] || 0) + 1);
    return Object.entries(freq).sort((a, b) => b[1] - a[1])[0]?.[0];
  },
  median: (values) => {
    const sorted = values.filter(v => !isNaN(Number(v))).map(Number).sort((a, b) => a - b);
    if (!sorted.length) return null;
    const mid = Math.floor(sorted.length / 2);
    return sorted.length % 2 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
  },
  stddev: (values) => {
    const nums = values.filter(v => !isNaN(Number(v))).map(Number);
    if (!nums.length) return null;
    const avg = nums.reduce((a, b) => a + b, 0) / nums.length;
    const squareDiffs = nums.map(v => Math.pow(v - avg, 2));
    return Math.sqrt(squareDiffs.reduce((a, b) => a + b, 0) / nums.length);
  }
};
 
// Group data
const grouped = {};
for (const record of data) {
  // Build group key
  const groupKey = aggregationConfig.groupBy
    .map(field => record[field])
    .join('|');
 
  if (!grouped[groupKey]) {
    grouped[groupKey] = {
      _groupKey: groupKey,
      _groupValues: {},
      _records: []
    };
 
    // Store group-by values
    for (const field of aggregationConfig.groupBy) {
      grouped[groupKey]._groupValues[field] = record[field];
    }
  }
 
  grouped[groupKey]._records.push(record);
}
 
// Apply aggregations
const results = Object.values(grouped).map(group => {
  const result = { ...group._groupValues };
 
  for (const agg of aggregationConfig.aggregations) {
    const values = group._records.map(r => r[agg.field]);
    const aggregator = aggregators[agg.function];
 
    if (aggregator) {
      result[agg.alias || `${agg.function}_${agg.field}`] = aggregator(values, agg.params);
    }
  }
 
  // Add record count
  result._recordCount = group._records.length;
 
  return result;
});
 
// Apply having clause (post-aggregation filtering)
let filteredResults = results;
if (aggregationConfig.having) {
  filteredResults = results.filter(record => {
    return evaluateCondition(record, aggregationConfig.having);
  });
}
 
// Sort results
if (aggregationConfig.orderBy) {
  filteredResults.sort((a, b) => {
    for (const sort of aggregationConfig.orderBy) {
      const aVal = a[sort.field];
      const bVal = b[sort.field];
      const comparison = aVal < bVal ? -1 : aVal > bVal ? 1 : 0;
      if (comparison !== 0) {
        return sort.direction === 'desc' ? -comparison : comparison;
      }
    }
    return 0;
  });
}
 
function evaluateCondition(record, condition) {
  const value = record[condition.field];
  switch (condition.operator) {
    case '>': return value > condition.value;
    case '>=': return value >= condition.value;
    case '<': return value < condition.value;
    case '<=': return value <= condition.value;
    case '=': return value === condition.value;
    case '!=': return value !== condition.value;
    default: return true;
  }
}
 
return filteredResults.map(record => ({ json: record }));
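An aggregation config matching the node above, roughly the equivalent of GROUP BY ... HAVING ... ORDER BY in SQL (names are illustrative):

```javascript
// Illustrative aggregation config: sales totals per region and category,
// keeping only groups above 1000 and sorting largest first.
const exampleAggregationConfig = {
  groupBy: ['region', 'product_category'],
  aggregations: [
    { field: 'amount', function: 'sum', alias: 'total_sales' },
    { field: 'amount', function: 'avg', alias: 'avg_sale' },
    { field: 'customer_id', function: 'countDistinct', alias: 'unique_customers' }
  ],
  having: { field: 'total_sales', operator: '>', value: 1000 },
  orderBy: [{ field: 'total_sales', direction: 'desc' }]
};
```

The having clause filters on aliased aggregate fields, so total_sales must match the alias declared in the aggregations list.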

Data Enrichment

// n8n Function Node - Data Enricher
const sourceData = $input.all().map(i => i.json);
const enrichmentConfig = $('Enrichment Config').first().json;
 
// Lookup data from other sources
const lookupTables = {
  customers: $('Customer Data').all().reduce((acc, item) => {
    acc[item.json.id] = item.json;
    return acc;
  }, {}),
  products: $('Product Data').all().reduce((acc, item) => {
    acc[item.json.sku] = item.json;
    return acc;
  }, {}),
  regions: $('Region Data').all().reduce((acc, item) => {
    acc[item.json.code] = item.json;
    return acc;
  }, {})
};
 
// Enrichment functions
const enrichers = {
  lookup: (record, config) => {
    const lookupTable = lookupTables[config.source];
    const lookupKey = record[config.keyField];
    const lookupRecord = lookupTable?.[lookupKey];
 
    if (lookupRecord && config.fields) {
      const enriched = {};
      for (const field of config.fields) {
        const targetName = field.alias || field.name;
        enriched[targetName] = lookupRecord[field.name];
      }
      return enriched;
    }
    return {};
  },
 
  calculate: (record, config) => {
    const result = {};
 
    for (const calc of config.calculations) {
      switch (calc.type) {
        case 'multiply':
          result[calc.target] = (record[calc.fields[0]] || 0) * (record[calc.fields[1]] || 0);
          break;
        case 'divide':
          const divisor = record[calc.fields[1]] || 0;
          result[calc.target] = divisor !== 0 ? (record[calc.fields[0]] || 0) / divisor : null;
          break;
        case 'add':
          result[calc.target] = calc.fields.reduce((sum, f) => sum + (record[f] || 0), 0);
          break;
        case 'subtract':
          result[calc.target] = (record[calc.fields[0]] || 0) - (record[calc.fields[1]] || 0);
          break;
        case 'percentage':
          const total = record[calc.fields[1]] || 0;
          result[calc.target] = total !== 0 ? ((record[calc.fields[0]] || 0) / total * 100).toFixed(2) : null;
          break;
        case 'datediff':
          const date1 = new Date(record[calc.fields[0]]);
          const date2 = new Date(record[calc.fields[1]]);
          const diffMs = date2 - date1;
          result[calc.target] = Math.floor(diffMs / (1000 * 60 * 60 * 24)); // days
          break;
      }
    }
 
    return result;
  },
 
  derive: (record, config) => {
    const result = {};
 
    for (const derivation of config.derivations) {
      const value = record[derivation.sourceField];
 
      switch (derivation.type) {
        case 'categorize':
          result[derivation.target] = categorize(value, derivation.ranges);
          break;
        case 'extract':
          if (derivation.pattern) {
            const match = String(value).match(new RegExp(derivation.pattern));
            result[derivation.target] = match ? match[derivation.group || 0] : null;
          }
          break;
        case 'datepart':
          const date = new Date(value);
          result[derivation.target] = extractDatePart(date, derivation.part);
          break;
        case 'conditional':
          result[derivation.target] = evaluateConditional(value, derivation.conditions, record);
          break;
      }
    }
 
    return result;
  },
 
  geocode: (record, config) => {
    // A real implementation would call a geocoding API asynchronously
    // (and the enrichment loop below would need to await it); stubbed here
    if (record[config.addressField]) {
      return {
        latitude: null,
        longitude: null,
        formatted_address: record[config.addressField]
      };
    }
    return {};
  }
};
 
function categorize(value, ranges) {
  for (const range of ranges) {
    if (range.min !== undefined && value < range.min) continue;
    if (range.max !== undefined && value > range.max) continue;
    return range.label;
  }
  return 'Other';
}
 
function extractDatePart(date, part) {
  if (isNaN(date.getTime())) return null;
  switch (part) {
    case 'year': return date.getFullYear();
    case 'month': return date.getMonth() + 1;
    case 'day': return date.getDate();
    case 'quarter': return Math.ceil((date.getMonth() + 1) / 3);
    case 'weekday': return date.getDay();
    case 'week': return getWeekNumber(date);
    default: return null;
  }
}
 
function getWeekNumber(date) {
  const d = new Date(Date.UTC(date.getFullYear(), date.getMonth(), date.getDate()));
  const dayNum = d.getUTCDay() || 7;
  d.setUTCDate(d.getUTCDate() + 4 - dayNum);
  const yearStart = new Date(Date.UTC(d.getUTCFullYear(), 0, 1));
  return Math.ceil((((d - yearStart) / 86400000) + 1) / 7);
}
 
function evaluateConditional(value, conditions, record) {
  for (const condition of conditions) {
    let matches = true;
    for (const check of condition.checks) {
      const checkValue = check.field ? record[check.field] : value;
      if (!evaluateCheck(checkValue, check.operator, check.value)) {
        matches = false;
        break;
      }
    }
    if (matches) return condition.result;
  }
  return conditions.find(c => c.default)?.result || null;
}
 
function evaluateCheck(value, operator, compareValue) {
  switch (operator) {
    case '=': return value === compareValue;
    case '!=': return value !== compareValue;
    case '>': return value > compareValue;
    case '>=': return value >= compareValue;
    case '<': return value < compareValue;
    case '<=': return value <= compareValue;
    case 'contains': return String(value).includes(compareValue);
    case 'startsWith': return String(value).startsWith(compareValue);
    case 'in': return compareValue.includes(value);
    default: return false;
  }
}
 
// Apply enrichments
const enrichedData = sourceData.map(record => {
  let enriched = { ...record };
 
  for (const enrichment of enrichmentConfig.enrichments) {
    const enricher = enrichers[enrichment.type];
    if (enricher) {
      const additionalData = enricher(enriched, enrichment.config);
      enriched = { ...enriched, ...additionalData };
    }
  }
 
  return enriched;
});
 
return enrichedData.map(record => ({ json: record }));
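An enrichment config combining the three synchronous enrichers; the lookup source names must match the node names wired into the workflow (everything here is illustrative):

```javascript
// Illustrative enrichment config: a customer lookup, a derived line total,
// and a size category computed from that total.
const exampleEnrichmentConfig = {
  enrichments: [
    {
      type: 'lookup',
      config: {
        source: 'customers',
        keyField: 'customer_id',
        fields: [{ name: 'name', alias: 'customer_name' }, { name: 'tier' }]
      }
    },
    {
      type: 'calculate',
      config: {
        calculations: [
          { type: 'multiply', fields: ['quantity', 'unit_price'], target: 'line_total' }
        ]
      }
    },
    {
      type: 'derive',
      config: {
        derivations: [
          {
            type: 'categorize',
            sourceField: 'line_total',
            target: 'order_size',
            ranges: [
              { max: 100, label: 'Small' },
              { min: 100, max: 1000, label: 'Medium' },
              { min: 1000, label: 'Large' }
            ]
          }
        ]
      }
    }
  ]
};
```

Enrichments run in order, so the categorize step can reference the line_total produced by the calculate step before it.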

Data Validation

// n8n Function Node - Data Validator
const data = $input.all().map(i => i.json);
const validationRules = $('Validation Rules').first().json;
 
const validators = {
  required: (value) => value != null && value !== '',
  type: (value, expectedType) => {
    switch (expectedType) {
      case 'string': return typeof value === 'string';
      case 'number': return typeof value === 'number' && !isNaN(value);
      case 'boolean': return typeof value === 'boolean';
      case 'date': return !isNaN(new Date(value).getTime());
      case 'array': return Array.isArray(value);
      case 'object': return typeof value === 'object' && !Array.isArray(value);
      default: return true;
    }
  },
  minLength: (value, min) => String(value ?? '').length >= min,
  maxLength: (value, max) => String(value ?? '').length <= max,
  min: (value, min) => Number(value) >= min,
  max: (value, max) => Number(value) <= max,
  pattern: (value, pattern) => new RegExp(pattern).test(String(value ?? '')),
  email: (value) => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(String(value ?? '')),
  url: (value) => {
    try { new URL(value); return true; } catch { return false; }
  },
  phone: (value) => /^\+?[\d\s-()]{10,}$/.test(String(value ?? '')),
  in: (value, allowedValues) => allowedValues.includes(value),
  notIn: (value, disallowedValues) => !disallowedValues.includes(value),
  unique: (value, field, allRecords) => {
    const values = allRecords.map(r => r[field]);
    return values.filter(v => v === value).length === 1;
  },
  custom: (value, expression, record) => {
    // Caution: new Function executes arbitrary code; only evaluate trusted expressions
    try {
      const func = new Function('value', 'record', `return ${expression}`);
      return func(value, record);
    } catch {
      return false;
    }
  }
};
 
// Validate all records
const results = {
  valid: [],
  invalid: [],
  summary: {
    total: data.length,
    valid: 0,
    invalid: 0,
    errorsByField: {},
    errorsByType: {}
  }
};
 
for (let i = 0; i < data.length; i++) {
  const record = data[i];
  const errors = [];
 
  // Apply field-level rules
  for (const [field, rules] of Object.entries(validationRules.fields || {})) {
    const value = record[field];
 
    for (const rule of rules) {
      const validator = validators[rule.type];
      if (!validator) continue;
 
      let isValid;
      if (rule.type === 'unique') {
        isValid = validator(value, field, data);
      } else if (rule.type === 'custom') {
        isValid = validator(value, rule.expression, record);
      } else {
        isValid = validator(value, rule.value);
      }
 
      if (!isValid) {
        const error = {
          field,
          rule: rule.type,
          value,
          message: rule.message || `Validation failed: ${rule.type}`,
          severity: rule.severity || 'error'
        };
        errors.push(error);
 
        // Track error statistics
        results.summary.errorsByField[field] = (results.summary.errorsByField[field] || 0) + 1;
        results.summary.errorsByType[rule.type] = (results.summary.errorsByType[rule.type] || 0) + 1;
      }
    }
  }
 
  // Apply record-level rules
  for (const rule of validationRules.records || []) {
    if (rule.type === 'custom') {
      const isValid = validators.custom(null, rule.expression, record);
      if (!isValid) {
        errors.push({
          field: '_record',
          rule: 'custom',
          message: rule.message || 'Record validation failed',
          severity: rule.severity || 'error'
        });
      }
    }
  }
 
  // Categorize record
  const hasErrors = errors.some(e => e.severity === 'error');
  if (hasErrors) {
    results.invalid.push({
      recordIndex: i,
      record,
      errors
    });
    results.summary.invalid++;
  } else {
    results.valid.push({
      recordIndex: i,
      record,
      warnings: errors.filter(e => e.severity === 'warning')
    });
    results.summary.valid++;
  }
}
 
// Output valid and invalid separately
return [
  { json: { type: 'validation_results', ...results.summary } },
  ...results.valid.map(v => ({ json: { ...v.record, _validation: 'valid', _warnings: v.warnings } })),
  ...results.invalid.map(v => ({ json: { ...v.record, _validation: 'invalid', _errors: v.errors } }))
];
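A rules document the validator above could consume (illustrative):

```javascript
// Illustrative validation rules: field-level checks plus one record-level
// cross-field check. The expression is evaluated with new Function, so it
// must come from trusted configuration only.
const exampleValidationRules = {
  fields: {
    email: [
      { type: 'required', message: 'Email is required' },
      { type: 'email', message: 'Invalid email format' }
    ],
    age: [
      { type: 'min', value: 0 },
      { type: 'max', value: 150, severity: 'warning' }
    ],
    order_id: [
      { type: 'unique', message: 'Duplicate order ID' }
    ]
  },
  records: [
    {
      type: 'custom',
      expression: 'record.end_date == null || record.end_date >= record.start_date',
      message: 'end_date must not precede start_date'
    }
  ]
};
```

Rules with severity 'warning' are reported but do not push a record into the invalid output.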

Data Loading

Batch Loading with Error Handling

// n8n Function Node - Batch Loader
const data = $input.all().map(i => i.json);
const loadConfig = $('Load Config').first().json;
 
// Batch configuration
const batchSize = loadConfig.batchSize || 100;
const maxRetries = loadConfig.maxRetries || 3;
const retryDelay = loadConfig.retryDelay || 1000;
 
// Split into batches
const batches = [];
for (let i = 0; i < data.length; i += batchSize) {
  batches.push({
    batchNumber: Math.floor(i / batchSize) + 1,
    records: data.slice(i, i + batchSize),
    retryCount: 0
  });
}
 
// Load strategy
const loadStrategy = {
  insert: {
    operation: 'insert',
    onConflict: loadConfig.onConflict || 'error' // error, skip, update
  },
  upsert: {
    operation: 'upsert',
    keyFields: loadConfig.keyFields || ['id'],
    updateFields: loadConfig.updateFields || null // null = all fields
  },
  update: {
    operation: 'update',
    keyFields: loadConfig.keyFields || ['id'],
    updateFields: loadConfig.updateFields
  },
  delete: {
    operation: 'delete',
    keyFields: loadConfig.keyFields || ['id']
  }
};
 
// Generate load operations
const loadOperations = batches.map(batch => ({
  batchId: `batch_${batch.batchNumber}_${Date.now()}`,
  batchNumber: batch.batchNumber,
  totalBatches: batches.length,
  recordCount: batch.records.length,
  records: batch.records,
  strategy: loadStrategy[loadConfig.strategy] || loadStrategy.insert,
  destination: {
    type: loadConfig.destinationType, // database, api, file
    connection: loadConfig.connection,
    table: loadConfig.table,
    schema: loadConfig.schema
  },
  retryConfig: {
    maxRetries,
    retryDelay,
    currentRetry: 0
  }
}));
 
return loadOperations.map(op => ({ json: op }));

Load Result Handler

// n8n Function Node - Load Result Handler
const loadResult = $input.first().json;
const batchInfo = $('Batch Info').first().json;
 
// Process load result
const result = {
  batchId: batchInfo.batchId,
  batchNumber: batchInfo.batchNumber,
  status: loadResult.success ? 'completed' : 'failed',
  recordsProcessed: loadResult.rowsAffected || 0,
  recordsExpected: batchInfo.recordCount,
  errors: loadResult.errors || [],
  startTime: batchInfo.startTime,
  endTime: new Date().toISOString(),
  duration: null
};
 
// Calculate duration
if (batchInfo.startTime) {
  const start = new Date(batchInfo.startTime);
  const end = new Date(result.endTime);
  result.duration = end - start;
}
 
// Determine if retry needed
const shouldRetry = !loadResult.success &&
  batchInfo.retryConfig.currentRetry < batchInfo.retryConfig.maxRetries;
 
if (shouldRetry) {
  result.status = 'retrying';
  result.nextRetry = {
    attemptNumber: batchInfo.retryConfig.currentRetry + 1,
    delay: batchInfo.retryConfig.retryDelay * Math.pow(2, batchInfo.retryConfig.currentRetry)
  };
}
 
// Log metrics
const metrics = {
  batchId: result.batchId,
  recordsPerSecond: result.duration > 0
    ? (result.recordsProcessed / (result.duration / 1000)).toFixed(2)
    : 0,
  errorRate: batchInfo.recordCount > 0
    ? ((result.errors.length / batchInfo.recordCount) * 100).toFixed(2)
    : 0
};
 
return {
  json: {
    ...result,
    metrics,
    shouldRetry,
    isLastBatch: batchInfo.batchNumber === batchInfo.totalBatches
  }
};

Pipeline Orchestration

// n8n Function Node - Pipeline Orchestrator
const pipelineConfig = $input.first().json;
const previousStepResult = $('Previous Step').first()?.json;
 
// Pipeline state management
const pipelineState = {
  pipelineId: pipelineConfig.pipelineId || `pipeline_${Date.now()}`,
  currentStep: pipelineConfig.currentStep || 0,
  totalSteps: pipelineConfig.steps.length,
  status: 'running',
  startTime: pipelineConfig.startTime || new Date().toISOString(),
  stepResults: pipelineConfig.stepResults || [],
  metrics: {
    recordsExtracted: 0,
    recordsTransformed: 0,
    recordsLoaded: 0,
    recordsFailed: 0
  }
};
 
// Update state with previous step result
if (previousStepResult) {
  pipelineState.stepResults.push({
    step: pipelineState.currentStep - 1,
    name: pipelineConfig.steps[pipelineState.currentStep - 1]?.name,
    status: previousStepResult.success ? 'completed' : 'failed',
    recordsProcessed: previousStepResult.recordCount || 0,
    errors: previousStepResult.errors || [],
    duration: previousStepResult.duration
  });
 
  // Update metrics
  const stepType = pipelineConfig.steps[pipelineState.currentStep - 1]?.type;
  if (stepType === 'extract') {
    pipelineState.metrics.recordsExtracted = previousStepResult.recordCount || 0;
  } else if (stepType === 'transform') {
    pipelineState.metrics.recordsTransformed = previousStepResult.recordCount || 0;
  } else if (stepType === 'load') {
    pipelineState.metrics.recordsLoaded = previousStepResult.recordCount || 0;
    pipelineState.metrics.recordsFailed = previousStepResult.errors?.length || 0;
  }
}
 
// Determine next action
const currentStepConfig = pipelineConfig.steps[pipelineState.currentStep];
 
if (!currentStepConfig) {
  // Pipeline complete
  pipelineState.status = 'completed';
  pipelineState.endTime = new Date().toISOString();
 
  return {
    json: {
      action: 'complete',
      pipelineState,
      summary: generatePipelineSummary(pipelineState)
    }
  };
}
 
// Check for failure conditions
const failedSteps = pipelineState.stepResults.filter(s => s.status === 'failed');
if (failedSteps.length > 0 && pipelineConfig.stopOnError) {
  pipelineState.status = 'failed';
  pipelineState.endTime = new Date().toISOString();
 
  return {
    json: {
      action: 'fail',
      pipelineState,
      failedSteps
    }
  };
}
 
function generatePipelineSummary(state) {
  const duration = new Date(state.endTime) - new Date(state.startTime);
  return {
    pipelineId: state.pipelineId,
    status: state.status,
    duration: `${(duration / 1000).toFixed(2)}s`,
    stepsCompleted: state.stepResults.filter(s => s.status === 'completed').length,
    stepsFailed: state.stepResults.filter(s => s.status === 'failed').length,
    metrics: state.metrics,
    throughput: state.metrics.recordsLoaded > 0 && duration > 0
      ? `${(state.metrics.recordsLoaded / (duration / 1000)).toFixed(2)} records/sec`
      : 'N/A'
  };
}
 
// Prepare next step
return {
  json: {
    action: 'execute_step',
    step: currentStepConfig,
    stepNumber: pipelineState.currentStep + 1,
    pipelineState: {
      ...pipelineState,
      currentStep: pipelineState.currentStep + 1
    }
  }
};
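A pipeline definition the orchestrator expects; the step type values drive the metric updates above (names are illustrative):

```javascript
// Illustrative three-step pipeline definition consumed by the orchestrator.
const examplePipelineConfig = {
  pipelineId: 'daily_sales_sync',
  stopOnError: true,
  steps: [
    { name: 'Extract orders', type: 'extract' },
    { name: 'Map to warehouse schema', type: 'transform' },
    { name: 'Load into fact table', type: 'load' }
  ]
};
```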

Best Practices

ETL Design

  1. Idempotency: Design pipelines to be safely re-runnable
  2. Incremental loading: Process only changed data when possible
  3. Data validation: Validate at extraction and before loading
  4. Error handling: Implement retry logic and dead-letter queues

Performance Optimization

  • Use batch processing for large datasets
  • Implement parallel extraction where sources allow
  • Cache lookup data to reduce API calls
  • Monitor pipeline metrics for bottlenecks
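Lookup caching can be sketched with a small TTL cache; in n8n the cache object could live in workflow static data via $getWorkflowStaticData('global') so it survives between executions (helper and parameter names are illustrative):

```javascript
// Minimal TTL cache for lookup data: fetch once, reuse until the entry expires.
function cachedLookup(cache, key, ttlMs, fetchFn, now = Date.now()) {
  const entry = cache[key];
  if (entry && now - entry.fetchedAt < ttlMs) {
    return entry.value; // still fresh, no API call needed
  }
  const value = fetchFn(key); // e.g. an HTTP Request to the lookup API
  cache[key] = { value, fetchedAt: now };
  return value;
}
```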

Data Quality

  • Implement comprehensive validation rules
  • Log data quality metrics
  • Create alerts for quality threshold breaches
  • Maintain data lineage for debugging

Building ETL workflows with n8n enables flexible, maintainable data pipelines that can scale with your data processing needs.
