Talend Dynamic Schema: Complete Guide to Runtime Schema Handling

DeviDevs Team
12 min read
#talend#dynamic-schema#etl#data-integration#metadata

Dynamic schema handling enables Talend jobs to process data with varying structures without hardcoding a schema at design time. (Note that the Dynamic column type is a feature of Talend's subscription products; it is not available in Open Studio.) This guide covers techniques for runtime schema discovery, dynamic database operations, and flexible data processing.

Dynamic Schema Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                    DYNAMIC SCHEMA ARCHITECTURE                               │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   STATIC SCHEMA (Traditional)           DYNAMIC SCHEMA                      │
│  ┌─────────────────────────────┐      ┌─────────────────────────────┐      │
│  │                             │      │                             │      │
│  │  Schema defined at          │      │  Schema discovered at       │      │
│  │  design time                │      │  runtime                    │      │
│  │                             │      │                             │      │
│  │  ┌───────────────────────┐  │      │  ┌───────────────────────┐  │      │
│  │  │ id     : Integer      │  │      │  │ Dynamic column        │  │      │
│  │  │ name   : String       │  │      │  │ (detects columns      │  │      │
│  │  │ amount : Double       │  │      │  │  from source)         │  │      │
│  │  │ date   : Date         │  │      │  │                       │  │      │
│  │  └───────────────────────┘  │      │  └───────────────────────┘  │      │
│  │                             │      │                             │      │
│  │  ✗ Cannot handle extra      │      │  ✓ Handles any columns     │      │
│  │    columns                  │      │  ✓ Adapts to changes       │      │
│  │  ✗ Fails on schema changes  │      │  ✓ Generic processing      │      │
│  │                             │      │                             │      │
│  └─────────────────────────────┘      └─────────────────────────────┘      │
│                                                                             │
│   USE CASES:                                                                │
│   • Processing files with varying columns                                   │
│   • Generic file loaders                                                    │
│   • Data lake ingestion                                                     │
│   • Schema evolution handling                                               │
│   • Multi-source consolidation                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Dynamic Column Configuration

Basic Dynamic Schema Setup

/*
 * tFileInputDelimited with Dynamic Schema:
 *
 * Schema Definition:
 * - Column Name: dynamic_column
 * - Type: Dynamic
 * - Length: (blank)
 *
 * Component Settings:
 * - Header: 1 (or more rows)
 * - "Use header as column names": Check this option
 */
 
/*
 * The dynamic column carries all of the file's data in a single
 * routines.system.Dynamic object. Access it through the Dynamic API:
 *
 * row1.dynamic_column -> routines.system.Dynamic object
 * row1.dynamic_column.getColumnCount() -> Number of columns
 * row1.dynamic_column.getColumnMetadata(i) -> DynamicMetadata for column i
 * row1.dynamic_column.getColumnValue(i) -> Value of column i
 */
 
// tJavaRow - Process dynamic data
// (Dynamic and DynamicMetadata resolve to routines.system.Dynamic and
// routines.system.DynamicMetadata in the generated Talend code)
Dynamic dynRow = input_row.dynamic_column;
 
// Iterate through all columns
for (int i = 0; i < dynRow.getColumnCount(); i++) {
    DynamicMetadata metadata = dynRow.getColumnMetadata(i);
 
    String columnName = metadata.getName();
    String columnType = metadata.getType(); // Talend type id, e.g. "id_String"
    Object columnValue = dynRow.getColumnValue(i);
 
    System.out.println(columnName + " (" + columnType + "): " + columnValue);
}
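
For a hypothetical input file whose header row is id;name;amount, the loop above would print something along these lines (file inputs typically deliver values as strings, so the type ids and values shown here are purely illustrative):

id (id_String): 1001
name (id_String): Alice
amount (id_String): 250.00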

Accessing Dynamic Column Values

// tJavaRow - Various ways to access dynamic data
 
Dynamic dynRow = input_row.dynamic_column;
 
// Method 1: By index
Object firstValue = dynRow.getColumnValue(0);
Object secondValue = dynRow.getColumnValue(1);
 
// Method 2: By column name
// (name-based lookup is not available in every Talend version; if your
//  Dynamic API lacks it, resolve the index via the metadata loop in Method 5)
Object valueByName = dynRow.getColumnValue("customer_name");
 
// Method 3: With type casting
// Cast only when the source component delivered typed values; file inputs
// often populate dynamic columns as Strings, in which case these casts
// throw ClassCastException
String customerName = (String) dynRow.getColumnValue("customer_name");
Integer customerId = (Integer) dynRow.getColumnValue("customer_id");
Double amount = (Double) dynRow.getColumnValue("amount");
 
// Method 4: Safe access with null handling
Object rawValue = dynRow.getColumnValue("optional_field");
String safeValue = rawValue != null ? rawValue.toString() : "default";
 
// Method 5: Check if column exists
boolean hasColumn = false;
for (int i = 0; i < dynRow.getColumnCount(); i++) {
    if (dynRow.getColumnMetadata(i).getName().equals("target_column")) {
        hasColumn = true;
        break;
    }
}
 
// Method 6: Get all column names
java.util.List<String> columnNames = new java.util.ArrayList<>();
for (int i = 0; i < dynRow.getColumnCount(); i++) {
    columnNames.add(dynRow.getColumnMetadata(i).getName());
}
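
Methods 5 and 6 recur in almost every dynamic job, so they are worth factoring into a user routine. A minimal sketch of a name-to-index lookup (the class and method names are chosen here for illustration); it also gives you a portable fallback if your Talend version's Dynamic API lacks name-based getColumnValue:

package routines;

import routines.system.Dynamic;

public class DynamicLookup {

    // Returns the index of the named column, or -1 if it is absent
    public static int getColumnIndex(Dynamic dynRow, String columnName) {
        for (int i = 0; i < dynRow.getColumnCount(); i++) {
            if (dynRow.getColumnMetadata(i).getName().equals(columnName)) {
                return i;
            }
        }
        return -1;
    }

    // Returns the named column's value, or a default when missing or null
    public static Object getValueOrDefault(Dynamic dynRow, String columnName,
            Object defaultValue) {
        int idx = getColumnIndex(dynRow, columnName);
        Object value = (idx >= 0) ? dynRow.getColumnValue(idx) : null;
        return (value != null) ? value : defaultValue;
    }
}

With the routine in place, Method 5 collapses to DynamicLookup.getColumnIndex(dynRow, "target_column") >= 0.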

Creating Dynamic Output

// tJavaRow - Build dynamic output row
 
// Initialize output dynamic column
Dynamic outDyn = new Dynamic();
 
// Copy all columns from input
Dynamic inDyn = input_row.dynamic_column;
for (int i = 0; i < inDyn.getColumnCount(); i++) {
    DynamicMetadata meta = inDyn.getColumnMetadata(i);
    Object value = inDyn.getColumnValue(i);
 
    // Create new metadata for output
    DynamicMetadata outMeta = new DynamicMetadata();
    outMeta.setName(meta.getName());
    outMeta.setType(meta.getType());
    outMeta.setDbType(meta.getDbType());
    outMeta.setLength(meta.getLength());
    outMeta.setPrecision(meta.getPrecision());
 
    outDyn.addColumnMetadata(outMeta);
    outDyn.addColumnValue(value); // append in step with its metadata (setColumnValue needs an existing slot)
}
 
// Add calculated column
DynamicMetadata newMeta = new DynamicMetadata();
newMeta.setName("processed_timestamp");
newMeta.setType("id_Date");
outDyn.addColumnMetadata(newMeta);
outDyn.addColumnValue(new java.util.Date());
 
// Add another calculated column
DynamicMetadata calcMeta = new DynamicMetadata();
calcMeta.setName("row_hash");
calcMeta.setType("id_String");
outDyn.addColumnMetadata(calcMeta);
outDyn.addColumnValue(computeRowHash(inDyn));
 
output_row.dynamic_column = outDyn;
 
// Helper method - tJavaRow code is generated inside a method body, so
// declarations like this are not allowed inline; put it in a Talend
// routine (see the skeleton below) and call it from there
private String computeRowHash(Dynamic dynRow) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < dynRow.getColumnCount(); i++) {
        Object val = dynRow.getColumnValue(i);
        sb.append(val != null ? val.toString() : "null");
        sb.append("|");
    }
    return Integer.toHexString(sb.toString().hashCode());
}
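
Because tJava and tJavaRow bodies are generated inside a method, helper methods like computeRowHash cannot be declared inline; the usual home for them is a user routine (Code > Routines in the repository). A minimal skeleton, with the class name DynamicSchemaHelpers chosen here for illustration:

package routines;

import routines.system.Dynamic;

public class DynamicSchemaHelpers {

    // Cheap fingerprint over all column values of a dynamic row
    public static String computeRowHash(Dynamic dynRow) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < dynRow.getColumnCount(); i++) {
            Object val = dynRow.getColumnValue(i);
            sb.append(val != null ? val.toString() : "null").append("|");
        }
        return Integer.toHexString(sb.toString().hashCode());
    }
}

The tJavaRow call then becomes DynamicSchemaHelpers.computeRowHash(inDyn).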

Schema Discovery Patterns

Runtime Schema Detection

// tJava - Discover schema from first row of file
 
String filePath = context.input_file_path;
String delimiter = context.delimiter;
String encoding = context.file_encoding;
 
// Read first row for headers (try-with-resources closes the reader
// even if reading fails)
String headerLine;
try (java.io.BufferedReader reader = new java.io.BufferedReader(
        new java.io.InputStreamReader(
            new java.io.FileInputStream(filePath), encoding))) {
    headerLine = reader.readLine();
}
 
if (headerLine == null) {
    throw new RuntimeException("Input file is empty: " + filePath);
}
 
// Parse headers (Pattern.quote keeps regex metacharacters like "|" literal)
String[] headers = headerLine.split(java.util.regex.Pattern.quote(delimiter), -1);
 
// Store schema information
java.util.List<java.util.Map<String, String>> schema =
    new java.util.ArrayList<>();
 
for (String header : headers) {
    java.util.Map<String, String> columnDef = new java.util.HashMap<>();
    columnDef.put("name", header.trim());
    columnDef.put("type", "String"); // Default type
    columnDef.put("length", "255");
    schema.add(columnDef);
}
 
globalMap.put("discovered_schema", schema);
globalMap.put("column_count", headers.length);
 
System.out.println("Discovered " + headers.length + " columns:");
for (java.util.Map<String, String> col : schema) {
    System.out.println("  - " + col.get("name"));
}

Type Inference from Data

// tJava - Infer column types by sampling data
 
String filePath = context.input_file_path;
int sampleSize = 100; // Number of rows to sample
 
java.util.Map<Integer, java.util.Set<String>> possibleTypes =
    new java.util.HashMap<>();
String[] headers;
 
// try-with-resources closes the reader even if sampling fails midway
try (java.io.BufferedReader reader = new java.io.BufferedReader(
        new java.io.FileReader(filePath))) {
 
    // Read header (note: a plain split() does not handle quoted fields that
    // contain the delimiter; for type sampling this is usually acceptable)
    String headerLine = reader.readLine();
    if (headerLine == null) {
        throw new RuntimeException("Input file is empty: " + filePath);
    }
    headers = headerLine.split(",", -1);
 
    // Initialize type trackers: every type is possible until disproven
    for (int i = 0; i < headers.length; i++) {
        possibleTypes.put(i, new java.util.HashSet<>(
            java.util.Arrays.asList("Integer", "Double", "Date", "Boolean", "String")));
    }
 
    // Sample data rows
    String line;
    int rowCount = 0;
    while ((line = reader.readLine()) != null && rowCount < sampleSize) {
        String[] values = line.split(",", -1);
 
        for (int i = 0; i < Math.min(values.length, headers.length); i++) {
            String value = values[i].trim();
            if (!value.isEmpty()) {
                java.util.Set<String> types = possibleTypes.get(i);
 
                // Remove incompatible types
                if (!isInteger(value)) types.remove("Integer");
                if (!isDouble(value)) types.remove("Double");
                if (!isDate(value)) types.remove("Date");
                if (!isBoolean(value)) types.remove("Boolean");
            }
        }
        rowCount++;
    }
}
 
// Determine final type (most specific remaining)
java.util.List<java.util.Map<String, String>> schema = new java.util.ArrayList<>();
for (int i = 0; i < headers.length; i++) {
    java.util.Map<String, String> col = new java.util.HashMap<>();
    col.put("name", headers[i].trim());
 
    java.util.Set<String> types = possibleTypes.get(i);
    String finalType;
    if (types.contains("Integer")) finalType = "Integer";
    else if (types.contains("Double")) finalType = "Double";
    else if (types.contains("Date")) finalType = "Date";
    else if (types.contains("Boolean")) finalType = "Boolean";
    else finalType = "String";
 
    col.put("type", finalType);
    schema.add(col);
 
    System.out.println(col.get("name") + " -> " + finalType);
}
 
globalMap.put("inferred_schema", schema);
 
// Type checking helpers (declare these in a Talend routine; tJava
// bodies cannot contain method declarations)
private static boolean isInteger(String s) {
    try { Integer.parseInt(s); return true; }
    catch (NumberFormatException e) { return false; }
}
 
private static boolean isDouble(String s) {
    try { Double.parseDouble(s); return true; }
    catch (NumberFormatException e) { return false; }
}
 
private static boolean isDate(String s) {
    String[] patterns = {"yyyy-MM-dd", "MM/dd/yyyy", "dd-MM-yyyy"};
    for (String pattern : patterns) {
        try {
            java.text.SimpleDateFormat sdf = new java.text.SimpleDateFormat(pattern);
            sdf.setLenient(false); // reject impossible dates like 99/99/9999
            sdf.parse(s);
            return true;
        } catch (Exception e) {
            // try the next pattern
        }
    }
    return false;
}
 
private static boolean isBoolean(String s) {
    return s.equalsIgnoreCase("true") || s.equalsIgnoreCase("false") ||
           s.equals("1") || s.equals("0") ||
           s.equalsIgnoreCase("yes") || s.equalsIgnoreCase("no");
}
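
A quick worked example of why the priority order above matters: every integer literal also parses as a double, so a purely numeric column keeps both candidates, and checking Integer first picks the more specific type. Some sampled columns and their outcomes under this logic:

sampled values           surviving candidates          final type
"1", "2", "3"            Integer, Double, String       Integer
"1", "2.5"               Double, String                Double
"yes", "no"              Boolean, String               Boolean
"2023-01-01", "n/a"      String                        String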

Dynamic Database Operations

Dynamic Table Creation

// tJava - Generate CREATE TABLE from dynamic schema
 
@SuppressWarnings("unchecked")
java.util.List<java.util.Map<String, String>> schema =
    (java.util.List<java.util.Map<String, String>>) globalMap.get("inferred_schema");
 
String tableName = context.target_table;
String dbType = context.database_type; // MSSQL, Oracle, MySQL, PostgreSQL
 
StringBuilder createSQL = new StringBuilder();
createSQL.append("CREATE TABLE ").append(tableName).append(" (\n");
 
for (int i = 0; i < schema.size(); i++) {
    java.util.Map<String, String> col = schema.get(i);
    String colName = sanitizeColumnName(col.get("name"));
    String colType = mapToDbType(col.get("type"), dbType);
 
    createSQL.append("    ").append(colName).append(" ").append(colType);
 
    if (i < schema.size() - 1) {
        createSQL.append(",");
    }
    createSQL.append("\n");
}
 
createSQL.append(")");
 
System.out.println("Generated DDL:\n" + createSQL.toString());
globalMap.put("create_table_sql", createSQL.toString());
 
// Helper methods (declare these in a Talend routine and call them from
// the tJava body)
private static String sanitizeColumnName(String name) {
    // Remove special characters, replace spaces
    return name.replaceAll("[^a-zA-Z0-9_]", "_")
               .replaceAll("^[0-9]", "_$0"); // Prefix numbers
}
 
private static String mapToDbType(String talendType, String dbType) {
    switch (talendType) {
        case "Integer":
            return dbType.equals("MSSQL") ? "INT" :
                   dbType.equals("Oracle") ? "NUMBER(10)" : "INTEGER";
        case "Double":
            return dbType.equals("MSSQL") ? "FLOAT" :
                   dbType.equals("Oracle") ? "NUMBER(18,6)" : "DOUBLE";
        case "Date":
            return dbType.equals("MSSQL") ? "DATETIME" :
                   dbType.equals("Oracle") ? "DATE" : "TIMESTAMP";
        case "Boolean":
            return dbType.equals("MSSQL") ? "BIT" :
                   dbType.equals("Oracle") ? "NUMBER(1)" : "BOOLEAN";
        default:
            return dbType.equals("Oracle") ? "VARCHAR2(255)" : "VARCHAR(255)";
    }
}
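
To make the generator concrete: for a hypothetical inferred schema of "order id" (Integer), "customer name" (String), and "amount" (Double), with context.target_table set to orders_staging and a PostgreSQL target, the printed DDL would be (note how sanitizeColumnName rewrites the spaces):

CREATE TABLE orders_staging (
    order_id INTEGER,
    customer_name VARCHAR(255),
    amount DOUBLE PRECISION
)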

Dynamic Insert Statement

// tJavaRow - Generate and execute dynamic INSERT
 
Dynamic dynRow = input_row.dynamic_column;
java.sql.Connection conn = (java.sql.Connection) globalMap.get("conn_tDBConnection_1");
String tableName = context.target_table;
 
// Build INSERT statement
StringBuilder insertSQL = new StringBuilder();
insertSQL.append("INSERT INTO ").append(tableName).append(" (");
 
StringBuilder placeholders = new StringBuilder();
 
for (int i = 0; i < dynRow.getColumnCount(); i++) {
    String colName = sanitizeColumnName(dynRow.getColumnMetadata(i).getName());
    insertSQL.append(colName);
    placeholders.append("?");
 
    if (i < dynRow.getColumnCount() - 1) {
        insertSQL.append(", ");
        placeholders.append(", ");
    }
}
 
insertSQL.append(") VALUES (").append(placeholders).append(")");
 
// Prepare and execute the statement (preparing per row is expensive;
// see the cached, batched variant after this example)
java.sql.PreparedStatement ps = conn.prepareStatement(insertSQL.toString());
 
for (int i = 0; i < dynRow.getColumnCount(); i++) {
    Object value = dynRow.getColumnValue(i);
    String type = dynRow.getColumnMetadata(i).getType();
 
    if (value == null) {
        ps.setNull(i + 1, java.sql.Types.VARCHAR);
    } else if (type.contains("Integer")) {
        ps.setInt(i + 1, Integer.parseInt(value.toString()));
    } else if (type.contains("Double") || type.contains("Float")) {
        ps.setDouble(i + 1, Double.parseDouble(value.toString()));
    } else if (type.contains("Date")) {
        ps.setTimestamp(i + 1, new java.sql.Timestamp(
            ((java.util.Date) value).getTime()));
    } else {
        ps.setString(i + 1, value.toString());
    }
}
 
ps.executeUpdate();
ps.close();
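
Preparing a fresh statement for every row is the main cost in the snippet above. Since the column set does not change within a file, a common remedy is to prepare once, cache the statement in globalMap, and batch the executions. A minimal sketch under those assumptions (the globalMap keys and the batch size of 1000 are illustrative):

// tJavaRow - prepare once, then batch
java.sql.PreparedStatement ps =
    (java.sql.PreparedStatement) globalMap.get("dyn_insert_ps");
if (ps == null) {
    // First row: build insertSQL exactly as above, then cache the statement
    ps = conn.prepareStatement(insertSQL.toString());
    globalMap.put("dyn_insert_ps", ps);
}
 
// ... bind the parameters for the current row as above, then:
ps.addBatch();
 
Integer batched = (Integer) globalMap.get("dyn_batch_count");
batched = (batched == null ? 0 : batched) + 1;
if (batched >= 1000) { // flush every 1000 rows
    ps.executeBatch();
    batched = 0;
}
globalMap.put("dyn_batch_count", batched);

A trailing tJava (on an OnSubjobOk trigger) should call executeBatch() one last time and close the statement so the final partial batch is flushed.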

Generic File Loader Pattern

Complete Dynamic File Processor

/*
 * Job Structure:
 *
 * tJava_Init (Schema discovery)
 *     │
 *     ▼
 * tJavaRow_CreateTable (DDL generation + execution)
 *     │
 *     ▼
 * tFileInputDelimited (Dynamic) → tJavaRow_Transform → tDBOutput (Dynamic)
 */
 
// Step 1: tJava_Init - Discover schema and prepare
// (Use schema discovery code from earlier)
 
// Step 2: tJavaRow_CreateTable
String createSQL = (String) globalMap.get("create_table_sql");
java.sql.Connection conn = (java.sql.Connection) globalMap.get("conn_tDBConnection_1");
 
// Drop table if it exists (the IF EXISTS clause is not supported by
// Oracle or older SQL Server versions; the catch below absorbs both
// "no such table" and "unsupported syntax" errors)
String dropSQL = "DROP TABLE IF EXISTS " + context.target_table;
try {
    conn.createStatement().executeUpdate(dropSQL);
} catch (Exception e) {
    // Table might not exist, or the dialect rejects IF EXISTS - ignore
}
 
// Create table
conn.createStatement().executeUpdate(createSQL);
System.out.println("Table created: " + context.target_table);
 
// Step 3: tJavaRow_Transform - Process each row
Dynamic inDyn = input_row.dynamic_column;
Dynamic outDyn = new Dynamic();
 
// Copy and transform columns
for (int i = 0; i < inDyn.getColumnCount(); i++) {
    DynamicMetadata meta = inDyn.getColumnMetadata(i);
    Object value = inDyn.getColumnValue(i);
 
    // Create output metadata
    DynamicMetadata outMeta = new DynamicMetadata();
    outMeta.setName(sanitizeColumnName(meta.getName()));
    outMeta.setType(meta.getType());
    outMeta.setLength(meta.getLength());
 
    // Transform value if needed
    Object transformedValue = transformValue(value, meta.getType());
 
    outDyn.addColumnMetadata(outMeta);
    outDyn.addColumnValue(transformedValue); // append in step with metadata
}
 
// Add audit columns (pass the file path in: a static helper cannot
// access the job's context object)
addAuditColumns(outDyn, context.input_file_path);
 
output_row.dynamic_column = outDyn;
 
// Helper: Transform value based on type
private static Object transformValue(Object value, String type) {
    if (value == null) return null;
 
    String strValue = value.toString().trim();
    if (strValue.isEmpty()) return null;
 
    try {
        if (type.contains("Integer")) {
            return Integer.parseInt(strValue);
        } else if (type.contains("Double")) {
            return Double.parseDouble(strValue);
        } else if (type.contains("Date")) {
            return parseDate(strValue);
        } else {
            return strValue;
        }
    } catch (Exception e) {
        System.err.println("Transform error for " + type + ": " + strValue);
        return strValue; // Return as string if conversion fails
    }
}
 
// Helper: Parse a date using the same patterns as the type inference step
private static java.util.Date parseDate(String s) throws java.text.ParseException {
    String[] patterns = {"yyyy-MM-dd", "MM/dd/yyyy", "dd-MM-yyyy"};
    for (String pattern : patterns) {
        try {
            java.text.SimpleDateFormat sdf = new java.text.SimpleDateFormat(pattern);
            sdf.setLenient(false);
            return sdf.parse(s);
        } catch (java.text.ParseException e) {
            // try the next pattern
        }
    }
    throw new java.text.ParseException("Unparseable date: " + s, 0);
}
 
// Helper: Add audit columns
private static void addAuditColumns(Dynamic dynRow, String sourceFile) {
    // Add load timestamp
    DynamicMetadata loadTimeMeta = new DynamicMetadata();
    loadTimeMeta.setName("etl_load_timestamp");
    loadTimeMeta.setType("id_Date");
    dynRow.addColumnMetadata(loadTimeMeta);
    dynRow.addColumnValue(new java.util.Date());
 
    // Add source file
    DynamicMetadata sourceMeta = new DynamicMetadata();
    sourceMeta.setName("etl_source_file");
    sourceMeta.setType("id_String");
    dynRow.addColumnMetadata(sourceMeta);
    dynRow.addColumnValue(sourceFile);
}

Schema Comparison and Evolution

Compare Schemas

// tJava - Compare source and target schemas
 
// Get source schema (stored in globalMap as "discovered_schema" by the
// discovery step) and extract the column names
@SuppressWarnings("unchecked")
java.util.List<java.util.Map<String, String>> sourceSchema =
    (java.util.List<java.util.Map<String, String>>) globalMap.get("discovered_schema");
 
java.util.List<String> sourceColumns = new java.util.ArrayList<>();
for (java.util.Map<String, String> col : sourceSchema) {
    sourceColumns.add(col.get("name"));
}
 
// Get target schema (from database)
java.sql.Connection conn = (java.sql.Connection) globalMap.get("conn_tDBConnection_1");
java.util.List<String> targetColumns = new java.util.ArrayList<>();
 
java.sql.DatabaseMetaData metaData = conn.getMetaData();
// Some databases store identifiers in a fixed case (Oracle uppercases
// unquoted names), so the table name may need case adjustment here
java.sql.ResultSet columns = metaData.getColumns(null, null, context.target_table, null);
while (columns.next()) {
    targetColumns.add(columns.getString("COLUMN_NAME").toLowerCase());
}
columns.close();
 
// Find differences
java.util.Set<String> sourceSet = new java.util.HashSet<>();
for (String col : sourceColumns) {
    sourceSet.add(sanitizeColumnName(col).toLowerCase());
}
 
java.util.Set<String> targetSet = new java.util.HashSet<>(targetColumns);
 
// New columns in source (not in target)
java.util.Set<String> newColumns = new java.util.HashSet<>(sourceSet);
newColumns.removeAll(targetSet);
 
// Missing columns in source (in target but not in source)
java.util.Set<String> missingColumns = new java.util.HashSet<>(targetSet);
missingColumns.removeAll(sourceSet);
 
// Common columns
java.util.Set<String> commonColumns = new java.util.HashSet<>(sourceSet);
commonColumns.retainAll(targetSet);
 
System.out.println("Schema comparison for " + context.target_table);
System.out.println("Common columns: " + commonColumns.size());
System.out.println("New columns (to add): " + newColumns);
System.out.println("Missing columns (in target only): " + missingColumns);
 
globalMap.put("new_columns", new java.util.ArrayList<>(newColumns));
globalMap.put("common_columns", new java.util.ArrayList<>(commonColumns));
globalMap.put("schema_changed", !newColumns.isEmpty());

Handle Schema Evolution

// tJava - Add new columns to target table
 
@SuppressWarnings("unchecked")
java.util.List<String> newColumns =
    (java.util.List<String>) globalMap.get("new_columns");
 
// Guard with if/else rather than a bare "return": tJava code is generated
// into the middle of the subjob's method, so returning would skip the
// components that follow
if (newColumns.isEmpty()) {
    System.out.println("No schema changes detected");
} else {
    java.sql.Connection conn = (java.sql.Connection) globalMap.get("conn_tDBConnection_1");
    String tableName = context.target_table;
 
    // Get column types from source schema
    @SuppressWarnings("unchecked")
    java.util.List<java.util.Map<String, String>> sourceSchema =
        (java.util.List<java.util.Map<String, String>>) globalMap.get("inferred_schema");
 
    java.util.Map<String, String> columnTypes = new java.util.HashMap<>();
    for (java.util.Map<String, String> col : sourceSchema) {
        columnTypes.put(sanitizeColumnName(col.get("name")).toLowerCase(), col.get("type"));
    }
 
    // Add each new column
    for (String colName : newColumns) {
        String colType = columnTypes.getOrDefault(colName, "String");
        String dbType = mapToDbType(colType, context.database_type);
 
        String alterSQL = "ALTER TABLE " + tableName + " ADD " + colName + " " + dbType;
 
        try {
            conn.createStatement().executeUpdate(alterSQL);
            System.out.println("Added column: " + colName + " " + dbType);
        } catch (Exception e) {
            System.err.println("Failed to add column " + colName + ": " + e.getMessage());
        }
    }
}

Best Practices

dynamic_schema_best_practices:
  design:
    - "Use dynamic schema for truly variable data structures"
    - "Prefer static schema when structure is known and stable"
    - "Implement type inference for better target compatibility"
    - "Always sanitize column names for database compatibility"
 
  performance:
    - "Cache schema discovery results for multi-file processing"
    - "Use prepared statements for dynamic inserts"
    - "Batch database operations when possible"
    - "Minimize schema changes at runtime"
 
  data_quality:
    - "Validate data types during transformation"
    - "Handle null and empty values explicitly"
    - "Log type conversion failures"
    - "Add audit columns for traceability"
 
  schema_evolution:
    - "Compare schemas before loading"
    - "Auto-add new columns when appropriate"
    - "Alert on significant schema changes"
    - "Document schema versions"
 
  error_handling:
    - "Catch and log type conversion errors"
    - "Handle missing columns gracefully"
    - "Validate required columns exist"
    - "Implement fallback for unknown types"

Conclusion

Dynamic schema handling enables flexible ETL pipelines that adapt to changing data structures. Use runtime schema discovery for generic file loaders, implement type inference for better database compatibility, and handle schema evolution gracefully. These patterns enable robust data integration even with unpredictable source structures.
