n8n Data Transformation and ETL Workflows: Building Data Pipelines
Data transformation and ETL (Extract, Transform, Load) pipelines are the backbone of data-driven operations. This guide shows how to build them in n8n, from extraction through transformation, loading, and orchestration.
Data Extraction Patterns
Multi-Source Data Extraction
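A config item along the following lines drives the controller below; every id, query, and endpoint here is a hypothetical placeholder.
// Example input (hypothetical values), read by the controller below via
// $input.first().json.config
const exampleExtractionConfig = {
  sources: [
    {
      id: 'orders_db',
      type: 'database',
      priority: 10,
      config: { query: 'SELECT * FROM orders', connectionName: 'warehouse' }
    },
    {
      id: 'crm_api',
      type: 'api',
      priority: 5,
      config: { endpoint: 'https://api.example.com/contacts', method: 'GET' }
    }
  ]
};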
// n8n Function Node - Data Extraction Controller
const extractionConfig = $input.first().json.config;
// Descriptor builders for each source type; the actual extraction happens in
// downstream nodes, so these can be plain synchronous functions
const sources = {
database: (config) => {
// Extracted via PostgreSQL/MySQL node
return {
type: 'database',
query: config.query,
connection: config.connectionName
};
},
api: (config) => {
// Extracted via HTTP Request node
return {
type: 'api',
endpoint: config.endpoint,
method: config.method || 'GET',
headers: config.headers || {},
pagination: config.pagination
};
},
file: (config) => {
// Extracted via Read Binary File / FTP node
return {
type: 'file',
path: config.path,
format: config.format, // csv, json, xlsx
encoding: config.encoding || 'utf-8'
};
},
spreadsheet: (config) => {
// Extracted via Google Sheets / Excel node
return {
type: 'spreadsheet',
sheetId: config.sheetId,
range: config.range,
hasHeaders: config.hasHeaders !== false
};
}
};
// Generate extraction tasks based on config, failing fast on unknown types
const extractionTasks = extractionConfig.sources.map(source => {
  const buildConfig = sources[source.type];
  if (!buildConfig) {
    throw new Error(`Unknown source type: ${source.type}`);
  }
  return {
    sourceId: source.id,
    sourceType: source.type,
    config: buildConfig(source.config),
    priority: source.priority || 5,
    retryPolicy: source.retryPolicy || { maxRetries: 3, backoff: 'exponential' }
  };
});
// Sort so higher-priority sources are extracted first
extractionTasks.sort((a, b) => b.priority - a.priority);
return extractionTasks.map(task => ({ json: task }));
Paginated API Extraction
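The extractor below expects an input item roughly like this (URL, sizes, and params are placeholders); loop its output back through the HTTP Request node until the API reports no further pages.
// Example input for the paginated extractor (hypothetical values)
const exampleApiConfig = {
  baseUrl: 'https://api.example.com/v1/orders',
  paginationType: 'cursor', // offset | page | cursor | link
  pageSize: 100,
  cursor: null,             // carried over from the previous response
  headers: { Authorization: 'Bearer <token>' },
  additionalParams: { status: 'open' }
};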
// n8n Function Node - Paginated API Extractor
const apiConfig = $input.first().json;
const existingData = $('Previous Page Data').all().map(i => i.json);
// Pagination state
const pagination = {
currentPage: apiConfig.currentPage || 1,
pageSize: apiConfig.pageSize || 100,
totalPages: apiConfig.totalPages || null,
cursor: apiConfig.cursor || null,
hasMore: true
};
// Determine pagination type
const paginationType = apiConfig.paginationType || 'offset';
// Build request parameters
let requestParams = {};
switch (paginationType) {
case 'offset':
requestParams = {
offset: (pagination.currentPage - 1) * pagination.pageSize,
limit: pagination.pageSize
};
break;
case 'page':
requestParams = {
page: pagination.currentPage,
per_page: pagination.pageSize
};
break;
case 'cursor':
requestParams = pagination.cursor
? { cursor: pagination.cursor, limit: pagination.pageSize }
: { limit: pagination.pageSize };
break;
case 'link':
// For APIs that return next page URL in headers/body
requestParams = {
nextUrl: apiConfig.nextUrl || apiConfig.baseUrl
};
break;
}
// Merge with any custom parameters
requestParams = {
...requestParams,
...apiConfig.additionalParams
};
return {
json: {
requestConfig: {
url: apiConfig.baseUrl,
method: 'GET',
params: requestParams,
headers: apiConfig.headers
},
pagination,
paginationType,
extractedCount: existingData.length,
batchId: apiConfig.batchId || `batch_${Date.now()}`
}
};
Incremental Extraction
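The manager below reads a per-table config like this sketch (table and column names are placeholders); supply incrementalColumn for timestamp-based sync, or checksumColumns to fall back to change detection.
// Example input for the incremental extraction manager (hypothetical values)
const exampleSyncConfig = {
  table: 'orders',
  incrementalColumn: 'updated_at', // omit to use checksum-based change detection
  primaryKey: 'order_id',
  checksumColumns: ['status', 'total', 'updated_by']
};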
// n8n Function Node - Incremental Extraction Manager
const config = $input.first().json;
const lastSync = $('Get Last Sync State').first().json;
// Determine extraction window
const now = new Date();
const lastSyncTime = lastSync?.lastSyncTime
  ? new Date(lastSync.lastSyncTime)
  : new Date(0); // epoch start makes the first run an effective full extraction
// Build incremental query; table and column names are interpolated straight
// from config, so treat that config as trusted (or validate it) to avoid SQL injection
const buildIncrementalQuery = (table, timestampColumn, lastSync) => {
const whereClause = lastSync
? `WHERE ${timestampColumn} > '${lastSync.toISOString()}'`
: '';
return `
SELECT *
FROM ${table}
${whereClause}
ORDER BY ${timestampColumn} ASC
LIMIT 10000
`;
};
// Build change detection query (for tables without timestamps)
const buildChangeDetectionQuery = (table, primaryKey, checksumColumns) => {
return `
SELECT
${primaryKey},
MD5(CONCAT(${checksumColumns.join(', ')})) as row_checksum
FROM ${table}
`;
};
// Extraction strategy
let extractionStrategy;
if (config.incrementalColumn) {
extractionStrategy = {
type: 'incremental',
query: buildIncrementalQuery(
config.table,
config.incrementalColumn,
lastSyncTime
),
watermark: {
column: config.incrementalColumn,
lastValue: lastSync?.lastValue
}
};
} else if (config.checksumColumns) {
extractionStrategy = {
type: 'change_detection',
query: buildChangeDetectionQuery(
config.table,
config.primaryKey,
config.checksumColumns
),
previousChecksums: lastSync?.checksums || {}
};
} else {
extractionStrategy = {
type: 'full',
query: `SELECT * FROM ${config.table}`
};
}
return {
json: {
strategy: extractionStrategy,
syncWindow: {
start: lastSyncTime.toISOString(),
end: now.toISOString()
},
table: config.table,
batchId: `sync_${config.table}_${Date.now()}`
}
};
Data Transformation
Schema Transformation
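A mapping config in this shape (field names are placeholders) tells the transformer below which source fields feed each target field and which transformations and validations to apply.
// Example schema mapping (hypothetical fields), read below via
// $('Schema Mapping Config').first().json
const exampleSchemaMapping = {
  fields: {
    email: {
      sourceField: 'EMAIL_ADDR',
      transformations: [{ type: 'trim' }, { type: 'lowercase' }],
      validation: { type: 'email', onInvalid: 'null' }
    },
    fullName: {
      sourceFields: ['first_name', 'last_name'],
      transformations: [{ type: 'concat', params: ' ' }]
    },
    signupDate: {
      sourceField: 'created',
      transformations: [{ type: 'toDate' }]
    }
  }
};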
// n8n Function Node - Schema Transformer
const sourceData = $input.all().map(i => i.json);
const schemaMapping = $('Schema Mapping Config').first().json;
// Schema transformation functions
const transformers = {
// Type conversions
toString: (value) => String(value ?? ''),
toNumber: (value) => {
const num = Number(value);
return isNaN(num) ? null : num;
},
toBoolean: (value) => {
if (typeof value === 'boolean') return value;
if (typeof value === 'string') {
return ['true', '1', 'yes', 'y'].includes(value.toLowerCase());
}
return Boolean(value);
},
toDate: (value, format) => {
if (!value) return null;
const date = new Date(value);
return isNaN(date.getTime()) ? null : date.toISOString();
},
toArray: (value, delimiter = ',') => {
if (Array.isArray(value)) return value;
if (typeof value === 'string') return value.split(delimiter).map(s => s.trim());
return [value];
},
// String transformations
trim: (value) => String(value ?? '').trim(),
uppercase: (value) => String(value ?? '').toUpperCase(),
lowercase: (value) => String(value ?? '').toLowerCase(),
capitalize: (value) => {
const str = String(value ?? '');
return str.charAt(0).toUpperCase() + str.slice(1).toLowerCase();
},
// Data cleaning
removeHtml: (value) => String(value ?? '').replace(/<[^>]*>/g, ''),
normalizeWhitespace: (value) => String(value ?? '').replace(/\s+/g, ' ').trim(),
extractNumbers: (value) => String(value ?? '').replace(/[^\d.-]/g, ''),
// Lookup transformations
lookup: (value, lookupTable) => lookupTable[value] ?? value,
default: (value, defaultValue) => value ?? defaultValue,
// Concatenation
concat: (values, separator = ' ') => values.filter(v => v != null).join(separator),
// Custom expressions
expression: (value, expr, context) => {
// Caution: new Function is not a sandbox; only run trusted expressions
try {
const func = new Function('value', 'context', `return ${expr}`);
return func(value, context);
} catch (e) {
return null;
}
}
};
// Apply schema mapping
const transformedData = sourceData.map(record => {
const transformed = {};
for (const [targetField, mapping] of Object.entries(schemaMapping.fields)) {
let value;
// Get source value(s)
if (mapping.sourceFields) {
// Multiple source fields
value = mapping.sourceFields.map(sf => record[sf]);
} else if (mapping.sourceField) {
value = record[mapping.sourceField];
} else if (mapping.constant) {
value = mapping.constant;
}
// Apply transformations
if (mapping.transformations) {
for (const transform of mapping.transformations) {
const transformFn = transformers[transform.type];
if (transformFn) {
value = Array.isArray(value) && transform.type !== 'concat'
? value.map(v => transformFn(v, transform.params, record))
: transformFn(value, transform.params, record);
}
}
}
// Apply validation
if (mapping.validation) {
const isValid = validateField(value, mapping.validation);
if (!isValid && mapping.validation.onInvalid === 'null') {
value = null;
} else if (!isValid && mapping.validation.onInvalid === 'skip') {
continue;
}
}
transformed[targetField] = value;
}
return transformed;
});
function validateField(value, rules) {
if (rules.required && (value == null || value === '')) return false;
if (rules.type === 'email' && value && !isValidEmail(value)) return false;
if (rules.type === 'phone' && value && !isValidPhone(value)) return false;
if (rules.minLength && String(value).length < rules.minLength) return false;
if (rules.maxLength && String(value).length > rules.maxLength) return false;
if (rules.pattern && !new RegExp(rules.pattern).test(value)) return false;
return true;
}
function isValidEmail(email) {
return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
}
function isValidPhone(phone) {
return /^\+?[\d\s-()]{10,}$/.test(phone);
}
return transformedData.map(record => ({ json: record }));
Data Aggregation
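An aggregation config like this sketch (fields are placeholders) gives the aggregator below its GROUP BY semantics, including a HAVING-style filter and ordering.
// Example aggregation config (hypothetical fields), read below via
// $('Aggregation Config').first().json
const exampleAggregationConfig = {
  groupBy: ['region', 'product_category'],
  aggregations: [
    { field: 'amount', function: 'sum', alias: 'total_revenue' },
    { field: 'amount', function: 'avg', alias: 'avg_order_value' },
    { field: 'customer_id', function: 'countDistinct', alias: 'unique_customers' }
  ],
  having: { field: 'total_revenue', operator: '>', value: 1000 },
  orderBy: [{ field: 'total_revenue', direction: 'desc' }]
};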
// n8n Function Node - Data Aggregator
const data = $input.all().map(i => i.json);
const aggregationConfig = $('Aggregation Config').first().json;
// Aggregation functions
const aggregators = {
sum: (values) => values.reduce((a, b) => a + (Number(b) || 0), 0),
avg: (values) => {
const nums = values.filter(v => !isNaN(Number(v))).map(Number);
return nums.length ? nums.reduce((a, b) => a + b, 0) / nums.length : null;
},
min: (values) => {
  const nums = values.filter(v => !isNaN(Number(v))).map(Number);
  return nums.length ? Math.min(...nums) : null;
},
max: (values) => {
  const nums = values.filter(v => !isNaN(Number(v))).map(Number);
  return nums.length ? Math.max(...nums) : null;
},
count: (values) => values.length,
countDistinct: (values) => new Set(values).size,
first: (values) => values[0],
last: (values) => values[values.length - 1],
concat: (values, separator = ', ') => [...new Set(values)].join(separator),
mode: (values) => {
const freq = {};
values.forEach(v => freq[v] = (freq[v] || 0) + 1);
return Object.entries(freq).sort((a, b) => b[1] - a[1])[0]?.[0];
},
median: (values) => {
  const sorted = values.filter(v => !isNaN(Number(v))).map(Number).sort((a, b) => a - b);
  if (!sorted.length) return null;
  const mid = Math.floor(sorted.length / 2);
  return sorted.length % 2 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
},
stddev: (values) => {
  const nums = values.filter(v => !isNaN(Number(v))).map(Number);
  if (!nums.length) return null;
  const avg = nums.reduce((a, b) => a + b, 0) / nums.length;
  const squareDiffs = nums.map(v => Math.pow(v - avg, 2));
  return Math.sqrt(squareDiffs.reduce((a, b) => a + b, 0) / nums.length);
}
};
// Group data
const grouped = {};
for (const record of data) {
// Build group key
const groupKey = aggregationConfig.groupBy
.map(field => record[field])
.join('|');
if (!grouped[groupKey]) {
grouped[groupKey] = {
_groupKey: groupKey,
_groupValues: {},
_records: []
};
// Store group-by values
for (const field of aggregationConfig.groupBy) {
grouped[groupKey]._groupValues[field] = record[field];
}
}
grouped[groupKey]._records.push(record);
}
// Apply aggregations
const results = Object.values(grouped).map(group => {
const result = { ...group._groupValues };
for (const agg of aggregationConfig.aggregations) {
const values = group._records.map(r => r[agg.field]);
const aggregator = aggregators[agg.function];
if (aggregator) {
result[agg.alias || `${agg.function}_${agg.field}`] = aggregator(values, agg.params);
}
}
// Add record count
result._recordCount = group._records.length;
return result;
});
// Apply having clause (post-aggregation filtering)
let filteredResults = results;
if (aggregationConfig.having) {
filteredResults = results.filter(record => {
return evaluateCondition(record, aggregationConfig.having);
});
}
// Sort results
if (aggregationConfig.orderBy) {
filteredResults.sort((a, b) => {
for (const sort of aggregationConfig.orderBy) {
const aVal = a[sort.field];
const bVal = b[sort.field];
const comparison = aVal < bVal ? -1 : aVal > bVal ? 1 : 0;
if (comparison !== 0) {
return sort.direction === 'desc' ? -comparison : comparison;
}
}
return 0;
});
}
function evaluateCondition(record, condition) {
const value = record[condition.field];
switch (condition.operator) {
case '>': return value > condition.value;
case '>=': return value >= condition.value;
case '<': return value < condition.value;
case '<=': return value <= condition.value;
case '=': return value === condition.value;
case '!=': return value !== condition.value;
default: return true;
}
}
return filteredResults.map(record => ({ json: record }));
Data Enrichment
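The enricher below is driven by a config along these lines (source names and fields are placeholders that must match the lookup tables wired into the workflow).
// Example enrichment config (hypothetical fields), read below via
// $('Enrichment Config').first().json
const exampleEnrichmentConfig = {
  enrichments: [
    {
      type: 'lookup',
      config: {
        source: 'customers',
        keyField: 'customer_id',
        fields: [{ name: 'name', alias: 'customer_name' }, { name: 'tier' }]
      }
    },
    {
      type: 'calculate',
      config: {
        calculations: [
          { type: 'multiply', fields: ['quantity', 'unit_price'], target: 'line_total' }
        ]
      }
    }
  ]
};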
// n8n Function Node - Data Enricher
const sourceData = $input.all().map(i => i.json);
const enrichmentConfig = $('Enrichment Config').first().json;
// Lookup data from other sources
const lookupTables = {
customers: $('Customer Data').all().reduce((acc, item) => {
acc[item.json.id] = item.json;
return acc;
}, {}),
products: $('Product Data').all().reduce((acc, item) => {
acc[item.json.sku] = item.json;
return acc;
}, {}),
regions: $('Region Data').all().reduce((acc, item) => {
acc[item.json.code] = item.json;
return acc;
}, {})
};
// Enrichment functions
const enrichers = {
lookup: (record, config) => {
const lookupTable = lookupTables[config.source];
const lookupKey = record[config.keyField];
const lookupRecord = lookupTable?.[lookupKey];
if (lookupRecord && config.fields) {
const enriched = {};
for (const field of config.fields) {
const targetName = field.alias || field.name;
enriched[targetName] = lookupRecord[field.name];
}
return enriched;
}
return {};
},
calculate: (record, config) => {
const result = {};
for (const calc of config.calculations) {
switch (calc.type) {
case 'multiply':
result[calc.target] = (record[calc.fields[0]] || 0) * (record[calc.fields[1]] || 0);
break;
case 'divide':
const divisor = record[calc.fields[1]] || 1;
result[calc.target] = divisor !== 0 ? (record[calc.fields[0]] || 0) / divisor : null;
break;
case 'add':
result[calc.target] = calc.fields.reduce((sum, f) => sum + (record[f] || 0), 0);
break;
case 'subtract':
result[calc.target] = (record[calc.fields[0]] || 0) - (record[calc.fields[1]] || 0);
break;
case 'percentage':
const total = record[calc.fields[1]] || 1;
result[calc.target] = total !== 0 ? ((record[calc.fields[0]] || 0) / total * 100).toFixed(2) : null;
break;
case 'datediff':
const date1 = new Date(record[calc.fields[0]]);
const date2 = new Date(record[calc.fields[1]]);
const diffMs = date2 - date1;
result[calc.target] = Math.floor(diffMs / (1000 * 60 * 60 * 24)); // days
break;
}
}
return result;
},
derive: (record, config) => {
const result = {};
for (const derivation of config.derivations) {
const value = record[derivation.sourceField];
switch (derivation.type) {
case 'categorize':
result[derivation.target] = categorize(value, derivation.ranges);
break;
case 'extract':
if (derivation.pattern) {
const match = String(value).match(new RegExp(derivation.pattern));
result[derivation.target] = match ? match[derivation.group || 0] : null;
}
break;
case 'datepart':
const date = new Date(value);
result[derivation.target] = extractDatePart(date, derivation.part);
break;
case 'conditional':
result[derivation.target] = evaluateConditional(value, derivation.conditions, record);
break;
}
}
return result;
},
geocode: async (record, config) => {
// Would call geocoding API - simplified for example
if (record[config.addressField]) {
return {
latitude: null,
longitude: null,
formatted_address: record[config.addressField]
};
}
return {};
}
};
function categorize(value, ranges) {
for (const range of ranges) {
if (range.min !== undefined && value < range.min) continue;
if (range.max !== undefined && value > range.max) continue;
return range.label;
}
return 'Other';
}
function extractDatePart(date, part) {
if (isNaN(date.getTime())) return null;
switch (part) {
case 'year': return date.getFullYear();
case 'month': return date.getMonth() + 1;
case 'day': return date.getDate();
case 'quarter': return Math.ceil((date.getMonth() + 1) / 3);
case 'weekday': return date.getDay();
case 'week': return getWeekNumber(date);
default: return null;
}
}
function getWeekNumber(date) {
const d = new Date(Date.UTC(date.getFullYear(), date.getMonth(), date.getDate()));
const dayNum = d.getUTCDay() || 7;
d.setUTCDate(d.getUTCDate() + 4 - dayNum);
const yearStart = new Date(Date.UTC(d.getUTCFullYear(), 0, 1));
return Math.ceil((((d - yearStart) / 86400000) + 1) / 7);
}
function evaluateConditional(value, conditions, record) {
for (const condition of conditions) {
let matches = true;
for (const check of condition.checks) {
const checkValue = check.field ? record[check.field] : value;
if (!evaluateCheck(checkValue, check.operator, check.value)) {
matches = false;
break;
}
}
if (matches) return condition.result;
}
return conditions.find(c => c.default)?.result || null;
}
function evaluateCheck(value, operator, compareValue) {
switch (operator) {
case '=': return value === compareValue;
case '!=': return value !== compareValue;
case '>': return value > compareValue;
case '>=': return value >= compareValue;
case '<': return value < compareValue;
case '<=': return value <= compareValue;
case 'contains': return String(value).includes(compareValue);
case 'startsWith': return String(value).startsWith(compareValue);
case 'in': return compareValue.includes(value);
default: return false;
}
}
// Apply enrichments; awaiting each enricher keeps async ones (like geocode) working
const enrichedData = await Promise.all(sourceData.map(async record => {
  let enriched = { ...record };
  for (const enrichment of enrichmentConfig.enrichments) {
    const enricher = enrichers[enrichment.type];
    if (enricher) {
      const additionalData = await enricher(enriched, enrichment.config);
      enriched = { ...enriched, ...additionalData };
    }
  }
  return enriched;
}));
return enrichedData.map(record => ({ json: record }));
Data Validation
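Rules in this shape (fields and limits are placeholders) drive the validator below; field-level rules run per column, record-level rules see the whole row.
// Example validation rules (hypothetical fields), read below via
// $('Validation Rules').first().json
const exampleValidationRules = {
  fields: {
    email: [
      { type: 'required', message: 'Email is missing' },
      { type: 'email' }
    ],
    age: [
      { type: 'min', value: 0 },
      { type: 'max', value: 130, severity: 'warning' }
    ]
  },
  records: [
    {
      type: 'custom',
      expression: 'record.end_date >= record.start_date',
      message: 'end_date precedes start_date'
    }
  ]
};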
// n8n Function Node - Data Validator
const data = $input.all().map(i => i.json);
const validationRules = $('Validation Rules').first().json;
const validators = {
required: (value) => value != null && value !== '',
type: (value, expectedType) => {
switch (expectedType) {
case 'string': return typeof value === 'string';
case 'number': return typeof value === 'number' && !isNaN(value);
case 'boolean': return typeof value === 'boolean';
case 'date': return !isNaN(new Date(value).getTime());
case 'array': return Array.isArray(value);
case 'object': return typeof value === 'object' && !Array.isArray(value);
default: return true;
}
},
minLength: (value, min) => String(value ?? '').length >= min,
maxLength: (value, max) => String(value ?? '').length <= max,
min: (value, min) => Number(value) >= min,
max: (value, max) => Number(value) <= max,
pattern: (value, pattern) => new RegExp(pattern).test(String(value ?? '')),
email: (value) => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(String(value ?? '')),
url: (value) => {
try { new URL(value); return true; } catch { return false; }
},
phone: (value) => /^\+?[\d\s-()]{10,}$/.test(String(value ?? '')),
in: (value, allowedValues) => allowedValues.includes(value),
notIn: (value, disallowedValues) => !disallowedValues.includes(value),
unique: (value, field, allRecords) => {
const values = allRecords.map(r => r[field]);
return values.filter(v => v === value).length === 1;
},
custom: (value, expression, record) => {
try {
const func = new Function('value', 'record', `return ${expression}`);
return func(value, record);
} catch {
return false;
}
}
};
// Validate all records
const results = {
valid: [],
invalid: [],
summary: {
total: data.length,
valid: 0,
invalid: 0,
errorsByField: {},
errorsByType: {}
}
};
for (let i = 0; i < data.length; i++) {
const record = data[i];
const errors = [];
// Apply field-level rules
for (const [field, rules] of Object.entries(validationRules.fields || {})) {
const value = record[field];
for (const rule of rules) {
const validator = validators[rule.type];
if (!validator) continue;
let isValid;
if (rule.type === 'unique') {
isValid = validator(value, field, data);
} else if (rule.type === 'custom') {
isValid = validator(value, rule.expression, record);
} else {
isValid = validator(value, rule.value);
}
if (!isValid) {
const error = {
field,
rule: rule.type,
value,
message: rule.message || `Validation failed: ${rule.type}`,
severity: rule.severity || 'error'
};
errors.push(error);
// Track error statistics
results.summary.errorsByField[field] = (results.summary.errorsByField[field] || 0) + 1;
results.summary.errorsByType[rule.type] = (results.summary.errorsByType[rule.type] || 0) + 1;
}
}
}
// Apply record-level rules
for (const rule of validationRules.records || []) {
if (rule.type === 'custom') {
const isValid = validators.custom(null, rule.expression, record);
if (!isValid) {
errors.push({
field: '_record',
rule: 'custom',
message: rule.message || 'Record validation failed',
severity: rule.severity || 'error'
});
}
}
}
// Categorize record
const hasErrors = errors.some(e => e.severity === 'error');
if (hasErrors) {
results.invalid.push({
recordIndex: i,
record,
errors
});
results.summary.invalid++;
} else {
results.valid.push({
recordIndex: i,
record,
warnings: errors.filter(e => e.severity === 'warning')
});
results.summary.valid++;
}
}
// Output valid and invalid separately
return [
{ json: { type: 'validation_results', ...results.summary } },
...results.valid.map(v => ({ json: { ...v.record, _validation: 'valid', _warnings: v.warnings } })),
...results.invalid.map(v => ({ json: { ...v.record, _validation: 'invalid', _errors: v.errors } }))
];
Data Loading
Batch Loading with Error Handling
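A load config roughly like this (connection and table names are placeholders) selects the strategy and batch sizing used by the loader below.
// Example load config (hypothetical values), read below via
// $('Load Config').first().json
const exampleLoadConfig = {
  strategy: 'upsert',   // insert | upsert | update | delete
  keyFields: ['order_id'],
  batchSize: 500,
  maxRetries: 3,
  retryDelay: 1000,     // ms; doubled per retry by the result handler
  destinationType: 'database',
  connection: 'warehouse',
  schema: 'analytics',
  table: 'fact_orders'
};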
// n8n Function Node - Batch Loader
const data = $input.all().map(i => i.json);
const loadConfig = $('Load Config').first().json;
// Batch configuration
const batchSize = loadConfig.batchSize || 100;
const maxRetries = loadConfig.maxRetries || 3;
const retryDelay = loadConfig.retryDelay || 1000;
// Split into batches
const batches = [];
for (let i = 0; i < data.length; i += batchSize) {
batches.push({
batchNumber: Math.floor(i / batchSize) + 1,
records: data.slice(i, i + batchSize),
retryCount: 0
});
}
// Load strategy
const loadStrategy = {
insert: {
operation: 'insert',
onConflict: loadConfig.onConflict || 'error' // error, skip, update
},
upsert: {
operation: 'upsert',
keyFields: loadConfig.keyFields || ['id'],
updateFields: loadConfig.updateFields || null // null = all fields
},
update: {
operation: 'update',
keyFields: loadConfig.keyFields || ['id'],
updateFields: loadConfig.updateFields
},
delete: {
operation: 'delete',
keyFields: loadConfig.keyFields || ['id']
}
};
// Generate load operations
const loadOperations = batches.map(batch => ({
batchId: `batch_${batch.batchNumber}_${Date.now()}`,
batchNumber: batch.batchNumber,
totalBatches: batches.length,
recordCount: batch.records.length,
records: batch.records,
strategy: loadStrategy[loadConfig.strategy] || loadStrategy.insert,
destination: {
type: loadConfig.destinationType, // database, api, file
connection: loadConfig.connection,
table: loadConfig.table,
schema: loadConfig.schema
},
retryConfig: {
maxRetries,
retryDelay,
currentRetry: 0
}
}));
return loadOperations.map(op => ({ json: op }));
Load Result Handler
// n8n Function Node - Load Result Handler
const loadResult = $input.first().json;
const batchInfo = $('Batch Info').first().json;
// Process load result
const result = {
batchId: batchInfo.batchId,
batchNumber: batchInfo.batchNumber,
status: loadResult.success ? 'completed' : 'failed',
recordsProcessed: loadResult.rowsAffected || 0,
recordsExpected: batchInfo.recordCount,
errors: loadResult.errors || [],
startTime: batchInfo.startTime,
endTime: new Date().toISOString(),
duration: null
};
// Calculate duration
if (batchInfo.startTime) {
const start = new Date(batchInfo.startTime);
const end = new Date(result.endTime);
result.duration = end - start;
}
// Determine if retry needed
const shouldRetry = !loadResult.success &&
batchInfo.retryConfig.currentRetry < batchInfo.retryConfig.maxRetries;
if (shouldRetry) {
result.status = 'retrying';
result.nextRetry = {
attemptNumber: batchInfo.retryConfig.currentRetry + 1,
delay: batchInfo.retryConfig.retryDelay * Math.pow(2, batchInfo.retryConfig.currentRetry)
};
}
// Log metrics
const metrics = {
batchId: result.batchId,
recordsPerSecond: result.duration > 0
? (result.recordsProcessed / (result.duration / 1000)).toFixed(2)
: 0,
errorRate: batchInfo.recordCount > 0
? ((result.errors.length / batchInfo.recordCount) * 100).toFixed(2)
: 0
};
return {
json: {
...result,
metrics,
shouldRetry,
isLastBatch: batchInfo.batchNumber === batchInfo.totalBatches
}
};
Pipeline Orchestration
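The orchestrator below walks a step list like this sketch (names are placeholders); each step corresponds to a branch of the workflow that reports back a success flag and record count.
// Example pipeline config (hypothetical step names)
const examplePipelineConfig = {
  pipelineId: 'daily_orders_sync',
  stopOnError: true,
  currentStep: 0,
  steps: [
    { name: 'Extract orders', type: 'extract' },
    { name: 'Map to warehouse schema', type: 'transform' },
    { name: 'Load fact_orders', type: 'load' }
  ]
};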
// n8n Function Node - Pipeline Orchestrator
const pipelineConfig = $input.first().json;
const previousStepResult = $('Previous Step').first()?.json;
// Pipeline state management
const pipelineState = {
pipelineId: pipelineConfig.pipelineId || `pipeline_${Date.now()}`,
currentStep: pipelineConfig.currentStep || 0,
totalSteps: pipelineConfig.steps.length,
status: 'running',
startTime: pipelineConfig.startTime || new Date().toISOString(),
stepResults: pipelineConfig.stepResults || [],
metrics: {
recordsExtracted: 0,
recordsTransformed: 0,
recordsLoaded: 0,
recordsFailed: 0
}
};
// Update state with previous step result
if (previousStepResult) {
pipelineState.stepResults.push({
step: pipelineState.currentStep - 1,
name: pipelineConfig.steps[pipelineState.currentStep - 1]?.name,
status: previousStepResult.success ? 'completed' : 'failed',
recordsProcessed: previousStepResult.recordCount || 0,
errors: previousStepResult.errors || [],
duration: previousStepResult.duration
});
// Update metrics
const stepType = pipelineConfig.steps[pipelineState.currentStep - 1]?.type;
if (stepType === 'extract') {
pipelineState.metrics.recordsExtracted = previousStepResult.recordCount || 0;
} else if (stepType === 'transform') {
pipelineState.metrics.recordsTransformed = previousStepResult.recordCount || 0;
} else if (stepType === 'load') {
pipelineState.metrics.recordsLoaded = previousStepResult.recordCount || 0;
pipelineState.metrics.recordsFailed = previousStepResult.errors?.length || 0;
}
}
// Determine next action
const currentStepConfig = pipelineConfig.steps[pipelineState.currentStep];
if (!currentStepConfig) {
// Pipeline complete
pipelineState.status = 'completed';
pipelineState.endTime = new Date().toISOString();
return {
json: {
action: 'complete',
pipelineState,
summary: generatePipelineSummary(pipelineState)
}
};
}
// Check for failure conditions
const failedSteps = pipelineState.stepResults.filter(s => s.status === 'failed');
if (failedSteps.length > 0 && pipelineConfig.stopOnError) {
pipelineState.status = 'failed';
pipelineState.endTime = new Date().toISOString();
return {
json: {
action: 'fail',
pipelineState,
failedSteps
}
};
}
function generatePipelineSummary(state) {
const duration = new Date(state.endTime) - new Date(state.startTime);
return {
pipelineId: state.pipelineId,
status: state.status,
duration: `${(duration / 1000).toFixed(2)}s`,
stepsCompleted: state.stepResults.filter(s => s.status === 'completed').length,
stepsFailed: state.stepResults.filter(s => s.status === 'failed').length,
metrics: state.metrics,
throughput: state.metrics.recordsLoaded > 0 && duration > 0
? `${(state.metrics.recordsLoaded / (duration / 1000)).toFixed(2)} records/sec`
: 'N/A'
};
}
// Prepare next step
return {
json: {
action: 'execute_step',
step: currentStepConfig,
stepNumber: pipelineState.currentStep + 1,
pipelineState: {
...pipelineState,
currentStep: pipelineState.currentStep + 1
}
}
};
Best Practices
ETL Design
- Idempotency: Design pipelines to be safely re-runnable
- Incremental loading: Process only changed data when possible
- Data validation: Validate at extraction and before loading
- Error handling: Implement retry logic and dead-letter queues (see the routing sketch below)
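A minimal sketch of the dead-letter idea in a Function node: items that have exhausted their retries get flagged so an IF node can route them to a separate store instead of retrying forever. The _deadLetter and retry fields are hypothetical names, not an n8n convention.
// n8n Function Node - Dead-Letter Router (sketch)
// Assumes upstream nodes maintain retryCount / maxRetries / lastError fields.
const items = $input.all().map(i => i.json);
return items.map(item => {
  const exhausted = (item.retryCount || 0) >= (item.maxRetries || 3);
  return {
    json: {
      ...item,
      // Branch on this flag with an IF node: true -> dead-letter store,
      // false -> a Wait node and another load attempt
      _deadLetter: exhausted,
      _deadLetterReason: exhausted
        ? (item.lastError || 'max retries exceeded')
        : undefined
    }
  };
});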
Performance Optimization
- Use batch processing for large datasets
- Implement parallel extraction where sources allow
- Cache lookup data to reduce API calls (a caching sketch follows this list)
- Monitor pipeline metrics for bottlenecks
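One way to cache lookups between runs is n8n's workflow static data, sketched here; note it only persists for active, trigger-started workflows, and the cache key and TTL below are arbitrary choices.
// n8n Function Node - Cached Lookup Gate (sketch)
const staticData = $getWorkflowStaticData('global');
const CACHE_TTL_MS = 15 * 60 * 1000; // 15 minutes; tune to how fast lookups change
const cache = staticData.lookupCache;
const fresh = cache && (Date.now() - cache.fetchedAt) < CACHE_TTL_MS;
if (fresh) {
  // Serve cached lookups and let an IF node skip the HTTP Request branch
  return [{ json: { source: 'cache', lookups: cache.data } }];
}
// Otherwise refresh via the lookup API, then store the response back with:
//   staticData.lookupCache = { fetchedAt: Date.now(), data: apiResponse };
return [{ json: { source: 'refresh' } }];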
Data Quality
- Implement comprehensive validation rules
- Log data quality metrics
- Create alerts for quality threshold breaches (sketched after this list)
- Maintain data lineage for debugging
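Tying the last two points together, a small check node can turn the validator's summary item into an alert signal; the 5% threshold here is an arbitrary example value.
// n8n Function Node - Quality Threshold Check (sketch)
// Expects the summary item emitted by the validator earlier in this guide.
const summary = $input.first().json;
const ERROR_RATE_THRESHOLD = 5; // percent; pick per dataset
const errorRate = summary.total > 0
  ? (summary.invalid / summary.total) * 100
  : 0;
return [{
  json: {
    errorRate: Number(errorRate.toFixed(2)),
    breached: errorRate > ERROR_RATE_THRESHOLD, // route to Slack/email on true
    errorsByField: summary.errorsByField
  }
}];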
Building ETL workflows with n8n enables flexible, maintainable data pipelines that can scale with your data processing needs.