
Talend Error Handling and Logging: Complete Best Practices Guide

Petru Constantin
15 min read
#talend#error-handling#logging#etl#best-practices

Robust error handling separates production-grade ETL from fragile data pipelines. This guide covers comprehensive patterns for exception management, logging, and building fault-tolerant Talend jobs.

Error Handling Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    TALEND ERROR HANDLING ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   JOB EXECUTION FLOW                                                        │
│  ┌────────────────────────────────────────────────────────────────────────┐│
│  │                                                                        ││
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐               ││
│  │  │   Source    │───►│ Transform   │───►│   Target    │               ││
│  │  │   Input     │    │   (tMap)    │    │   Output    │               ││
│  │  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘               ││
│  │         │                  │                  │                       ││
│  │         │ onComponentError │ Row Reject       │ onSubjobError         ││
│  │         ▼                  ▼                  ▼                       ││
│  │  ┌──────────────────────────────────────────────────────────────┐    ││
│  │  │                    ERROR HANDLING LAYER                       │    ││
│  │  │                                                               │    ││
│  │  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │    ││
│  │  │  │   Log       │  │   Alert     │  │   Reject    │          │    ││
│  │  │  │   Error     │  │   (Email/   │  │   Records   │          │    ││
│  │  │  │   Details   │  │   Slack)    │  │   to File   │          │    ││
│  │  │  └─────────────┘  └─────────────┘  └─────────────┘          │    ││
│  │  │                                                               │    ││
│  │  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │    ││
│  │  │  │   Update    │  │   Retry     │  │   Rollback  │          │    ││
│  │  │  │   Status    │  │   Logic     │  │   Changes   │          │    ││
│  │  │  │   Table     │  │             │  │             │          │    ││
│  │  │  └─────────────┘  └─────────────┘  └─────────────┘          │    ││
│  │  │                                                               │    ││
│  │  └──────────────────────────────────────────────────────────────┘    ││
│  │                                                                        ││
│  └────────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────────┘

Component-Level Error Handling

Die on Error Setting

/*
 * Component Error Behavior:
 *
 * Die on error: true (default)
 * - Job stops immediately on error
 * - Use for critical components where failure means job should stop
 *
 * Die on error: false
 * - Job continues despite error
 * - Use when you want to handle errors gracefully
 * - Connect OnComponentError trigger for custom handling
 */
 
// Example: tDBInput with error handling
/*
 * tDBInput_1 Settings:
 * - Die on error: false
 *
 * Connect: OnComponentError → tLogRow (error details)
 * Connect: OnSubjobOk → Continue processing
 */
 
// Access error details in error handling flow:
// globalMap.get("tDBInput_1_ERROR_MESSAGE")
// globalMap.get("tDBInput_1_ERRORCODE")
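The error-handling branch typically reads these globalMap entries and turns them into a log line. A minimal sketch of that tJava logic, with `globalMap` modeled as a plain `Map` so it can run standalone (in a real job Talend provides it; the class and method names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class ComponentErrorHandler {

    // Builds a log line from the <component>_ERROR_MESSAGE and
    // <component>_ERRORCODE entries that Talend places in globalMap
    // when "Die on error" is false and the component fails.
    public static String describeError(Map<String, Object> globalMap, String component) {
        Object msg = globalMap.get(component + "_ERROR_MESSAGE");
        Object code = globalMap.get(component + "_ERRORCODE");
        if (msg == null) {
            return component + ": no error recorded";
        }
        return component + " failed (code " + code + "): " + msg;
    }

    public static void main(String[] args) {
        Map<String, Object> globalMap = new HashMap<>();
        globalMap.put("tDBInput_1_ERROR_MESSAGE", "Connection refused");
        globalMap.put("tDBInput_1_ERRORCODE", 1);
        System.out.println(describeError(globalMap, "tDBInput_1"));
    }
}
```

In the job itself the same two `globalMap.get` calls go directly in the tJava connected to the OnComponentError trigger.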

OnSubjobError vs OnComponentError

/*
 * OnSubjobError:
 * - Triggered after entire subjob fails
 * - Use for cleanup, logging, alerting
 * - Fires once per subjob failure
 *
 * OnComponentError:
 * - Triggered when specific component fails
 * - More granular control
 * - Fires for each component error
 */
 
// Job structure with error handling:
/*
 * [Main Processing]
 *     │
 *     ├── OnSubjobOk → [Success Flow]
 *     │                    │
 *     │                    └── tJava: log success, update status
 *     │
 *     └── OnSubjobError → [Error Flow]
 *                            │
 *                            ├── tJava: log error details
 *                            ├── tSendMail: alert team
 *                            └── tDBOutput: log to error table
 */
 
// tJava in error flow - capture error details
String errorMessage = "";
String errorComponent = "";
 
// Check each component for errors
if (globalMap.get("tDBInput_1_ERROR_MESSAGE") != null) {
    errorMessage = (String) globalMap.get("tDBInput_1_ERROR_MESSAGE");
    errorComponent = "tDBInput_1";
}
if (globalMap.get("tMap_1_ERROR_MESSAGE") != null) {
    errorMessage = (String) globalMap.get("tMap_1_ERROR_MESSAGE");
    errorComponent = "tMap_1";
}
if (globalMap.get("tDBOutput_1_ERROR_MESSAGE") != null) {
    errorMessage = (String) globalMap.get("tDBOutput_1_ERROR_MESSAGE");
    errorComponent = "tDBOutput_1";
}
 
// Store for logging
globalMap.put("error_message", errorMessage);
globalMap.put("error_component", errorComponent);
globalMap.put("error_timestamp", new java.util.Date());
 
System.err.println("ERROR in " + errorComponent + ": " + errorMessage);

Row-Level Error Handling

/*
 * tMap Reject Flow:
 *
 * Main output → Good records
 * Reject output → Bad records (null keys, failed lookups, etc.)
 */
 
// tMap configuration for reject handling:
/*
 * Output: main_output
 * - Filter: row1.customer_id != null && row2 != null (lookup found)
 *
 * Output: reject_output
 * - Catch lookup inner join reject: true
 * - Catch expression filter reject: true
 */
 
// Reject flow schema should include error details:
/*
 * reject_output schema:
 * - original_record (String) - Serialized original data
 * - reject_reason (String) - Why rejected
 * - reject_timestamp (Date) - When rejected
 * - source_file (String) - Where it came from
 */
 
// Expression for the reject_reason Var (tMap assigns the result to the Var,
// so enter only the ternary expression, with no "Var.x =" and no semicolon):
row1.customer_id == null ? "Missing customer_id" :
row2 == null             ? "Customer lookup failed" :
row1.amount == null      ? "Missing amount" :
                           "Unknown rejection reason"
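The same classification logic can live in a Talend routine so it is testable outside tMap; the tMap Var then just calls the routine. The class, method, and parameter names below are illustrative, not tied to a real schema:

```java
import java.math.BigDecimal;

public class RejectRoutines {

    // Mirrors the tMap Var expression above: returns a reject reason,
    // or null when the row is acceptable.
    public static String rejectReason(Integer customerId, boolean lookupFound,
            BigDecimal amount) {
        if (customerId == null) return "Missing customer_id";
        if (!lookupFound)       return "Customer lookup failed";
        if (amount == null)     return "Missing amount";
        return null; // accept the row
    }

    public static void main(String[] args) {
        System.out.println(rejectReason(null, true, BigDecimal.ONE));
        System.out.println(rejectReason(42, true, BigDecimal.ONE));
    }
}
```

In tMap the Var expression would become `RejectRoutines.rejectReason(row1.customer_id, row2 != null, row1.amount)`, and the reject filter checks the result for null.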

Try-Catch Pattern

Using tJavaFlex for Try-Catch

// tJavaFlex - Wrap processing in try-catch
 
// START CODE
int successCount = 0;
int errorCount = 0;
java.util.List<String> errorMessages = new java.util.ArrayList<>();
 
// MAIN CODE
try {
    // Process the row
    output_row.id = input_row.id;
    output_row.processed_value = processValue(input_row.raw_value);
    output_row.status = "SUCCESS";
    successCount++;
 
} catch (NumberFormatException nfe) {
    // Handle specific exception
    output_row.id = input_row.id;
    output_row.processed_value = null;
    output_row.status = "ERROR";
    output_row.error_message = "Invalid number format: " + nfe.getMessage();
    errorCount++;
    errorMessages.add("Row " + input_row.id + ": " + nfe.getMessage());
 
} catch (Exception e) {
    // Handle generic exception
    output_row.id = input_row.id;
    output_row.processed_value = null;
    output_row.status = "ERROR";
    output_row.error_message = "Processing error: " + e.getMessage();
    errorCount++;
    errorMessages.add("Row " + input_row.id + ": " + e.getMessage());
 
    // Log stack trace for debugging
    java.io.StringWriter sw = new java.io.StringWriter();
    e.printStackTrace(new java.io.PrintWriter(sw));
    System.err.println("Stack trace for row " + input_row.id + ":\n" + sw.toString());
}
 
// END CODE
System.out.println("Processing complete: " + successCount + " success, " + errorCount + " errors");
globalMap.put("success_count", successCount);
globalMap.put("error_count", errorCount);
globalMap.put("error_messages", errorMessages);
 
// Fail job if error threshold exceeded (guard against division by zero
// when no rows were processed)
int totalCount = successCount + errorCount;
double errorRate = totalCount == 0 ? 0.0 : (double) errorCount / totalCount;
if (errorRate > 0.05) { // 5% threshold
    throw new RuntimeException("Error rate exceeded threshold: " +
        String.format("%.2f%%", errorRate * 100));
}

Database Transaction Handling

/*
 * Job structure for transaction management:
 *
 * tDBConnection (AutoCommit: false)
 *     │
 *     ├── [Processing Flow]
 *     │       │
 *     │       ├── tDBInput
 *     │       ├── tMap
 *     │       └── tDBOutput (commit: 0 - no auto commit)
 *     │
 *     ├── OnSubjobOk → tDBCommit
 *     │
 *     └── OnSubjobError → tDBRollback → Error handling
 */
 
// tDBConnection settings:
/*
 * Auto Commit: false
 * Connection pool: Use existing or create
 */
 
// tDBOutput settings:
/*
 * Use existing connection: tDBConnection_1
 * Commit every: 0 (manual commit)
 */
 
// tJava - Manual transaction control
java.sql.Connection conn =
    (java.sql.Connection) globalMap.get("conn_tDBConnection_1");
 
try {
    // Processing happens here...
 
    // Commit on success
    conn.commit();
    System.out.println("Transaction committed successfully");
 
} catch (Exception e) {
    // Rollback on error
    try {
        conn.rollback();
        System.err.println("Transaction rolled back due to: " + e.getMessage());
    } catch (java.sql.SQLException rollbackEx) {
        System.err.println("Rollback failed: " + rollbackEx.getMessage());
    }
    throw e; // Re-throw to trigger job error handling
}

Logging Strategies

Structured Logging

// tJava - Initialize logging framework
 
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.MDC;
 
// Configure log4j
String log4jConfig = context.log4j_config_path;
PropertyConfigurator.configure(log4jConfig);
 
Logger logger = Logger.getLogger("TalendETL." + jobName);
 
// Set MDC for structured logging
MDC.put("job_name", jobName);
MDC.put("job_id", (String) globalMap.get("JOB_ID"));
MDC.put("project", projectName);
MDC.put("run_id", java.util.UUID.randomUUID().toString());
MDC.put("environment", context.environment);
 
logger.info("Job started");
 
/*
 * log4j.properties example:
 *
 * log4j.rootLogger=INFO, console, file
 *
 * log4j.appender.console=org.apache.log4j.ConsoleAppender
 * log4j.appender.console.layout=org.apache.log4j.PatternLayout
 * log4j.appender.console.layout.ConversionPattern=%d{ISO8601} [%X{job_name}] [%X{run_id}] %-5p %c - %m%n
 *
 * log4j.appender.file=org.apache.log4j.RollingFileAppender
 * log4j.appender.file.File=/var/log/talend/${job_name}.log
 * log4j.appender.file.MaxFileSize=100MB
 * log4j.appender.file.MaxBackupIndex=10
 * log4j.appender.file.layout=org.apache.log4j.PatternLayout
 * log4j.appender.file.layout.ConversionPattern=%d{ISO8601} [%X{job_name}] [%X{run_id}] %-5p %c - %m%n
 */

Database Logging

// Create log table structure
/*
 * CREATE TABLE etl_job_logs (
 *     log_id BIGINT IDENTITY PRIMARY KEY,
 *     job_name VARCHAR(100),
 *     run_id VARCHAR(50),
 *     log_level VARCHAR(20),
 *     component_name VARCHAR(100),
 *     message TEXT,
 *     record_count INT,
 *     error_details TEXT,
 *     stack_trace TEXT,
 *     context_data TEXT,
 *     created_at DATETIME DEFAULT GETDATE()
 * );
 *
 * CREATE INDEX idx_job_logs_job ON etl_job_logs(job_name, created_at);
 * CREATE INDEX idx_job_logs_run ON etl_job_logs(run_id);
 */
 
// tJavaRow - Log to database
public static void logToDatabase(String jobName, String runId, String level,
        String component, String message, Integer recordCount,
        String errorDetails, java.sql.Connection conn) throws Exception {
 
    String sql = "INSERT INTO etl_job_logs " +
        "(job_name, run_id, log_level, component_name, message, record_count, error_details) " +
        "VALUES (?, ?, ?, ?, ?, ?, ?)";
 
    try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, jobName);
        ps.setString(2, runId);
        ps.setString(3, level);
        ps.setString(4, component);
        ps.setString(5, message);
        if (recordCount != null) {
            ps.setInt(6, recordCount);
        } else {
            ps.setNull(6, java.sql.Types.INTEGER);
        }
        ps.setString(7, errorDetails);
        ps.executeUpdate();
    }
}
 
// Usage in job:
java.sql.Connection logConn =
    (java.sql.Connection) globalMap.get("conn_tDBConnection_Log");
String runId = (String) globalMap.get("run_id");
 
logToDatabase(jobName, runId, "INFO", "tDBInput_1",
    "Started reading from source", null, null, logConn);
 
// After processing:
int rowCount = ((Integer) globalMap.get("tDBInput_1_NB_LINE"));
logToDatabase(jobName, runId, "INFO", "tDBInput_1",
    "Completed reading", rowCount, null, logConn);

File-Based Logging with Rotation

// Talend routine - custom file logger with rotation
// (define the class in a routine, then instantiate it from tJava)
 
public class ETLFileLogger {
    private java.io.PrintWriter writer;
    private String logDir;
    private String jobName;
    private String currentLogFile;
    private long maxFileSize = 100 * 1024 * 1024; // 100MB
    private int maxBackups = 10;
 
    public ETLFileLogger(String logDir, String jobName) throws Exception {
        this.logDir = logDir;
        this.jobName = jobName;
        this.currentLogFile = logDir + "/" + jobName + ".log";
        rotateIfNeeded();
        this.writer = new java.io.PrintWriter(
            new java.io.FileWriter(currentLogFile, true));
    }
 
    public synchronized void log(String level, String message) {
        String timestamp = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
            .format(new java.util.Date());
        String logLine = String.format("%s [%s] %s - %s%n",
            timestamp, Thread.currentThread().getName(), level, message);
 
        writer.print(logLine);
        writer.flush();
 
        rotateIfNeeded();
    }
 
    public void info(String message) { log("INFO", message); }
    public void warn(String message) { log("WARN", message); }
    public void error(String message) { log("ERROR", message); }
 
    public void error(String message, Throwable t) {
        log("ERROR", message);
        java.io.StringWriter sw = new java.io.StringWriter();
        t.printStackTrace(new java.io.PrintWriter(sw));
        log("ERROR", sw.toString());
    }
 
    private void rotateIfNeeded() {
        try {
            java.io.File logFile = new java.io.File(currentLogFile);
            if (logFile.exists() && logFile.length() > maxFileSize) {
                if (writer != null) writer.close();
 
                // Rotate files
                for (int i = maxBackups - 1; i >= 0; i--) {
                    java.io.File src = new java.io.File(
                        currentLogFile + (i == 0 ? "" : "." + i));
                    java.io.File dest = new java.io.File(
                        currentLogFile + "." + (i + 1));
                    if (src.exists()) {
                        if (i == maxBackups - 1) src.delete();
                        else src.renameTo(dest);
                    }
                }
 
                this.writer = new java.io.PrintWriter(
                    new java.io.FileWriter(currentLogFile, false));
            }
        } catch (Exception e) {
            System.err.println("Log rotation failed: " + e.getMessage());
        }
    }
 
    public void close() {
        if (writer != null) writer.close();
    }
}
 
// Initialize in tJava (Start):
ETLFileLogger logger = new ETLFileLogger(context.log_directory, jobName);
globalMap.put("etl_logger", logger);
logger.info("Job started - Run ID: " + globalMap.get("run_id"));
 
// Use throughout job:
((ETLFileLogger) globalMap.get("etl_logger")).info("Processing batch 1");
 
// Close in tJava (End):
((ETLFileLogger) globalMap.get("etl_logger")).close();

Alerting Integration

Email Alerts

/*
 * tSendMail Configuration for Error Alerts:
 *
 * SMTP Settings:
 * - Host: smtp.company.com
 * - Port: 587
 * - TLS: true
 * - Username: context.smtp_user
 * - Password: context.smtp_password
 *
 * Email Settings:
 * - From: talend-alerts@company.com
 * - To: data-team@company.com
 * - Subject: "[ALERT] ETL Job Failed: " + jobName
 * - Message: Build from error details
 */
 
// tJava - Build error email content
StringBuilder emailBody = new StringBuilder();
emailBody.append("ETL Job Failure Alert\n");
emailBody.append("====================\n\n");
emailBody.append("Job Name: ").append(jobName).append("\n");
emailBody.append("Project: ").append(projectName).append("\n");
emailBody.append("Run ID: ").append(globalMap.get("run_id")).append("\n");
emailBody.append("Timestamp: ").append(new java.util.Date()).append("\n");
emailBody.append("Environment: ").append(context.environment).append("\n\n");
 
emailBody.append("Error Details:\n");
emailBody.append("--------------\n");
emailBody.append("Component: ").append(globalMap.get("error_component")).append("\n");
emailBody.append("Message: ").append(globalMap.get("error_message")).append("\n\n");
 
if (globalMap.get("error_stack_trace") != null) {
    emailBody.append("Stack Trace:\n");
    emailBody.append(globalMap.get("error_stack_trace")).append("\n\n");
}
 
emailBody.append("Statistics:\n");
emailBody.append("-----------\n");
emailBody.append("Records Processed: ").append(globalMap.get("records_processed")).append("\n");
emailBody.append("Records Failed: ").append(globalMap.get("records_failed")).append("\n");
 
globalMap.put("email_body", emailBody.toString());

Slack Integration

// tJava - Send Slack notification
 
import java.net.HttpURLConnection;
import java.net.URL;
 
public static void sendSlackAlert(String webhookUrl, String message,
        String channel, String severity) throws Exception {
 
    // Build Slack message payload
    String color = severity.equals("ERROR") ? "danger" :
                   severity.equals("WARNING") ? "warning" : "good";
 
    String payload = String.format(
        "{\"channel\": \"%s\", \"attachments\": [{" +
        "\"color\": \"%s\", " +
        "\"title\": \"Talend ETL Alert\", " +
        "\"text\": \"%s\", " +
        "\"ts\": %d" +
        "}]}",
        channel, color, escapeJson(message), System.currentTimeMillis() / 1000
    );
 
    URL url = new URL(webhookUrl);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
 
    java.io.OutputStream os = conn.getOutputStream();
    os.write(payload.getBytes("UTF-8"));
    os.flush();
    os.close();
 
    int responseCode = conn.getResponseCode();
    if (responseCode != 200) {
        throw new RuntimeException("Slack notification failed: " + responseCode);
    }
}
 
private static String escapeJson(String text) {
    return text.replace("\\", "\\\\")
               .replace("\"", "\\\"")
               .replace("\n", "\\n")
               .replace("\r", "\\r")
               .replace("\t", "\\t");
}
 
// Usage in error handler:
try {
    sendSlackAlert(
        context.slack_webhook_url,
        "Job: " + jobName + " failed\nError: " + globalMap.get("error_message"),
        "#data-alerts",
        "ERROR"
    );
} catch (Exception e) {
    System.err.println("Failed to send Slack alert: " + e.getMessage());
}

PagerDuty Integration

// tJava - Trigger PagerDuty incident for critical failures
 
public static void triggerPagerDuty(String routingKey, String summary,
        String source, String severity, java.util.Map<String, Object> details)
        throws Exception {
 
    org.json.JSONObject payload = new org.json.JSONObject();
    payload.put("routing_key", routingKey);
    payload.put("event_action", "trigger");
 
    org.json.JSONObject eventPayload = new org.json.JSONObject();
    eventPayload.put("summary", summary);
    eventPayload.put("source", source);
    eventPayload.put("severity", severity); // critical, error, warning, info
 
    org.json.JSONObject customDetails = new org.json.JSONObject(details);
    eventPayload.put("custom_details", customDetails);
 
    payload.put("payload", eventPayload);
 
    java.net.URL url = new java.net.URL(
        "https://events.pagerduty.com/v2/enqueue");
    java.net.HttpURLConnection conn =
        (java.net.HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
 
    java.io.OutputStream os = conn.getOutputStream();
    os.write(payload.toString().getBytes("UTF-8"));
    os.flush();
    os.close();
 
    int responseCode = conn.getResponseCode();
    if (responseCode != 202) {
        java.io.BufferedReader reader = new java.io.BufferedReader(
            new java.io.InputStreamReader(conn.getErrorStream()));
        String line;
        StringBuilder response = new StringBuilder();
        while ((line = reader.readLine()) != null) {
            response.append(line);
        }
        throw new RuntimeException("PagerDuty failed: " + response.toString());
    }
 
    System.out.println("PagerDuty incident triggered");
}
 
// Usage for critical failures:
if (context.environment.equals("PRODUCTION")) {
    java.util.Map<String, Object> details = new java.util.HashMap<>();
    details.put("job_name", jobName);
    details.put("error_message", globalMap.get("error_message"));
    details.put("run_id", globalMap.get("run_id"));
 
    triggerPagerDuty(
        context.pagerduty_routing_key,
        "Critical ETL Failure: " + jobName,
        "talend-etl-" + context.environment.toLowerCase(),
        "critical",
        details
    );
}

Error Recovery Patterns

Retry Logic

// tLoop with retry logic for transient failures
 
// tJava_Init - Initialize retry variables
int maxRetries = 3;
int currentRetry = 0;
int retryDelayMs = 1000; // Start with 1 second
boolean success = false;
 
globalMap.put("maxRetries", maxRetries);
globalMap.put("currentRetry", currentRetry);
globalMap.put("retryDelayMs", retryDelayMs);
globalMap.put("success", success);
 
// tLoop condition:
// !((Boolean) globalMap.get("success")) && ((Integer) globalMap.get("currentRetry")) < ((Integer) globalMap.get("maxRetries"))
 
// tJava - Retry wrapper (OnSubjobError of main processing)
int currentRetry = (Integer) globalMap.get("currentRetry");
int maxRetries = (Integer) globalMap.get("maxRetries");
int retryDelayMs = (Integer) globalMap.get("retryDelayMs");
 
String errorMessage = (String) globalMap.get("error_message");
 
// Check if error is retryable
boolean isRetryable = isRetryableError(errorMessage);
 
if (isRetryable && currentRetry < maxRetries - 1) {
    currentRetry++;
    globalMap.put("currentRetry", currentRetry);
 
    System.out.println("Retry " + currentRetry + "/" + maxRetries +
        " after " + retryDelayMs + "ms - Error: " + errorMessage);
 
    // Exponential backoff
    Thread.sleep(retryDelayMs);
    globalMap.put("retryDelayMs", retryDelayMs * 2);
 
} else {
    // Max retries exceeded or non-retryable error
    System.err.println("Failed after " + (currentRetry + 1) + " attempts");
    throw new RuntimeException("Job failed: " + errorMessage);
}
 
// Helper method
private static boolean isRetryableError(String errorMessage) {
    if (errorMessage == null) return false;
    String lower = errorMessage.toLowerCase();
    return lower.contains("connection") ||
           lower.contains("timeout") ||
           lower.contains("deadlock") ||
           lower.contains("lock wait") ||
           lower.contains("temporarily unavailable");
}
 
// tJava - Mark success on successful completion
globalMap.put("success", true);
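The retry state spread across those tJava components can also be packaged as one routine that wraps any operation. This is a sketch of the same exponential-backoff pattern; `withRetry` and its parameters are a hypothetical helper, not a Talend API:

```java
import java.util.concurrent.Callable;

public class RetryRoutines {

    // Runs op up to maxRetries times, doubling the delay between
    // attempts. Rethrows the last failure once attempts are exhausted.
    public static <T> T withRetry(Callable<T> op, int maxRetries, long initialDelayMs)
            throws Exception {
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(delay);
                    delay *= 2; // exponential backoff
                }
            }
        }
        throw new RuntimeException("Failed after " + maxRetries + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        // Trivial demo: an operation that succeeds immediately.
        System.out.println(withRetry(() -> "ok", 3, 100));
    }
}
```

From tJava the call becomes, for example, `RetryRoutines.withRetry(() -> { /* fetch batch */ return null; }, 3, 1000)`; combine it with the retryable-error check above so non-retryable failures still fail fast.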

Checkpoint and Resume

/*
 * Checkpoint pattern for long-running jobs:
 *
 * 1. Save progress to checkpoint table
 * 2. On failure, read last checkpoint
 * 3. Resume from checkpoint
 */
 
// Checkpoint table structure:
/*
 * CREATE TABLE etl_checkpoints (
 *     job_name VARCHAR(100),
 *     run_id VARCHAR(50),
 *     checkpoint_key VARCHAR(200),
 *     checkpoint_value VARCHAR(500),
 *     created_at DATETIME DEFAULT GETDATE(),
 *     PRIMARY KEY (job_name, run_id, checkpoint_key)
 * );
 */
 
// tJava - Save checkpoint
public static void saveCheckpoint(String jobName, String runId,
        String key, String value, java.sql.Connection conn) throws Exception {
 
    String sql = "MERGE INTO etl_checkpoints AS target " +
        "USING (SELECT ? as job_name, ? as run_id, ? as checkpoint_key, ? as checkpoint_value) AS source " +
        "ON target.job_name = source.job_name AND target.run_id = source.run_id AND target.checkpoint_key = source.checkpoint_key " +
        "WHEN MATCHED THEN UPDATE SET checkpoint_value = source.checkpoint_value, created_at = GETDATE() " +
        "WHEN NOT MATCHED THEN INSERT (job_name, run_id, checkpoint_key, checkpoint_value) " +
        "VALUES (source.job_name, source.run_id, source.checkpoint_key, source.checkpoint_value)";
 
    try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, jobName);
        ps.setString(2, runId);
        ps.setString(3, key);
        ps.setString(4, value);
        ps.executeUpdate();
    }
}
 
// tJava - Read checkpoint
public static String getCheckpoint(String jobName, String runId,
        String key, java.sql.Connection conn) throws Exception {
 
    String sql = "SELECT checkpoint_value FROM etl_checkpoints " +
        "WHERE job_name = ? AND run_id = ? AND checkpoint_key = ?";
 
    try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, jobName);
        ps.setString(2, runId);
        ps.setString(3, key);
        try (java.sql.ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                return rs.getString("checkpoint_value");
            }
        }
    }
    return null;
}
 
// Usage in batch processing:
String lastProcessedId = getCheckpoint(jobName, runId, "last_processed_id", conn);
if (lastProcessedId != null) {
    System.out.println("Resuming from checkpoint: " + lastProcessedId);
    globalMap.put("resume_from_id", lastProcessedId);
} else {
    System.out.println("Starting fresh run");
    globalMap.put("resume_from_id", "0");
}
 
// After each batch:
saveCheckpoint(jobName, runId, "last_processed_id", currentBatchLastId, conn);

Best Practices Summary

error_handling_best_practices:
  design:
    - "Always handle OnSubjobError for main processing flows"
    - "Use row-level reject flows for data quality issues"
    - "Implement retry logic for transient failures"
    - "Set appropriate Die on error based on component criticality"
 
  logging:
    - "Use structured logging with consistent format"
    - "Include run_id for traceability across components"
    - "Log at appropriate levels (INFO, WARN, ERROR)"
    - "Implement log rotation for file-based logging"
    - "Store critical logs in database for querying"
 
  alerting:
    - "Alert on job failures immediately"
    - "Use different channels for different severities"
    - "Include actionable information in alerts"
    - "Don't over-alert - avoid alert fatigue"
 
  recovery:
    - "Implement checkpoint/resume for long jobs"
    - "Use exponential backoff for retries"
    - "Distinguish retryable vs non-retryable errors"
    - "Test recovery procedures regularly"
 
  database:
    - "Use transactions for multi-step operations"
    - "Implement proper rollback on failure"
    - "Log all database errors with context"
    - "Handle deadlocks with retry logic"

Conclusion

Robust error handling transforms Talend jobs from fragile scripts into production-grade data pipelines. Implement component-level handling, structured logging, and multi-channel alerting. Use retry patterns for resilience and checkpointing for recoverability. These practices ensure reliable data processing even in challenging conditions.
