
Talend Error Handling and Logging: Complete Best Practices Guide

DeviDevs Team
15 min read
#talend #error-handling #logging #etl #best-practices

Robust error handling separates production-grade ETL from fragile data pipelines. This guide covers comprehensive patterns for exception management, logging, and building fault-tolerant Talend jobs.

Error Handling Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    TALEND ERROR HANDLING ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   JOB EXECUTION FLOW                                                        │
│  ┌────────────────────────────────────────────────────────────────────────┐│
│  │                                                                        ││
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐               ││
│  │  │   Source    │───►│ Transform   │───►│   Target    │               ││
│  │  │   Input     │    │   (tMap)    │    │   Output    │               ││
│  │  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘               ││
│  │         │                  │                  │                       ││
│  │         │ onComponentError │ Row Reject       │ onSubjobError         ││
│  │         ▼                  ▼                  ▼                       ││
│  │  ┌──────────────────────────────────────────────────────────────┐    ││
│  │  │                    ERROR HANDLING LAYER                       │    ││
│  │  │                                                               │    ││
│  │  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │    ││
│  │  │  │   Log       │  │   Alert     │  │   Reject    │          │    ││
│  │  │  │   Error     │  │   (Email/   │  │   Records   │          │    ││
│  │  │  │   Details   │  │   Slack)    │  │   to File   │          │    ││
│  │  │  └─────────────┘  └─────────────┘  └─────────────┘          │    ││
│  │  │                                                               │    ││
│  │  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │    ││
│  │  │  │   Update    │  │   Retry     │  │   Rollback  │          │    ││
│  │  │  │   Status    │  │   Logic     │  │   Changes   │          │    ││
│  │  │  │   Table     │  │             │  │             │          │    ││
│  │  │  └─────────────┘  └─────────────┘  └─────────────┘          │    ││
│  │  │                                                               │    ││
│  │  └──────────────────────────────────────────────────────────────┘    ││
│  │                                                                        ││
│  └────────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────────┘

Component-Level Error Handling

Die on Error Setting

/*
 * Component Error Behavior:
 *
 * Die on error: true (default)
 * - Job stops immediately on error
 * - Use for critical components where failure means job should stop
 *
 * Die on error: false
 * - Job continues despite error
 * - Use when you want to handle errors gracefully
 * - Connect OnComponentError trigger for custom handling
 */
 
// Example: tDBInput with error handling
/*
 * tDBInput_1 Settings:
 * - Die on error: false
 *
 * Connect: OnComponentError → tLogRow (error details)
 * Connect: OnSubjobOk → Continue processing
 */
 
// Access error details in the error handling flow:
// globalMap.get("tDBInput_1_ERROR_MESSAGE")
// (error-related global variables vary by component; check the component's
//  Outline view to see exactly which keys it exposes)

OnSubjobError vs OnComponentError

/*
 * OnSubjobError:
 * - Triggered after entire subjob fails
 * - Use for cleanup, logging, alerting
 * - Fires once per subjob failure
 *
 * OnComponentError:
 * - Triggered when specific component fails
 * - More granular control
 * - Fires for each component error
 */
 
// Job structure with error handling:
/*
 * [Main Processing]
 *     │
 *     ├── OnSubjobOk → [Success Flow]
 *     │                    │
 *     │                    └── tJava: log success, update status
 *     │
 *     └── OnSubjobError → [Error Flow]
 *                            │
 *                            ├── tJava: log error details
 *                            ├── tSendMail: alert team
 *                            └── tDBOutput: log to error table
 */
 
// tJava in error flow - capture error details
String errorMessage = "";
String errorComponent = "";
 
// Check each component for errors (if several report errors, the last match wins)
if (globalMap.get("tDBInput_1_ERROR_MESSAGE") != null) {
    errorMessage = (String) globalMap.get("tDBInput_1_ERROR_MESSAGE");
    errorComponent = "tDBInput_1";
}
if (globalMap.get("tMap_1_ERROR_MESSAGE") != null) {
    errorMessage = (String) globalMap.get("tMap_1_ERROR_MESSAGE");
    errorComponent = "tMap_1";
}
if (globalMap.get("tDBOutput_1_ERROR_MESSAGE") != null) {
    errorMessage = (String) globalMap.get("tDBOutput_1_ERROR_MESSAGE");
    errorComponent = "tDBOutput_1";
}
 
// Store for logging
globalMap.put("error_message", errorMessage);
globalMap.put("error_component", errorComponent);
globalMap.put("error_timestamp", new java.util.Date());
 
System.err.println("ERROR in " + errorComponent + ": " + errorMessage);

Row-Level Error Handling

/*
 * tMap Reject Flow:
 *
 * Main output → Good records
 * Reject output → Bad records (null keys, failed lookups, etc.)
 */
 
// tMap configuration for reject handling:
/*
 * Output: main_output
 * - Filter: row1.customer_id != null && row2 != null (lookup found)
 *
 * Output: reject_output
 * - Catch lookup inner join reject: true
 * - Catch expression filter reject: true
 */
 
// Reject flow schema should include error details:
/*
 * reject_output schema:
 * - original_record (String) - Serialized original data
 * - reject_reason (String) - Why rejected
 * - reject_timestamp (Date) - When rejected
 * - source_file (String) - Where it came from
 */
 
// Expression for reject_reason:
Var.reject_reason = row1.customer_id == null ? "Missing customer_id" :
                    row2.customer_id == null ? "Customer lookup failed" :
                    row1.amount == null ? "Missing amount" :
                    "Unknown rejection reason"

Try-Catch Pattern

Using tJavaFlex for Try-Catch

// tJavaFlex - Wrap processing in try-catch
 
// START CODE
int successCount = 0;
int errorCount = 0;
java.util.List<String> errorMessages = new java.util.ArrayList<>();
 
// MAIN CODE
try {
    // Process the row (processValue is a user-defined method, e.g. a Talend code routine)
    output_row.id = input_row.id;
    output_row.processed_value = processValue(input_row.raw_value);
    output_row.status = "SUCCESS";
    successCount++;
 
} catch (NumberFormatException nfe) {
    // Handle specific exception
    output_row.id = input_row.id;
    output_row.processed_value = null;
    output_row.status = "ERROR";
    output_row.error_message = "Invalid number format: " + nfe.getMessage();
    errorCount++;
    errorMessages.add("Row " + input_row.id + ": " + nfe.getMessage());
 
} catch (Exception e) {
    // Handle generic exception
    output_row.id = input_row.id;
    output_row.processed_value = null;
    output_row.status = "ERROR";
    output_row.error_message = "Processing error: " + e.getMessage();
    errorCount++;
    errorMessages.add("Row " + input_row.id + ": " + e.getMessage());
 
    // Log stack trace for debugging
    java.io.StringWriter sw = new java.io.StringWriter();
    e.printStackTrace(new java.io.PrintWriter(sw));
    System.err.println("Stack trace for row " + input_row.id + ":\n" + sw.toString());
}
 
// END CODE
System.out.println("Processing complete: " + successCount + " success, " + errorCount + " errors");
globalMap.put("success_count", successCount);
globalMap.put("error_count", errorCount);
globalMap.put("error_messages", errorMessages);
 
// Fail job if error threshold exceeded (guard against an empty flow)
int total = successCount + errorCount;
double errorRate = total == 0 ? 0.0 : (double) errorCount / total;
if (errorRate > 0.05) { // 5% threshold
    throw new RuntimeException("Error rate exceeded threshold: " +
        String.format("%.2f%%", errorRate * 100));
}

Database Transaction Handling

/*
 * Job structure for transaction management:
 *
 * tDBConnection (AutoCommit: false)
 *     │
 *     ├── [Processing Flow]
 *     │       │
 *     │       ├── tDBInput
 *     │       ├── tMap
 *     │       └── tDBOutput (commit: 0 - no auto commit)
 *     │
 *     ├── OnSubjobOk → tDBCommit
 *     │
 *     └── OnSubjobError → tDBRollback → Error handling
 */
 
// tDBConnection settings:
/*
 * Auto Commit: false
 * Connection pool: Use existing or create
 */
 
// tDBOutput settings:
/*
 * Use existing connection: tDBConnection_1
 * Commit every: 0 (manual commit)
 */
 
// tJava - Manual transaction control
java.sql.Connection conn =
    (java.sql.Connection) globalMap.get("conn_tDBConnection_1");
 
try {
    // Processing happens here...
 
    // Commit on success
    conn.commit();
    System.out.println("Transaction committed successfully");
 
} catch (Exception e) {
    // Rollback on error
    try {
        conn.rollback();
        System.err.println("Transaction rolled back due to: " + e.getMessage());
    } catch (java.sql.SQLException rollbackEx) {
        System.err.println("Rollback failed: " + rollbackEx.getMessage());
    }
    throw e; // Re-throw to trigger job error handling
}

Logging Strategies

Structured Logging

// tJava - Initialize logging framework
// (the import statements belong in the tJava component's Advanced settings > Import field)
 
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.MDC;
 
// Configure log4j
String log4jConfig = context.log4j_config_path;
PropertyConfigurator.configure(log4jConfig);
 
Logger logger = Logger.getLogger("TalendETL." + jobName);
 
// Set MDC for structured logging; generate one run id per execution and
// share it via globalMap so error emails and alerts can reference it later
String runId = java.util.UUID.randomUUID().toString();
globalMap.put("run_id", runId);
 
MDC.put("job_name", jobName);
MDC.put("job_id", (String) globalMap.get("JOB_ID"));
MDC.put("project", projectName);
MDC.put("run_id", runId);
MDC.put("environment", context.environment);
 
logger.info("Job started");
 
/*
 * log4j.properties example:
 *
 * log4j.rootLogger=INFO, console, file
 *
 * log4j.appender.console=org.apache.log4j.ConsoleAppender
 * log4j.appender.console.layout=org.apache.log4j.PatternLayout
 * log4j.appender.console.layout.ConversionPattern=%d{ISO8601} [%X{job_name}] [%X{run_id}] %-5p %c - %m%n
 *
 * log4j.appender.file=org.apache.log4j.RollingFileAppender
 * log4j.appender.file.File=/var/log/talend/${job_name}.log
 * log4j.appender.file.MaxFileSize=100MB
 * log4j.appender.file.MaxBackupIndex=10
 * log4j.appender.file.layout=org.apache.log4j.PatternLayout
 * log4j.appender.file.layout.ConversionPattern=%d{ISO8601} [%X{job_name}] [%X{run_id}] %-5p %c - %m%n
 */

Database Logging

// Create log table structure
/*
 * CREATE TABLE etl_job_logs (
 *     log_id BIGINT IDENTITY PRIMARY KEY,
 *     job_name VARCHAR(100),
 *     run_id VARCHAR(50),
 *     log_level VARCHAR(20),
 *     component_name VARCHAR(100),
 *     message TEXT,
 *     record_count INT,
 *     error_details TEXT,
 *     stack_trace TEXT,
 *     context_data TEXT,
 *     created_at DATETIME DEFAULT GETDATE()
 * );
 *
 * CREATE INDEX idx_job_logs_job ON etl_job_logs(job_name, created_at);
 * CREATE INDEX idx_job_logs_run ON etl_job_logs(run_id);
 */
 
// Routine method - log to database (place it in a Talend code routine and call it from tJava/tJavaRow)
public static void logToDatabase(String jobName, String runId, String level,
        String component, String message, Integer recordCount,
        String errorDetails, java.sql.Connection conn) throws Exception {
 
    String sql = "INSERT INTO etl_job_logs " +
        "(job_name, run_id, log_level, component_name, message, record_count, error_details) " +
        "VALUES (?, ?, ?, ?, ?, ?, ?)";
 
    try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, jobName);
        ps.setString(2, runId);
        ps.setString(3, level);
        ps.setString(4, component);
        ps.setString(5, message);
        if (recordCount != null) {
            ps.setInt(6, recordCount);
        } else {
            ps.setNull(6, java.sql.Types.INTEGER);
        }
        ps.setString(7, errorDetails);
        ps.executeUpdate();
    }
}
 
// Usage in job:
java.sql.Connection logConn =
    (java.sql.Connection) globalMap.get("conn_tDBConnection_Log");
String runId = (String) globalMap.get("run_id");
 
logToDatabase(jobName, runId, "INFO", "tDBInput_1",
    "Started reading from source", null, null, logConn);
 
// After processing:
int rowCount = (Integer) globalMap.get("tDBInput_1_NB_LINE");
logToDatabase(jobName, runId, "INFO", "tDBInput_1",
    "Completed reading", rowCount, null, logConn);

File-Based Logging with Rotation

// Code routine - custom file logger with rotation (a class cannot be declared inline in tJava; define it in a Talend routine)
 
public class ETLFileLogger {
    private java.io.PrintWriter writer;
    private String logDir;
    private String jobName;
    private String currentLogFile;
    private long maxFileSize = 100 * 1024 * 1024; // 100MB
    private int maxBackups = 10;
 
    public ETLFileLogger(String logDir, String jobName) throws Exception {
        this.logDir = logDir;
        this.jobName = jobName;
        this.currentLogFile = logDir + "/" + jobName + ".log";
        rotateIfNeeded();
        this.writer = new java.io.PrintWriter(
            new java.io.FileWriter(currentLogFile, true));
    }
 
    public synchronized void log(String level, String message) {
        String timestamp = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
            .format(new java.util.Date());
        String logLine = String.format("%s [%s] %s - %s%n",
            timestamp, Thread.currentThread().getName(), level, message);
 
        writer.print(logLine);
        writer.flush();
 
        rotateIfNeeded();
    }
 
    public void info(String message) { log("INFO", message); }
    public void warn(String message) { log("WARN", message); }
    public void error(String message) { log("ERROR", message); }
 
    public void error(String message, Throwable t) {
        log("ERROR", message);
        java.io.StringWriter sw = new java.io.StringWriter();
        t.printStackTrace(new java.io.PrintWriter(sw));
        log("ERROR", sw.toString());
    }
 
    private void rotateIfNeeded() {
        try {
            java.io.File logFile = new java.io.File(currentLogFile);
            if (logFile.exists() && logFile.length() > maxFileSize) {
                if (writer != null) writer.close();
 
                // Rotate: drop the oldest backup, then shift the rest up by one
                java.io.File oldest = new java.io.File(
                    currentLogFile + "." + maxBackups);
                if (oldest.exists()) oldest.delete();
                for (int i = maxBackups - 1; i >= 0; i--) {
                    java.io.File src = new java.io.File(
                        currentLogFile + (i == 0 ? "" : "." + i));
                    if (src.exists()) {
                        src.renameTo(new java.io.File(
                            currentLogFile + "." + (i + 1)));
                    }
                }
 
                this.writer = new java.io.PrintWriter(
                    new java.io.FileWriter(currentLogFile, false));
            }
        } catch (Exception e) {
            System.err.println("Log rotation failed: " + e.getMessage());
        }
    }
 
    public void close() {
        if (writer != null) writer.close();
    }
}
 
// Initialize in tJava (Start):
ETLFileLogger logger = new ETLFileLogger(context.log_directory, jobName);
globalMap.put("etl_logger", logger);
logger.info("Job started - Run ID: " + globalMap.get("run_id"));
 
// Use throughout job:
((ETLFileLogger) globalMap.get("etl_logger")).info("Processing batch 1");
 
// Close in tJava (End):
((ETLFileLogger) globalMap.get("etl_logger")).close();

Alerting Integration

Email Alerts

/*
 * tSendMail Configuration for Error Alerts:
 *
 * SMTP Settings:
 * - Host: smtp.company.com
 * - Port: 587
 * - TLS: true
 * - Username: context.smtp_user
 * - Password: context.smtp_password
 *
 * Email Settings:
 * - From: talend-alerts@company.com
 * - To: data-team@company.com
 * - Subject: "[ALERT] ETL Job Failed: " + jobName
 * - Message: Build from error details
 */
 
// tJava - Build error email content
StringBuilder emailBody = new StringBuilder();
emailBody.append("ETL Job Failure Alert\n");
emailBody.append("====================\n\n");
emailBody.append("Job Name: ").append(jobName).append("\n");
emailBody.append("Project: ").append(projectName).append("\n");
emailBody.append("Run ID: ").append(globalMap.get("run_id")).append("\n");
emailBody.append("Timestamp: ").append(new java.util.Date()).append("\n");
emailBody.append("Environment: ").append(context.environment).append("\n\n");
 
emailBody.append("Error Details:\n");
emailBody.append("--------------\n");
emailBody.append("Component: ").append(globalMap.get("error_component")).append("\n");
emailBody.append("Message: ").append(globalMap.get("error_message")).append("\n\n");
 
if (globalMap.get("error_stack_trace") != null) {
    emailBody.append("Stack Trace:\n");
    emailBody.append(globalMap.get("error_stack_trace")).append("\n\n");
}
 
emailBody.append("Statistics:\n");
emailBody.append("-----------\n");
emailBody.append("Records Processed: ").append(globalMap.get("records_processed")).append("\n");
emailBody.append("Records Failed: ").append(globalMap.get("records_failed")).append("\n");
 
globalMap.put("email_body", emailBody.toString());

Slack Integration

// Code routine - Slack notification helper (define it in a Talend routine, with these imports at the top)
 
import java.net.HttpURLConnection;
import java.net.URL;
 
public static void sendSlackAlert(String webhookUrl, String message,
        String channel, String severity) throws Exception {
 
    // Build Slack message payload
    String color = severity.equals("ERROR") ? "danger" :
                   severity.equals("WARNING") ? "warning" : "good";
 
    String payload = String.format(
        "{\"channel\": \"%s\", \"attachments\": [{" +
        "\"color\": \"%s\", " +
        "\"title\": \"Talend ETL Alert\", " +
        "\"text\": \"%s\", " +
        "\"ts\": %d" +
        "}]}",
        channel, color, escapeJson(message), System.currentTimeMillis() / 1000
    );
 
    URL url = new URL(webhookUrl);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
 
    java.io.OutputStream os = conn.getOutputStream();
    os.write(payload.getBytes("UTF-8"));
    os.flush();
    os.close();
 
    int responseCode = conn.getResponseCode();
    if (responseCode != 200) {
        throw new RuntimeException("Slack notification failed: " + responseCode);
    }
}
 
private static String escapeJson(String text) {
    return text.replace("\\", "\\\\")
               .replace("\"", "\\\"")
               .replace("\n", "\\n")
               .replace("\r", "\\r")
               .replace("\t", "\\t");
}
 
// Usage in error handler:
try {
    sendSlackAlert(
        context.slack_webhook_url,
        "Job: " + jobName + " failed\\nError: " + globalMap.get("error_message"),
        "#data-alerts",
        "ERROR"
    );
} catch (Exception e) {
    System.err.println("Failed to send Slack alert: " + e.getMessage());
}

PagerDuty Integration

// Routine method - trigger a PagerDuty incident for critical failures (requires the org.json library)
 
public static void triggerPagerDuty(String routingKey, String summary,
        String source, String severity, java.util.Map<String, Object> details)
        throws Exception {
 
    org.json.JSONObject payload = new org.json.JSONObject();
    payload.put("routing_key", routingKey);
    payload.put("event_action", "trigger");
 
    org.json.JSONObject eventPayload = new org.json.JSONObject();
    eventPayload.put("summary", summary);
    eventPayload.put("source", source);
    eventPayload.put("severity", severity); // critical, error, warning, info
 
    org.json.JSONObject customDetails = new org.json.JSONObject(details);
    eventPayload.put("custom_details", customDetails);
 
    payload.put("payload", eventPayload);
 
    java.net.URL url = new java.net.URL(
        "https://events.pagerduty.com/v2/enqueue");
    java.net.HttpURLConnection conn =
        (java.net.HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
 
    java.io.OutputStream os = conn.getOutputStream();
    os.write(payload.toString().getBytes("UTF-8"));
    os.flush();
    os.close();
 
    int responseCode = conn.getResponseCode();
    if (responseCode != 202) {
        StringBuilder response = new StringBuilder();
        java.io.InputStream es = conn.getErrorStream();
        if (es != null) { // the error stream can be null
            java.io.BufferedReader reader = new java.io.BufferedReader(
                new java.io.InputStreamReader(es));
            String line;
            while ((line = reader.readLine()) != null) {
                response.append(line);
            }
        }
        throw new RuntimeException("PagerDuty failed (" + responseCode + "): " + response);
    }
 
    System.out.println("PagerDuty incident triggered");
}
 
// Usage for critical failures:
if (context.environment.equals("PRODUCTION")) {
    java.util.Map<String, Object> details = new java.util.HashMap<>();
    details.put("job_name", jobName);
    details.put("error_message", globalMap.get("error_message"));
    details.put("run_id", globalMap.get("run_id"));
 
    triggerPagerDuty(
        context.pagerduty_routing_key,
        "Critical ETL Failure: " + jobName,
        "talend-etl-" + context.environment.toLowerCase(),
        "critical",
        details
    );
}

Error Recovery Patterns

Retry Logic

// tLoop with retry logic for transient failures
 
// tJava_Init - Initialize retry variables
int maxRetries = 3;
int currentRetry = 0;
int retryDelayMs = 1000; // Start with 1 second
boolean success = false;
 
globalMap.put("maxRetries", maxRetries);
globalMap.put("currentRetry", currentRetry);
globalMap.put("retryDelayMs", retryDelayMs);
globalMap.put("success", success);
 
// tLoop condition:
// !((Boolean) globalMap.get("success")) && ((Integer) globalMap.get("currentRetry")) < ((Integer) globalMap.get("maxRetries"))
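 
In the tLoop component itself this maps to a While loop. A sketch of the Basic settings; the retry state lives in globalMap rather than the loop's own declaration:
 
/*
 * tLoop Basic settings:
 * - Loop Type: While
 * - Declaration: (empty)
 * - Condition: !((Boolean) globalMap.get("success"))
 *              && ((Integer) globalMap.get("currentRetry")) < ((Integer) globalMap.get("maxRetries"))
 * - Iteration: (empty)
 */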
 
// tJava - Retry wrapper (OnSubjobError of main processing)
int currentRetry = (Integer) globalMap.get("currentRetry");
int maxRetries = (Integer) globalMap.get("maxRetries");
int retryDelayMs = (Integer) globalMap.get("retryDelayMs");
 
String errorMessage = (String) globalMap.get("error_message");
 
// Check if error is retryable
boolean isRetryable = isRetryableError(errorMessage);
 
if (isRetryable && currentRetry < maxRetries - 1) {
    currentRetry++;
    globalMap.put("currentRetry", currentRetry);
 
    System.out.println("Retry " + currentRetry + "/" + maxRetries +
        " after " + retryDelayMs + "ms - Error: " + errorMessage);
 
    // Exponential backoff
    Thread.sleep(retryDelayMs);
    globalMap.put("retryDelayMs", retryDelayMs * 2);
 
} else {
    // Max retries exceeded or non-retryable error
    System.err.println("Failed after " + (currentRetry + 1) + " attempts");
    throw new RuntimeException("Job failed: " + errorMessage);
}
 
// Helper method (place it in a Talend code routine; shown inline for readability)
private static boolean isRetryableError(String errorMessage) {
    if (errorMessage == null) return false;
    String lower = errorMessage.toLowerCase();
    return lower.contains("connection") ||
           lower.contains("timeout") ||
           lower.contains("deadlock") ||
           lower.contains("lock wait") ||
           lower.contains("temporarily unavailable");
}
 
// tJava - Mark success on successful completion
globalMap.put("success", true);

Checkpoint and Resume

/*
 * Checkpoint pattern for long-running jobs:
 *
 * 1. Save progress to checkpoint table
 * 2. On failure, read last checkpoint
 * 3. Resume from checkpoint
 */
 
// Checkpoint table structure:
/*
 * CREATE TABLE etl_checkpoints (
 *     job_name VARCHAR(100),
 *     run_id VARCHAR(50),
 *     checkpoint_key VARCHAR(200),
 *     checkpoint_value VARCHAR(500),
 *     created_at DATETIME DEFAULT GETDATE(),
 *     PRIMARY KEY (job_name, run_id, checkpoint_key)
 * );
 */
 
// Routine method - save checkpoint (place in a Talend code routine)
public static void saveCheckpoint(String jobName, String runId,
        String key, String value, java.sql.Connection conn) throws Exception {
 
    String sql = "MERGE INTO etl_checkpoints AS target " +
        "USING (SELECT ? as job_name, ? as run_id, ? as checkpoint_key, ? as checkpoint_value) AS source " +
        "ON target.job_name = source.job_name AND target.run_id = source.run_id AND target.checkpoint_key = source.checkpoint_key " +
        "WHEN MATCHED THEN UPDATE SET checkpoint_value = source.checkpoint_value, created_at = GETDATE() " +
        "WHEN NOT MATCHED THEN INSERT (job_name, run_id, checkpoint_key, checkpoint_value) " +
        "VALUES (source.job_name, source.run_id, source.checkpoint_key, source.checkpoint_value)";
 
    try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, jobName);
        ps.setString(2, runId);
        ps.setString(3, key);
        ps.setString(4, value);
        ps.executeUpdate();
    }
}
 
// Routine method - read checkpoint
public static String getCheckpoint(String jobName, String runId,
        String key, java.sql.Connection conn) throws Exception {
 
    String sql = "SELECT checkpoint_value FROM etl_checkpoints " +
        "WHERE job_name = ? AND run_id = ? AND checkpoint_key = ?";
 
    try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, jobName);
        ps.setString(2, runId);
        ps.setString(3, key);
        try (java.sql.ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                return rs.getString("checkpoint_value");
            }
        }
    }
    return null;
}
 
// Usage in batch processing (on restart, pass the failed run's run_id to resume it):
String lastProcessedId = getCheckpoint(jobName, runId, "last_processed_id", conn);
if (lastProcessedId != null) {
    System.out.println("Resuming from checkpoint: " + lastProcessedId);
    globalMap.put("resume_from_id", lastProcessedId);
} else {
    System.out.println("Starting fresh run");
    globalMap.put("resume_from_id", "0");
}
 
// After each batch (currentBatchLastId = the last id written in that batch):
saveCheckpoint(jobName, runId, "last_processed_id", currentBatchLastId, conn);
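 
On successful completion, clear the run's checkpoints so a future run starts fresh. A sketch, using the same connection:
 
// tJava (OnSubjobOk) - clear this run's checkpoints
try (java.sql.PreparedStatement ps = conn.prepareStatement(
        "DELETE FROM etl_checkpoints WHERE job_name = ? AND run_id = ?")) {
    ps.setString(1, jobName);
    ps.setString(2, runId);
    ps.executeUpdate();
}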

Best Practices Summary

error_handling_best_practices:
  design:
    - "Always handle OnSubjobError for main processing flows"
    - "Use row-level reject flows for data quality issues"
    - "Implement retry logic for transient failures"
    - "Set appropriate Die on error based on component criticality"
 
  logging:
    - "Use structured logging with consistent format"
    - "Include run_id for traceability across components"
    - "Log at appropriate levels (INFO, WARN, ERROR)"
    - "Implement log rotation for file-based logging"
    - "Store critical logs in database for querying"
 
  alerting:
    - "Alert on job failures immediately"
    - "Use different channels for different severities"
    - "Include actionable information in alerts"
    - "Don't over-alert - avoid alert fatigue"
 
  recovery:
    - "Implement checkpoint/resume for long jobs"
    - "Use exponential backoff for retries"
    - "Distinguish retryable vs non-retryable errors"
    - "Test recovery procedures regularly"
 
  database:
    - "Use transactions for multi-step operations"
    - "Implement proper rollback on failure"
    - "Log all database errors with context"
    - "Handle deadlocks with retry logic"

Conclusion

Robust error handling transforms Talend jobs from fragile scripts into production-grade data pipelines. Implement component-level handling, structured logging, and multi-channel alerting. Use retry patterns for resilience and checkpointing for recoverability. These practices ensure reliable data processing even in challenging conditions.
