n8n Data Transformation and ETL Workflows: Building Data Pipelines

DeviDevs Team
18 min read
#n8n#ETL#data transformation#data pipelines#automation

Data transformation and ETL (Extract, Transform, Load) are the backbone of data-driven operations. This guide walks through building complete data pipelines in n8n, from extraction and transformation through enrichment, validation, loading, and orchestration.

Data Extraction Patterns

Multi-Source Data Extraction

// n8n Function Node - Data Extraction Controller
const extractionConfig = $input.first().json.config;
 
// Define extraction source builders (synchronous, so the call below needs no await)
const sources = {
  database: (config) => {
    // Extracted via PostgreSQL/MySQL node
    return {
      type: 'database',
      query: config.query,
      connection: config.connectionName
    };
  },
 
  api: (config) => {
    // Extracted via HTTP Request node
    return {
      type: 'api',
      endpoint: config.endpoint,
      method: config.method || 'GET',
      headers: config.headers || {},
      pagination: config.pagination
    };
  },
 
  file: (config) => {
    // Extracted via Read Binary File / FTP node
    return {
      type: 'file',
      path: config.path,
      format: config.format, // csv, json, xlsx
      encoding: config.encoding || 'utf-8'
    };
  },
 
  spreadsheet: (config) => {
    // Extracted via Google Sheets / Excel node
    return {
      type: 'spreadsheet',
      sheetId: config.sheetId,
      range: config.range,
      hasHeaders: config.hasHeaders !== false
    };
  }
};
 
// Generate extraction tasks based on config
const extractionTasks = extractionConfig.sources.map(source => ({
  sourceId: source.id,
  sourceType: source.type,
  config: sources[source.type](source.config),
  priority: source.priority || 5,
  retryPolicy: source.retryPolicy || { maxRetries: 3, backoff: 'exponential' }
}));
 
// Sort by priority
extractionTasks.sort((a, b) => b.priority - a.priority);
 
return extractionTasks.map(task => ({ json: task }));
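
The controller is driven by a configuration object listing the sources to pull; downstream, a Switch node can route each task to the matching extraction node by sourceType. An illustrative config shape (all ids, queries, and endpoints are hypothetical):

// Example Extraction Config (illustrative; names are hypothetical)
return {
  json: {
    config: {
      sources: [
        {
          id: 'orders_db',
          type: 'database',
          priority: 10,
          config: { query: 'SELECT * FROM orders', connectionName: 'prod_replica' }
        },
        {
          id: 'crm_api',
          type: 'api',
          config: {
            endpoint: 'https://api.example.com/contacts',
            pagination: { type: 'cursor', pageSize: 200 }
          }
        },
        {
          id: 'pricing_sheet',
          type: 'spreadsheet',
          config: { sheetId: '1AbCdEfG', range: 'A1:F', hasHeaders: true }
        }
      ]
    }
  }
};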

Paginated API Extraction

// n8n Function Node - Paginated API Extractor
const apiConfig = $input.first().json;
const existingData = $('Previous Page Data').all().map(i => i.json);
 
// Pagination state
const pagination = {
  currentPage: apiConfig.currentPage || 1,
  pageSize: apiConfig.pageSize || 100,
  totalPages: apiConfig.totalPages || null,
  cursor: apiConfig.cursor || null,
  hasMore: true
};
 
// Determine pagination type
const paginationType = apiConfig.paginationType || 'offset';
 
// Build request parameters
let requestParams = {};
 
switch (paginationType) {
  case 'offset':
    requestParams = {
      offset: (pagination.currentPage - 1) * pagination.pageSize,
      limit: pagination.pageSize
    };
    break;
 
  case 'page':
    requestParams = {
      page: pagination.currentPage,
      per_page: pagination.pageSize
    };
    break;
 
  case 'cursor':
    requestParams = pagination.cursor
      ? { cursor: pagination.cursor, limit: pagination.pageSize }
      : { limit: pagination.pageSize };
    break;
 
  case 'link':
    // For APIs that return next page URL in headers/body
    requestParams = {
      nextUrl: apiConfig.nextUrl || apiConfig.baseUrl
    };
    break;
}
 
// Merge with any custom parameters
requestParams = {
  ...requestParams,
  ...apiConfig.additionalParams
};
 
return {
  json: {
    requestConfig: {
      url: apiConfig.baseUrl,
      method: 'GET',
      params: requestParams,
      headers: apiConfig.headers
    },
    pagination,
    paginationType,
    extractedCount: existingData.length,
    batchId: apiConfig.batchId || `batch_${Date.now()}`
  }
};
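
After the HTTP Request node fetches a page, a small Function node can decide whether to loop back for another page. A minimal sketch, assuming the extractor node is named 'Paginated API Extractor' and the API returns items under data with a next_cursor field (all three are assumptions); an IF node on hasMore then routes back into the extractor or onward to transformation.

// n8n Function Node - Pagination Continuation Check (sketch)
const response = $input.first().json;                      // HTTP Request output
const state = $('Paginated API Extractor').first().json;   // assumed node name

const pageItems = response.data || [];                     // assumed response shape
const hasMore = state.paginationType === 'cursor'
  ? Boolean(response.next_cursor)                          // assumed cursor field
  : pageItems.length === state.pagination.pageSize;

return {
  json: {
    ...state,
    baseUrl: state.requestConfig.url,
    pageSize: state.pagination.pageSize,
    currentPage: state.pagination.currentPage + 1,
    cursor: response.next_cursor || null,
    items: pageItems,
    hasMore
  }
};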

Incremental Extraction

// n8n Function Node - Incremental Extraction Manager
const config = $input.first().json;
const lastSync = $('Get Last Sync State').first().json;
 
// Determine extraction window
const now = new Date();
const lastSyncTime = lastSync?.lastSyncTime
  ? new Date(lastSync.lastSyncTime)
  : new Date(0); // Full extraction if no previous sync
 
// Build incremental query
// Note: values are interpolated here for readability; in production prefer the
// database node's query parameters to avoid SQL injection.
const buildIncrementalQuery = (table, timestampColumn, since) => {
  return `
    SELECT *
    FROM ${table}
    WHERE ${timestampColumn} > '${since.toISOString()}'
    ORDER BY ${timestampColumn} ASC
    LIMIT 10000
  `;
};
 
// Build change detection query (for tables without timestamps)
// CONCAT_WS skips NULL arguments, so a single NULL column doesn't null out the checksum
const buildChangeDetectionQuery = (table, primaryKey, checksumColumns) => {
  return `
    SELECT
      ${primaryKey},
      MD5(CONCAT_WS('|', ${checksumColumns.join(', ')})) as row_checksum
    FROM ${table}
  `;
};
 
// Extraction strategy
let extractionStrategy;
if (config.incrementalColumn) {
  extractionStrategy = {
    type: 'incremental',
    query: buildIncrementalQuery(
      config.table,
      config.incrementalColumn,
      lastSyncTime
    ),
    watermark: {
      column: config.incrementalColumn,
      lastValue: lastSync?.lastValue
    }
  };
} else if (config.checksumColumns) {
  extractionStrategy = {
    type: 'change_detection',
    query: buildChangeDetectionQuery(
      config.table,
      config.primaryKey,
      config.checksumColumns
    ),
    previousChecksums: lastSync?.checksums || {}
  };
} else {
  extractionStrategy = {
    type: 'full',
    query: `SELECT * FROM ${config.table}`
  };
}
 
return {
  json: {
    strategy: extractionStrategy,
    syncWindow: {
      start: lastSyncTime.toISOString(),
      end: now.toISOString()
    },
    table: config.table,
    batchId: `sync_${config.table}_${Date.now()}`
  }
};
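
After a successful load, the new watermark has to be persisted so the next run extracts only newer rows. A minimal sketch using n8n's workflow static data as one option (static data is only saved in production executions, not manual test runs); a database table read by the 'Get Last Sync State' node works equally well. The input shape matches the manager's output above.

// n8n Function Node - Persist Sync Watermark (sketch)
const syncInfo = $input.first().json;
const staticData = $getWorkflowStaticData('global');

// Store the end of the sync window as the next run's starting point
staticData.lastSyncByTable = staticData.lastSyncByTable || {};
staticData.lastSyncByTable[syncInfo.table] = {
  lastSyncTime: syncInfo.syncWindow.end,
  batchId: syncInfo.batchId
};

return { json: { saved: true, table: syncInfo.table } };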

Data Transformation

Schema Transformation

// n8n Function Node - Schema Transformer
const sourceData = $input.all().map(i => i.json);
const schemaMapping = $('Schema Mapping Config').first().json;
 
// Schema transformation functions
const transformers = {
  // Type conversions
  toString: (value) => String(value ?? ''),
  toNumber: (value) => {
    const num = Number(value);
    return isNaN(num) ? null : num;
  },
  toBoolean: (value) => {
    if (typeof value === 'boolean') return value;
    if (typeof value === 'string') {
      return ['true', '1', 'yes', 'y'].includes(value.toLowerCase());
    }
    return Boolean(value);
  },
  toDate: (value) => {
    if (!value) return null;
    const date = new Date(value);
    return isNaN(date.getTime()) ? null : date.toISOString();
  },
  toArray: (value, delimiter = ',') => {
    if (Array.isArray(value)) return value;
    if (typeof value === 'string') return value.split(delimiter).map(s => s.trim());
    return [value];
  },
 
  // String transformations
  trim: (value) => String(value ?? '').trim(),
  uppercase: (value) => String(value ?? '').toUpperCase(),
  lowercase: (value) => String(value ?? '').toLowerCase(),
  capitalize: (value) => {
    const str = String(value ?? '');
    return str.charAt(0).toUpperCase() + str.slice(1).toLowerCase();
  },
 
  // Data cleaning
  removeHtml: (value) => String(value ?? '').replace(/<[^>]*>/g, ''),
  normalizeWhitespace: (value) => String(value ?? '').replace(/\s+/g, ' ').trim(),
  extractNumbers: (value) => String(value ?? '').replace(/[^\d.-]/g, ''),
 
  // Lookup transformations
  lookup: (value, lookupTable) => lookupTable[value] ?? value,
  default: (value, defaultValue) => value ?? defaultValue,
 
  // Concatenation
  concat: (values, separator = ' ') => values.filter(v => v != null).join(separator),
 
  // Custom expressions
  expression: (value, expr, context) => {
    // Note: new Function is not a sandbox; only run trusted expressions
    try {
      const func = new Function('value', 'context', `return ${expr}`);
      return func(value, context);
    } catch (e) {
      return null;
    }
  }
};
 
// Apply schema mapping
const transformedData = sourceData.map(record => {
  const transformed = {};
 
  for (const [targetField, mapping] of Object.entries(schemaMapping.fields)) {
    let value;
 
    // Get source value(s)
    if (mapping.sourceFields) {
      // Multiple source fields
      value = mapping.sourceFields.map(sf => record[sf]);
    } else if (mapping.sourceField) {
      value = record[mapping.sourceField];
    } else if (mapping.constant) {
      value = mapping.constant;
    }
 
    // Apply transformations
    if (mapping.transformations) {
      for (const transform of mapping.transformations) {
        const transformFn = transformers[transform.type];
        if (transformFn) {
          value = Array.isArray(value) && transform.type !== 'concat'
            ? value.map(v => transformFn(v, transform.params, record))
            : transformFn(value, transform.params, record);
        }
      }
    }
 
    // Apply validation
    if (mapping.validation) {
      const isValid = validateField(value, mapping.validation);
      if (!isValid && mapping.validation.onInvalid === 'null') {
        value = null;
      } else if (!isValid && mapping.validation.onInvalid === 'skip') {
        continue;
      }
    }
 
    transformed[targetField] = value;
  }
 
  return transformed;
});
 
function validateField(value, rules) {
  if (rules.required && (value == null || value === '')) return false;
  if (rules.type === 'email' && value && !isValidEmail(value)) return false;
  if (rules.type === 'phone' && value && !isValidPhone(value)) return false;
  if (rules.minLength && String(value).length < rules.minLength) return false;
  if (rules.maxLength && String(value).length > rules.maxLength) return false;
  if (rules.pattern && !new RegExp(rules.pattern).test(value)) return false;
  return true;
}
 
function isValidEmail(email) {
  return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
}
 
function isValidPhone(phone) {
  return /^\+?[\d\s-()]{10,}$/.test(phone);
}
 
return transformedData.map(record => ({ json: record }));
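
The transformer is driven entirely by the mapping config, which could be supplied by a Set node named 'Schema Mapping Config'. An illustrative shape (field names are hypothetical):

// Example Schema Mapping Config (illustrative; field names are hypothetical)
return {
  json: {
    fields: {
      fullName: {
        sourceFields: ['first_name', 'last_name'],
        transformations: [{ type: 'concat', params: ' ' }]
      },
      email: {
        sourceField: 'email_address',
        transformations: [{ type: 'trim' }, { type: 'lowercase' }],
        validation: { type: 'email', onInvalid: 'null' }
      },
      signupDate: {
        sourceField: 'created_at',
        transformations: [{ type: 'toDate' }]
      },
      source: { constant: 'legacy_crm' }
    }
  }
};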

Data Aggregation

// n8n Function Node - Data Aggregator
const data = $input.all().map(i => i.json);
const aggregationConfig = $('Aggregation Config').first().json;
 
// Aggregation functions
const aggregators = {
  sum: (values) => values.reduce((a, b) => a + (Number(b) || 0), 0),
  avg: (values) => {
    const nums = values.filter(v => !isNaN(Number(v))).map(Number);
    return nums.length ? nums.reduce((a, b) => a + b, 0) / nums.length : null;
  },
  min: (values) => {
    const nums = values.filter(v => !isNaN(Number(v))).map(Number);
    return nums.length ? Math.min(...nums) : null; // avoid Infinity on empty input
  },
  max: (values) => {
    const nums = values.filter(v => !isNaN(Number(v))).map(Number);
    return nums.length ? Math.max(...nums) : null;
  },
  count: (values) => values.length,
  countDistinct: (values) => new Set(values).size,
  first: (values) => values[0],
  last: (values) => values[values.length - 1],
  concat: (values, separator = ', ') => [...new Set(values)].join(separator),
  mode: (values) => {
    const freq = {};
    values.forEach(v => freq[v] = (freq[v] || 0) + 1);
    return Object.entries(freq).sort((a, b) => b[1] - a[1])[0]?.[0];
  },
  median: (values) => {
    const sorted = values.filter(v => !isNaN(Number(v))).map(Number).sort((a, b) => a - b);
    const mid = Math.floor(sorted.length / 2);
    return sorted.length % 2 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
  },
  stddev: (values) => {
    const nums = values.filter(v => !isNaN(Number(v))).map(Number);
    if (!nums.length) return null; // avoid NaN on empty input
    const avg = nums.reduce((a, b) => a + b, 0) / nums.length;
    const squareDiffs = nums.map(v => Math.pow(v - avg, 2));
    return Math.sqrt(squareDiffs.reduce((a, b) => a + b, 0) / nums.length);
  }
};
 
// Group data
const grouped = {};
for (const record of data) {
  // Build group key
  const groupKey = aggregationConfig.groupBy
    .map(field => record[field])
    .join('|');
 
  if (!grouped[groupKey]) {
    grouped[groupKey] = {
      _groupKey: groupKey,
      _groupValues: {},
      _records: []
    };
 
    // Store group-by values
    for (const field of aggregationConfig.groupBy) {
      grouped[groupKey]._groupValues[field] = record[field];
    }
  }
 
  grouped[groupKey]._records.push(record);
}
 
// Apply aggregations
const results = Object.values(grouped).map(group => {
  const result = { ...group._groupValues };
 
  for (const agg of aggregationConfig.aggregations) {
    const values = group._records.map(r => r[agg.field]);
    const aggregator = aggregators[agg.function];
 
    if (aggregator) {
      result[agg.alias || `${agg.function}_${agg.field}`] = aggregator(values, agg.params);
    }
  }
 
  // Add record count
  result._recordCount = group._records.length;
 
  return result;
});
 
// Apply having clause (post-aggregation filtering)
let filteredResults = results;
if (aggregationConfig.having) {
  filteredResults = results.filter(record => {
    return evaluateCondition(record, aggregationConfig.having);
  });
}
 
// Sort results
if (aggregationConfig.orderBy) {
  filteredResults.sort((a, b) => {
    for (const sort of aggregationConfig.orderBy) {
      const aVal = a[sort.field];
      const bVal = b[sort.field];
      const comparison = aVal < bVal ? -1 : aVal > bVal ? 1 : 0;
      if (comparison !== 0) {
        return sort.direction === 'desc' ? -comparison : comparison;
      }
    }
    return 0;
  });
}
 
function evaluateCondition(record, condition) {
  const value = record[condition.field];
  switch (condition.operator) {
    case '>': return value > condition.value;
    case '>=': return value >= condition.value;
    case '<': return value < condition.value;
    case '<=': return value <= condition.value;
    case '=': return value === condition.value;
    case '!=': return value !== condition.value;
    default: return true;
  }
}
 
return filteredResults.map(record => ({ json: record }));
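
An illustrative aggregation config (field names are hypothetical) that groups orders by region and category, then filters and sorts on an aggregated alias:

// Example Aggregation Config (illustrative; field names are hypothetical)
return {
  json: {
    groupBy: ['region', 'product_category'],
    aggregations: [
      { field: 'amount', function: 'sum', alias: 'total_revenue' },
      { field: 'amount', function: 'avg', alias: 'avg_order_value' },
      { field: 'customer_id', function: 'countDistinct', alias: 'unique_customers' }
    ],
    having: { field: 'total_revenue', operator: '>', value: 1000 },
    orderBy: [{ field: 'total_revenue', direction: 'desc' }]
  }
};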

Data Enrichment

// n8n Function Node - Data Enricher
const sourceData = $input.all().map(i => i.json);
const enrichmentConfig = $('Enrichment Config').first().json;
 
// Lookup data from other sources
const lookupTables = {
  customers: $('Customer Data').all().reduce((acc, item) => {
    acc[item.json.id] = item.json;
    return acc;
  }, {}),
  products: $('Product Data').all().reduce((acc, item) => {
    acc[item.json.sku] = item.json;
    return acc;
  }, {}),
  regions: $('Region Data').all().reduce((acc, item) => {
    acc[item.json.code] = item.json;
    return acc;
  }, {})
};
 
// Enrichment functions
const enrichers = {
  lookup: (record, config) => {
    const lookupTable = lookupTables[config.source];
    const lookupKey = record[config.keyField];
    const lookupRecord = lookupTable?.[lookupKey];
 
    if (lookupRecord && config.fields) {
      const enriched = {};
      for (const field of config.fields) {
        const targetName = field.alias || field.name;
        enriched[targetName] = lookupRecord[field.name];
      }
      return enriched;
    }
    return {};
  },
 
  calculate: (record, config) => {
    const result = {};
 
    for (const calc of config.calculations) {
      switch (calc.type) {
        case 'multiply':
          result[calc.target] = (record[calc.fields[0]] || 0) * (record[calc.fields[1]] || 0);
          break;
        case 'divide': {
          const divisor = record[calc.fields[1]];
          result[calc.target] = divisor ? (record[calc.fields[0]] || 0) / divisor : null;
          break;
        }
        case 'add':
          result[calc.target] = calc.fields.reduce((sum, f) => sum + (record[f] || 0), 0);
          break;
        case 'subtract':
          result[calc.target] = (record[calc.fields[0]] || 0) - (record[calc.fields[1]] || 0);
          break;
        case 'percentage': {
          const total = record[calc.fields[1]];
          result[calc.target] = total
            ? Number((((record[calc.fields[0]] || 0) / total) * 100).toFixed(2))
            : null;
          break;
        }
        case 'datediff': {
          const date1 = new Date(record[calc.fields[0]]);
          const date2 = new Date(record[calc.fields[1]]);
          result[calc.target] = Math.floor((date2 - date1) / (1000 * 60 * 60 * 24)); // days
          break;
        }
      }
    }
 
    return result;
  },
 
  derive: (record, config) => {
    const result = {};
 
    for (const derivation of config.derivations) {
      const value = record[derivation.sourceField];
 
      switch (derivation.type) {
        case 'categorize':
          result[derivation.target] = categorize(value, derivation.ranges);
          break;
        case 'extract':
          if (derivation.pattern) {
            const match = String(value).match(new RegExp(derivation.pattern));
            result[derivation.target] = match ? match[derivation.group || 0] : null;
          }
          break;
        case 'datepart': {
          const date = new Date(value);
          result[derivation.target] = extractDatePart(date, derivation.part);
          break;
        }
        case 'conditional':
          result[derivation.target] = evaluateConditional(value, derivation.conditions, record);
          break;
      }
    }
 
    return result;
  },
 
  geocode: async (record, config) => {
    // Would call geocoding API - simplified for example
    if (record[config.addressField]) {
      return {
        latitude: null,
        longitude: null,
        formatted_address: record[config.addressField]
      };
    }
    return {};
  }
};
 
function categorize(value, ranges) {
  for (const range of ranges) {
    if (range.min !== undefined && value < range.min) continue;
    if (range.max !== undefined && value > range.max) continue;
    return range.label;
  }
  return 'Other';
}
 
function extractDatePart(date, part) {
  if (isNaN(date.getTime())) return null;
  switch (part) {
    case 'year': return date.getFullYear();
    case 'month': return date.getMonth() + 1;
    case 'day': return date.getDate();
    case 'quarter': return Math.ceil((date.getMonth() + 1) / 3);
    case 'weekday': return date.getDay();
    case 'week': return getWeekNumber(date);
    default: return null;
  }
}
 
function getWeekNumber(date) {
  const d = new Date(Date.UTC(date.getFullYear(), date.getMonth(), date.getDate()));
  const dayNum = d.getUTCDay() || 7;
  d.setUTCDate(d.getUTCDate() + 4 - dayNum);
  const yearStart = new Date(Date.UTC(d.getUTCFullYear(), 0, 1));
  return Math.ceil((((d - yearStart) / 86400000) + 1) / 7);
}
 
function evaluateConditional(value, conditions, record) {
  for (const condition of conditions) {
    let matches = true;
    for (const check of condition.checks) {
      const checkValue = check.field ? record[check.field] : value;
      if (!evaluateCheck(checkValue, check.operator, check.value)) {
        matches = false;
        break;
      }
    }
    if (matches) return condition.result;
  }
  return conditions.find(c => c.default)?.result || null;
}
 
function evaluateCheck(value, operator, compareValue) {
  switch (operator) {
    case '=': return value === compareValue;
    case '!=': return value !== compareValue;
    case '>': return value > compareValue;
    case '>=': return value >= compareValue;
    case '<': return value < compareValue;
    case '<=': return value <= compareValue;
    case 'contains': return String(value).includes(compareValue);
    case 'startsWith': return String(value).startsWith(compareValue);
    case 'in': return compareValue.includes(value);
    default: return false;
  }
}
 
// Apply enrichments sequentially so async enrichers (e.g. geocode) are awaited
const enrichedData = [];
for (const record of sourceData) {
  let enriched = { ...record };
 
  for (const enrichment of enrichmentConfig.enrichments) {
    const enricher = enrichers[enrichment.type];
    if (enricher) {
      const additionalData = await enricher(enriched, enrichment.config);
      enriched = { ...enriched, ...additionalData };
    }
  }
 
  enrichedData.push(enriched);
}
 
return enrichedData.map(record => ({ json: record }));
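
An illustrative enrichment config (field names are hypothetical) chaining a lookup, a calculation, and a derivation:

// Example Enrichment Config (illustrative; field names are hypothetical)
return {
  json: {
    enrichments: [
      {
        type: 'lookup',
        config: {
          source: 'customers',
          keyField: 'customer_id',
          fields: [
            { name: 'name', alias: 'customer_name' },
            { name: 'tier' }
          ]
        }
      },
      {
        type: 'calculate',
        config: {
          calculations: [
            { type: 'multiply', fields: ['quantity', 'unit_price'], target: 'line_total' }
          ]
        }
      },
      {
        type: 'derive',
        config: {
          derivations: [
            {
              type: 'categorize',
              sourceField: 'line_total',
              target: 'order_size',
              ranges: [
                { max: 100, label: 'Small' },
                { min: 100, max: 1000, label: 'Medium' },
                { min: 1000, label: 'Large' }
              ]
            }
          ]
        }
      }
    ]
  }
};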

Data Validation

// n8n Function Node - Data Validator
const data = $input.all().map(i => i.json);
const validationRules = $('Validation Rules').first().json;
 
const validators = {
  required: (value) => value != null && value !== '',
  type: (value, expectedType) => {
    switch (expectedType) {
      case 'string': return typeof value === 'string';
      case 'number': return typeof value === 'number' && !isNaN(value);
      case 'boolean': return typeof value === 'boolean';
      case 'date': return !isNaN(new Date(value).getTime());
      case 'array': return Array.isArray(value);
      case 'object': return typeof value === 'object' && !Array.isArray(value);
      default: return true;
    }
  },
  minLength: (value, min) => String(value ?? '').length >= min,
  maxLength: (value, max) => String(value ?? '').length <= max,
  min: (value, min) => Number(value) >= min,
  max: (value, max) => Number(value) <= max,
  pattern: (value, pattern) => new RegExp(pattern).test(String(value ?? '')),
  email: (value) => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(String(value ?? '')),
  url: (value) => {
    try { new URL(value); return true; } catch { return false; }
  },
  phone: (value) => /^\+?[\d\s-()]{10,}$/.test(String(value ?? '')),
  in: (value, allowedValues) => allowedValues.includes(value),
  notIn: (value, disallowedValues) => !disallowedValues.includes(value),
  unique: (value, field, allRecords) => {
    const values = allRecords.map(r => r[field]);
    return values.filter(v => v === value).length === 1;
  },
  custom: (value, expression, record) => {
    // Note: new Function is not a sandbox; only run trusted expressions
    try {
      const func = new Function('value', 'record', `return ${expression}`);
      return func(value, record);
    } catch {
      return false;
    }
  }
};
 
// Validate all records
const results = {
  valid: [],
  invalid: [],
  summary: {
    total: data.length,
    valid: 0,
    invalid: 0,
    errorsByField: {},
    errorsByType: {}
  }
};
 
for (let i = 0; i < data.length; i++) {
  const record = data[i];
  const errors = [];
 
  // Apply field-level rules
  for (const [field, rules] of Object.entries(validationRules.fields || {})) {
    const value = record[field];
 
    for (const rule of rules) {
      const validator = validators[rule.type];
      if (!validator) continue;
 
      let isValid;
      if (rule.type === 'unique') {
        isValid = validator(value, field, data);
      } else if (rule.type === 'custom') {
        isValid = validator(value, rule.expression, record);
      } else {
        isValid = validator(value, rule.value);
      }
 
      if (!isValid) {
        const error = {
          field,
          rule: rule.type,
          value,
          message: rule.message || `Validation failed: ${rule.type}`,
          severity: rule.severity || 'error'
        };
        errors.push(error);
 
        // Track error statistics
        results.summary.errorsByField[field] = (results.summary.errorsByField[field] || 0) + 1;
        results.summary.errorsByType[rule.type] = (results.summary.errorsByType[rule.type] || 0) + 1;
      }
    }
  }
 
  // Apply record-level rules
  for (const rule of validationRules.records || []) {
    if (rule.type === 'custom') {
      const isValid = validators.custom(null, rule.expression, record);
      if (!isValid) {
        errors.push({
          field: '_record',
          rule: 'custom',
          message: rule.message || 'Record validation failed',
          severity: rule.severity || 'error'
        });
      }
    }
  }
 
  // Categorize record
  const hasErrors = errors.some(e => e.severity === 'error');
  if (hasErrors) {
    results.invalid.push({
      recordIndex: i,
      record,
      errors
    });
    results.summary.invalid++;
  } else {
    results.valid.push({
      recordIndex: i,
      record,
      warnings: errors.filter(e => e.severity === 'warning')
    });
    results.summary.valid++;
  }
}
 
// Output valid and invalid separately
return [
  { json: { type: 'validation_results', ...results.summary } },
  ...results.valid.map(v => ({ json: { ...v.record, _validation: 'valid', _warnings: v.warnings } })),
  ...results.invalid.map(v => ({ json: { ...v.record, _validation: 'invalid', _errors: v.errors } }))
];
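
An illustrative rule set (field names are hypothetical) combining field-level and record-level rules:

// Example Validation Rules (illustrative; field names are hypothetical)
return {
  json: {
    fields: {
      email: [
        { type: 'required', message: 'Email is required' },
        { type: 'email', message: 'Invalid email format' },
        { type: 'unique', message: 'Duplicate email', severity: 'warning' }
      ],
      amount: [
        { type: 'type', value: 'number' },
        { type: 'min', value: 0, message: 'Amount must be non-negative' }
      ],
      status: [
        { type: 'in', value: ['active', 'pending', 'closed'] }
      ]
    },
    records: [
      {
        type: 'custom',
        expression: 'record.end_date >= record.start_date',
        message: 'End date must not precede start date'
      }
    ]
  }
};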

Data Loading

Batch Loading with Error Handling

// n8n Function Node - Batch Loader
const data = $input.all().map(i => i.json);
const loadConfig = $('Load Config').first().json;
 
// Batch configuration
const batchSize = loadConfig.batchSize || 100;
const maxRetries = loadConfig.maxRetries || 3;
const retryDelay = loadConfig.retryDelay || 1000;
 
// Split into batches
const batches = [];
for (let i = 0; i < data.length; i += batchSize) {
  batches.push({
    batchNumber: Math.floor(i / batchSize) + 1,
    records: data.slice(i, i + batchSize),
    retryCount: 0
  });
}
 
// Load strategy
const loadStrategy = {
  insert: {
    operation: 'insert',
    onConflict: loadConfig.onConflict || 'error' // error, skip, update
  },
  upsert: {
    operation: 'upsert',
    keyFields: loadConfig.keyFields || ['id'],
    updateFields: loadConfig.updateFields || null // null = all fields
  },
  update: {
    operation: 'update',
    keyFields: loadConfig.keyFields || ['id'],
    updateFields: loadConfig.updateFields
  },
  delete: {
    operation: 'delete',
    keyFields: loadConfig.keyFields || ['id']
  }
};
 
// Generate load operations
const loadOperations = batches.map(batch => ({
  batchId: `batch_${batch.batchNumber}_${Date.now()}`,
  batchNumber: batch.batchNumber,
  totalBatches: batches.length,
  recordCount: batch.records.length,
  records: batch.records,
  strategy: loadStrategy[loadConfig.strategy] || loadStrategy.insert,
  destination: {
    type: loadConfig.destinationType, // database, api, file
    connection: loadConfig.connection,
    table: loadConfig.table,
    schema: loadConfig.schema
  },
  retryConfig: {
    maxRetries,
    retryDelay,
    currentRetry: 0
  }
}));
 
return loadOperations.map(op => ({ json: op }));
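
An illustrative load config (connection and table names are hypothetical); downstream, a Split In Batches / Loop Over Items node or a sub-workflow per operation executes each batch against the destination node:

// Example Load Config (illustrative; connection and table names are hypothetical)
return {
  json: {
    strategy: 'upsert',
    batchSize: 500,
    maxRetries: 3,
    retryDelay: 2000,
    keyFields: ['id'],
    destinationType: 'database',
    connection: 'analytics_db',
    schema: 'public',
    table: 'orders_clean'
  }
};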

Load Result Handler

// n8n Function Node - Load Result Handler
const loadResult = $input.first().json;
const batchInfo = $('Batch Info').first().json;
 
// Process load result
const result = {
  batchId: batchInfo.batchId,
  batchNumber: batchInfo.batchNumber,
  status: loadResult.success ? 'completed' : 'failed',
  recordsProcessed: loadResult.rowsAffected || 0,
  recordsExpected: batchInfo.recordCount,
  errors: loadResult.errors || [],
  startTime: batchInfo.startTime,
  endTime: new Date().toISOString(),
  duration: null
};
 
// Calculate duration
if (batchInfo.startTime) {
  const start = new Date(batchInfo.startTime);
  const end = new Date(result.endTime);
  result.duration = end - start;
}
 
// Determine if retry needed
const shouldRetry = !loadResult.success &&
  batchInfo.retryConfig.currentRetry < batchInfo.retryConfig.maxRetries;
 
if (shouldRetry) {
  result.status = 'retrying';
  result.nextRetry = {
    attemptNumber: batchInfo.retryConfig.currentRetry + 1,
    delay: batchInfo.retryConfig.retryDelay * Math.pow(2, batchInfo.retryConfig.currentRetry)
  };
}
 
// Log metrics (numeric values simplify downstream threshold checks)
const metrics = {
  batchId: result.batchId,
  recordsPerSecond: result.duration > 0
    ? Number((result.recordsProcessed / (result.duration / 1000)).toFixed(2))
    : 0,
  errorRate: batchInfo.recordCount > 0
    ? Number(((result.errors.length / batchInfo.recordCount) * 100).toFixed(2))
    : 0
};
 
return {
  json: {
    ...result,
    metrics,
    shouldRetry,
    isLastBatch: batchInfo.batchNumber === batchInfo.totalBatches
  }
};
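
To honor the computed backoff before re-running a failed batch, an IF node can route on shouldRetry into a delay step. A minimal sketch using an in-node sleep (a Wait node with an expression-based delay is an alternative), assuming the handler's output above is the input:

// n8n Function Node - Backoff Delay (sketch)
const result = $input.first().json;

if (result.shouldRetry && result.nextRetry) {
  // Sleep for the computed exponential backoff before looping back to the loader
  await new Promise(resolve => setTimeout(resolve, result.nextRetry.delay));
}

return { json: result };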

Pipeline Orchestration

// n8n Function Node - Pipeline Orchestrator
const pipelineConfig = $input.first().json;
const previousStepResult = $('Previous Step').first()?.json;
 
// Pipeline state management
const pipelineState = {
  pipelineId: pipelineConfig.pipelineId || `pipeline_${Date.now()}`,
  currentStep: pipelineConfig.currentStep || 0,
  totalSteps: pipelineConfig.steps.length,
  status: 'running',
  startTime: pipelineConfig.startTime || new Date().toISOString(),
  stepResults: pipelineConfig.stepResults || [],
  metrics: {
    recordsExtracted: 0,
    recordsTransformed: 0,
    recordsLoaded: 0,
    recordsFailed: 0
  }
};
 
// Update state with previous step result
if (previousStepResult) {
  pipelineState.stepResults.push({
    step: pipelineState.currentStep - 1,
    name: pipelineConfig.steps[pipelineState.currentStep - 1]?.name,
    status: previousStepResult.success ? 'completed' : 'failed',
    recordsProcessed: previousStepResult.recordCount || 0,
    errors: previousStepResult.errors || [],
    duration: previousStepResult.duration
  });
 
  // Update metrics
  const stepType = pipelineConfig.steps[pipelineState.currentStep - 1]?.type;
  if (stepType === 'extract') {
    pipelineState.metrics.recordsExtracted = previousStepResult.recordCount || 0;
  } else if (stepType === 'transform') {
    pipelineState.metrics.recordsTransformed = previousStepResult.recordCount || 0;
  } else if (stepType === 'load') {
    pipelineState.metrics.recordsLoaded = previousStepResult.recordCount || 0;
    pipelineState.metrics.recordsFailed = previousStepResult.errors?.length || 0;
  }
}
 
// Determine next action
const currentStepConfig = pipelineConfig.steps[pipelineState.currentStep];
 
if (!currentStepConfig) {
  // Pipeline complete
  pipelineState.status = 'completed';
  pipelineState.endTime = new Date().toISOString();
 
  return {
    json: {
      action: 'complete',
      pipelineState,
      summary: generatePipelineSummary(pipelineState)
    }
  };
}
 
// Check for failure conditions
const failedSteps = pipelineState.stepResults.filter(s => s.status === 'failed');
if (failedSteps.length > 0 && pipelineConfig.stopOnError) {
  pipelineState.status = 'failed';
  pipelineState.endTime = new Date().toISOString();
 
  return {
    json: {
      action: 'fail',
      pipelineState,
      failedSteps
    }
  };
}
 
function generatePipelineSummary(state) {
  const duration = new Date(state.endTime) - new Date(state.startTime);
  return {
    pipelineId: state.pipelineId,
    status: state.status,
    duration: `${(duration / 1000).toFixed(2)}s`,
    stepsCompleted: state.stepResults.filter(s => s.status === 'completed').length,
    stepsFailed: state.stepResults.filter(s => s.status === 'failed').length,
    metrics: state.metrics,
    throughput: state.metrics.recordsLoaded > 0 && duration > 0
      ? `${(state.metrics.recordsLoaded / (duration / 1000)).toFixed(2)} records/sec`
      : 'N/A'
  };
}
 
// Prepare next step
return {
  json: {
    action: 'execute_step',
    step: currentStepConfig,
    stepNumber: pipelineState.currentStep + 1,
    pipelineState: {
      ...pipelineState,
      currentStep: pipelineState.currentStep + 1
    }
  }
};
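
An illustrative pipeline config (step names are hypothetical); a Switch node on the returned action field ('execute_step', 'complete', 'fail') routes execution to the matching branch:

// Example Pipeline Config (illustrative; step names are hypothetical)
return {
  json: {
    pipelineId: 'daily_orders_sync',
    stopOnError: true,
    currentStep: 0,
    steps: [
      { name: 'Extract Orders', type: 'extract', source: 'orders_api' },
      { name: 'Transform Schema', type: 'transform', mapping: 'orders_v2' },
      { name: 'Validate Records', type: 'transform', rules: 'orders_rules' },
      { name: 'Load Warehouse', type: 'load', destination: 'analytics_db' }
    ]
  }
};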

Best Practices

ETL Design

  1. Idempotency: Design pipelines to be safely re-runnable
  2. Incremental loading: Process only changed data when possible
  3. Data validation: Validate at extraction and before loading
  4. Error handling: Implement retry logic and dead-letter queues (see the sketch below)
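
A dead-letter queue can be as simple as routing exhausted batches to a separate table for manual review. A minimal sketch, assuming the load result handler's output as input; the destination table name is hypothetical:

// n8n Function Node - Dead-Letter Queue Router (sketch)
const failed = $input.first().json;

// Only batches that exhausted their retries belong in the DLQ
if (failed.status === 'failed' && !failed.shouldRetry) {
  return {
    json: {
      destinationTable: 'etl_dead_letter', // hypothetical DLQ table
      batchId: failed.batchId,
      failedAt: new Date().toISOString(),
      errors: failed.errors,
      payload: JSON.stringify(failed)
    }
  };
}

return { json: { skipped: true, batchId: failed.batchId } };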

Performance Optimization

  • Use batch processing for large datasets
  • Implement parallel extraction where sources allow
  • Cache lookup data to reduce API calls (see the static-data sketch after this list)
  • Monitor pipeline metrics for bottlenecks
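
n8n's workflow static data offers a lightweight cache for lookup tables between production executions (it is not persisted during manual test runs). A minimal sketch, assuming an upstream node named 'Fetch Lookup' supplies fresh rows; the TTL is an assumption:

// n8n Function Node - Cached Lookup Table (sketch)
const staticData = $getWorkflowStaticData('global');
const CACHE_TTL_MS = 60 * 60 * 1000; // 1 hour (assumed)

const cacheIsFresh = staticData.lookupCache &&
  (Date.now() - staticData.lookupCache.fetchedAt) < CACHE_TTL_MS;

if (!cacheIsFresh) {
  // Rebuild the cache from the upstream node (assumed name: 'Fetch Lookup')
  const rows = $('Fetch Lookup').all().map(i => i.json);
  staticData.lookupCache = {
    fetchedAt: Date.now(),
    byId: rows.reduce((acc, row) => { acc[row.id] = row; return acc; }, {})
  };
}

return { json: { lookup: staticData.lookupCache.byId } };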

Data Quality

  • Implement comprehensive validation rules
  • Log data quality metrics
  • Create alerts for quality threshold breaches
  • Maintain data lineage for debugging

Building ETL workflows with n8n enables flexible, maintainable data pipelines that can scale with your data processing needs.
