
Talend Error Handling and Logging: Best Practices

Petru Constantin
15 min read
#talend#error-handling#logging#etl#best-practices

Robust error handling is what separates production-grade ETL from fragile data pipelines. This guide covers complete patterns for exception management, logging, and building fault-tolerant Talend jobs.

Error Handling Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    TALEND ERROR HANDLING ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   JOB EXECUTION FLOW                                                        │
│  ┌────────────────────────────────────────────────────────────────────────┐│
│  │                                                                        ││
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐               ││
│  │  │   Source    │───►│ Transform   │───►│   Target    │               ││
│  │  │   Input     │    │   (tMap)    │    │   Output    │               ││
│  │  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘               ││
│  │         │                  │                  │                       ││
│  │         │ onComponentError │ Row Reject       │ onSubjobError         ││
│  │         ▼                  ▼                  ▼                       ││
│  │  ┌──────────────────────────────────────────────────────────────┐    ││
│  │  │                    ERROR HANDLING LAYER                       │    ││
│  │  │                                                               │    ││
│  │  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │    ││
│  │  │  │   Log       │  │   Alert     │  │   Reject    │          │    ││
│  │  │  │   Error     │  │   (Email/   │  │   Records   │          │    ││
│  │  │  │   Details   │  │   Slack)    │  │   to File   │          │    ││
│  │  │  └─────────────┘  └─────────────┘  └─────────────┘          │    ││
│  │  │                                                               │    ││
│  │  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │    ││
│  │  │  │   Update    │  │   Retry     │  │   Rollback  │          │    ││
│  │  │  │   Status    │  │   Logic     │  │   Changes   │          │    ││
│  │  │  │   Table     │  │             │  │             │          │    ││
│  │  │  └─────────────┘  └─────────────┘  └─────────────┘          │    ││
│  │  │                                                               │    ││
│  │  └──────────────────────────────────────────────────────────────┘    ││
│  │                                                                        ││
│  └────────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────────┘

Component-Level Error Handling

The Die on Error Setting

/*
 * Component behavior on error:
 *
 * Die on error: true (default)
 * - The job stops immediately on error
 * - Use for critical components where a failure means the job must stop
 *
 * Die on error: false
 * - The job continues despite the error
 * - Use when you want to handle errors gracefully
 * - Wire the OnComponentError trigger for custom handling
 */
 
// Example: tDBInput with error handling
/*
 * tDBInput_1 Settings:
 * - Die on error: false
 *
 * Connect: OnComponentError → tLogRow (error details)
 * Connect: OnSubjobOk → continue processing
 */
 
// Accessing error details in the handling flow:
// globalMap.get("tDBInput_1_ERROR_MESSAGE")
// globalMap.get("tDBInput_1_ERRORCODE")

OnSubjobError vs OnComponentError

/*
 * OnSubjobError:
 * - Fires after the entire subjob fails
 * - Use for cleanup, logging, alerting
 * - Fires once per subjob failure
 *
 * OnComponentError:
 * - Fires when a specific component fails
 * - More granular control
 * - Fires for every component error
 */
 
// Job structure with error handling:
/*
 * [Main Processing]
 *     │
 *     ├── OnSubjobOk → [Success Flow]
 *     │                    │
 *     │                    └── tJava: log success, update status
 *     │
 *     └── OnSubjobError → [Error Flow]
 *                            │
 *                            ├── tJava: log error details
 *                            ├── tSendMail: alert the team
 *                            └── tDBOutput: log to the error table
 */
 
// tJava in the error flow - capture error details
String errorMessage = "";
String errorComponent = "";
 
// Check each component for errors
if (globalMap.get("tDBInput_1_ERROR_MESSAGE") != null) {
    errorMessage = (String) globalMap.get("tDBInput_1_ERROR_MESSAGE");
    errorComponent = "tDBInput_1";
}
if (globalMap.get("tMap_1_ERROR_MESSAGE") != null) {
    errorMessage = (String) globalMap.get("tMap_1_ERROR_MESSAGE");
    errorComponent = "tMap_1";
}
if (globalMap.get("tDBOutput_1_ERROR_MESSAGE") != null) {
    errorMessage = (String) globalMap.get("tDBOutput_1_ERROR_MESSAGE");
    errorComponent = "tDBOutput_1";
}
 
// Store for logging
globalMap.put("error_message", errorMessage);
globalMap.put("error_component", errorComponent);
globalMap.put("error_timestamp", new java.util.Date());
 
System.err.println("ERROR in " + errorComponent + ": " + errorMessage);
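
The chain of per-component null checks above grows with every component you add; the same scan can be written as a loop over component names. A minimal standalone sketch, using a plain `Map` in place of Talend's `globalMap` (the component list here is illustrative, not from a real job):

```java
import java.util.HashMap;
import java.util.Map;

public class ErrorScan {
    // Returns {component, message} for the first component that reported
    // an error via its <name>_ERROR_MESSAGE key, or null if none did.
    public static String[] findFirstError(Map<String, ?> globalMap,
            String... components) {
        for (String comp : components) {
            Object msg = globalMap.get(comp + "_ERROR_MESSAGE");
            if (msg != null) {
                return new String[] { comp, msg.toString() };
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> globalMap = new HashMap<>();
        globalMap.put("tMap_1_ERROR_MESSAGE", "Null key in lookup");

        String[] err = findFirstError(globalMap,
            "tDBInput_1", "tMap_1", "tDBOutput_1");
        System.err.println("ERROR in " + err[0] + ": " + err[1]);
    }
}
```

In a real job, the same loop would run inside a tJava on the error path, reading from `globalMap` directly.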

Row-Level Error Handling

/*
 * tMap Reject Flow:
 *
 * Main output → valid records
 * Reject output → invalid records (null keys, failed lookups, etc.)
 */
 
// tMap configuration for reject handling:
/*
 * Output: main_output
 * - Filter: row1.customer_id != null && row2 != null (lookup found)
 *
 * Output: reject_output
 * - Catch lookup inner join reject: true
 * - Catch expression filter reject: true
 */
 
// The reject flow schema should include error details:
/*
 * reject_output schema:
 * - original_record (String) - serialized original data
 * - reject_reason (String) - why the record was rejected
 * - reject_timestamp (Date) - when it was rejected
 * - source_file (String) - where it came from
 */
 
// tMap variable expression for reject_reason:
Var.reject_reason = row1.customer_id == null ? "Missing customer_id" :
                    row2 == null ? "Customer lookup failed" :
                    row1.amount == null ? "Missing amount" :
                    "Unknown rejection reason"
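
The decision chain in that expression can be unit-tested outside tMap by lifting it into a plain method. A sketch under assumed column types (Integer id, Double amount; `lookupRow` stands in for the matched lookup record):

```java
public class RejectRules {
    // Mirrors the tMap variable expression: rules are evaluated in order
    // and the first matching rule wins.
    public static String rejectReason(Integer customerId, Object lookupRow,
            Double amount) {
        if (customerId == null) return "Missing customer_id";
        if (lookupRow == null)  return "Customer lookup failed";
        if (amount == null)     return "Missing amount";
        return "Unknown rejection reason";
    }

    public static void main(String[] args) {
        // prints "Missing customer_id"
        System.out.println(rejectReason(null, new Object(), 10.0));
    }
}
```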

The Try-Catch Pattern

Using tJavaFlex for Try-Catch

// tJavaFlex - wrap row processing in try-catch
 
// START CODE
int successCount = 0;
int errorCount = 0;
java.util.List<String> errorMessages = new java.util.ArrayList<>();
 
// MAIN CODE
try {
    // Process the row
    output_row.id = input_row.id;
    output_row.processed_value = processValue(input_row.raw_value);
    output_row.status = "SUCCESS";
    successCount++;
 
} catch (NumberFormatException nfe) {
    // Handle the specific exception
    output_row.id = input_row.id;
    output_row.processed_value = null;
    output_row.status = "ERROR";
    output_row.error_message = "Invalid number format: " + nfe.getMessage();
    errorCount++;
    errorMessages.add("Row " + input_row.id + ": " + nfe.getMessage());
 
} catch (Exception e) {
    // Handle the generic exception
    output_row.id = input_row.id;
    output_row.processed_value = null;
    output_row.status = "ERROR";
    output_row.error_message = "Processing error: " + e.getMessage();
    errorCount++;
    errorMessages.add("Row " + input_row.id + ": " + e.getMessage());
 
    // Log the stack trace for debugging
    java.io.StringWriter sw = new java.io.StringWriter();
    e.printStackTrace(new java.io.PrintWriter(sw));
    System.err.println("Stack trace for row " + input_row.id + ":\n" + sw.toString());
}
 
// END CODE
System.out.println("Processing complete: " + successCount + " success, " + errorCount + " errors");
globalMap.put("success_count", successCount);
globalMap.put("error_count", errorCount);
globalMap.put("error_messages", errorMessages);
 
// Fail the job if the error rate exceeds the threshold
int total = successCount + errorCount;
double errorRate = total > 0 ? (double) errorCount / total : 0.0;
if (errorRate > 0.05) { // 5% threshold
    throw new RuntimeException("Error rate exceeded threshold: " +
        String.format("%.2f%%", errorRate * 100));
}

Database Transaction Handling

/*
 * Job structure for transaction management:
 *
 * tDBConnection (AutoCommit: false)
 *     │
 *     ├── [Processing Flow]
 *     │       │
 *     │       ├── tDBInput
 *     │       ├── tMap
 *     │       └── tDBOutput (commit: 0 - no auto commit)
 *     │
 *     ├── OnSubjobOk → tDBCommit
 *     │
 *     └── OnSubjobError → tDBRollback → error handling
 */
 
// tDBConnection settings:
/*
 * Auto Commit: false
 * Connection pool: Use existing or create
 */
 
// tDBOutput settings:
/*
 * Use existing connection: tDBConnection_1
 * Commit every: 0 (manual commit)
 */
 
// tJava - Manual transaction control
java.sql.Connection conn =
    (java.sql.Connection) globalMap.get("conn_tDBConnection_1");
 
try {
    // Processing happens here...
 
    // Commit on success
    conn.commit();
    System.out.println("Transaction committed successfully");
 
} catch (Exception e) {
    // Roll back on error
    try {
        conn.rollback();
        System.err.println("Transaction rolled back due to: " + e.getMessage());
    } catch (java.sql.SQLException rollbackEx) {
        System.err.println("Rollback failed: " + rollbackEx.getMessage());
    }
    throw e; // Re-throw to trigger the job's error handling
}

Logging Strategies

Structured Logging

// tJava - Initialize the logging framework
 
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.MDC;
 
// Configure log4j
String log4jConfig = context.log4j_config_path;
PropertyConfigurator.configure(log4jConfig);
 
Logger logger = Logger.getLogger("TalendETL." + jobName);
 
// Set MDC values for structured logging
MDC.put("job_name", jobName);
MDC.put("job_id", (String) globalMap.get("JOB_ID"));
MDC.put("project", projectName);
MDC.put("run_id", java.util.UUID.randomUUID().toString());
MDC.put("environment", context.environment);
 
logger.info("Job started");
 
/*
 * Example log4j.properties:
 *
 * log4j.rootLogger=INFO, console, file
 *
 * log4j.appender.console=org.apache.log4j.ConsoleAppender
 * log4j.appender.console.layout=org.apache.log4j.PatternLayout
 * log4j.appender.console.layout.ConversionPattern=%d{ISO8601} [%X{job_name}] [%X{run_id}] %-5p %c - %m%n
 *
 * log4j.appender.file=org.apache.log4j.RollingFileAppender
 * log4j.appender.file.File=/var/log/talend/${job_name}.log
 * log4j.appender.file.MaxFileSize=100MB
 * log4j.appender.file.MaxBackupIndex=10
 * log4j.appender.file.layout=org.apache.log4j.PatternLayout
 * log4j.appender.file.layout.ConversionPattern=%d{ISO8601} [%X{job_name}] [%X{run_id}] %-5p %c - %m%n
 */

Database Logging

// Log table structure
/*
 * CREATE TABLE etl_job_logs (
 *     log_id BIGINT IDENTITY PRIMARY KEY,
 *     job_name VARCHAR(100),
 *     run_id VARCHAR(50),
 *     log_level VARCHAR(20),
 *     component_name VARCHAR(100),
 *     message TEXT,
 *     record_count INT,
 *     error_details TEXT,
 *     stack_trace TEXT,
 *     context_data TEXT,
 *     created_at DATETIME DEFAULT GETDATE()
 * );
 *
 * CREATE INDEX idx_job_logs_job ON etl_job_logs(job_name, created_at);
 * CREATE INDEX idx_job_logs_run ON etl_job_logs(run_id);
 */
 
// tJavaRow - write log entries to the database
public static void logToDatabase(String jobName, String runId, String level,
        String component, String message, Integer recordCount,
        String errorDetails, java.sql.Connection conn) throws Exception {
 
    String sql = "INSERT INTO etl_job_logs " +
        "(job_name, run_id, log_level, component_name, message, record_count, error_details) " +
        "VALUES (?, ?, ?, ?, ?, ?, ?)";
 
    try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, jobName);
        ps.setString(2, runId);
        ps.setString(3, level);
        ps.setString(4, component);
        ps.setString(5, message);
        if (recordCount != null) {
            ps.setInt(6, recordCount);
        } else {
            ps.setNull(6, java.sql.Types.INTEGER);
        }
        ps.setString(7, errorDetails);
        ps.executeUpdate();
    }
}
 
// Usage in the job:
java.sql.Connection logConn =
    (java.sql.Connection) globalMap.get("conn_tDBConnection_Log");
String runId = (String) globalMap.get("run_id");
 
logToDatabase(jobName, runId, "INFO", "tDBInput_1",
    "Started reading from source", null, null, logConn);
 
// After processing:
int rowCount = ((Integer) globalMap.get("tDBInput_1_NB_LINE"));
logToDatabase(jobName, runId, "INFO", "tDBInput_1",
    "Completed reading", rowCount, null, logConn);

File Logging with Rotation

// tJava - Custom file logger with rotation
 
public class ETLFileLogger {
    private java.io.PrintWriter writer;
    private String logDir;
    private String jobName;
    private String currentLogFile;
    private long maxFileSize = 100 * 1024 * 1024; // 100MB
    private int maxBackups = 10;
 
    public ETLFileLogger(String logDir, String jobName) throws Exception {
        this.logDir = logDir;
        this.jobName = jobName;
        this.currentLogFile = logDir + "/" + jobName + ".log";
        rotateIfNeeded();
        this.writer = new java.io.PrintWriter(
            new java.io.FileWriter(currentLogFile, true));
    }
 
    public synchronized void log(String level, String message) {
        String timestamp = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
            .format(new java.util.Date());
        String logLine = String.format("%s [%s] %s - %s%n",
            timestamp, Thread.currentThread().getName(), level, message);
 
        writer.print(logLine);
        writer.flush();
 
        rotateIfNeeded();
    }
 
    public void info(String message) { log("INFO", message); }
    public void warn(String message) { log("WARN", message); }
    public void error(String message) { log("ERROR", message); }
 
    public void error(String message, Throwable t) {
        log("ERROR", message);
        java.io.StringWriter sw = new java.io.StringWriter();
        t.printStackTrace(new java.io.PrintWriter(sw));
        log("ERROR", sw.toString());
    }
 
    private void rotateIfNeeded() {
        try {
            java.io.File logFile = new java.io.File(currentLogFile);
            if (logFile.exists() && logFile.length() > maxFileSize) {
                if (writer != null) writer.close();
 
                // Rotate the files
                for (int i = maxBackups - 1; i >= 0; i--) {
                    java.io.File src = new java.io.File(
                        currentLogFile + (i == 0 ? "" : "." + i));
                    java.io.File dest = new java.io.File(
                        currentLogFile + "." + (i + 1));
                    if (src.exists()) {
                        if (i == maxBackups - 1) src.delete();
                        else src.renameTo(dest);
                    }
                }
 
                this.writer = new java.io.PrintWriter(
                    new java.io.FileWriter(currentLogFile, false));
            }
        } catch (Exception e) {
            System.err.println("Log rotation failed: " + e.getMessage());
        }
    }
 
    public void close() {
        if (writer != null) writer.close();
    }
}
 
// Initialize in tJava (Start):
ETLFileLogger logger = new ETLFileLogger(context.log_directory, jobName);
globalMap.put("etl_logger", logger);
logger.info("Job started - Run ID: " + globalMap.get("run_id"));
 
// Use throughout the job:
((ETLFileLogger) globalMap.get("etl_logger")).info("Processing batch 1");
 
// Close in tJava (End):
((ETLFileLogger) globalMap.get("etl_logger")).close();

Alerting Integration

Email Alerts

/*
 * tSendMail configuration for error alerts:
 *
 * SMTP Settings:
 * - Host: smtp.company.com
 * - Port: 587
 * - TLS: true
 * - Username: context.smtp_user
 * - Password: context.smtp_password
 *
 * Email Settings:
 * - From: talend-alerts@company.com
 * - To: data-team@company.com
 * - Subject: "[ALERT] ETL Job Failed: " + jobName
 * - Message: built from the error details
 */
 
// tJava - Build the error email body
StringBuilder emailBody = new StringBuilder();
emailBody.append("ETL Job Failure Alert\n");
emailBody.append("====================\n\n");
emailBody.append("Job Name: ").append(jobName).append("\n");
emailBody.append("Project: ").append(projectName).append("\n");
emailBody.append("Run ID: ").append(globalMap.get("run_id")).append("\n");
emailBody.append("Timestamp: ").append(new java.util.Date()).append("\n");
emailBody.append("Environment: ").append(context.environment).append("\n\n");
 
emailBody.append("Error Details:\n");
emailBody.append("--------------\n");
emailBody.append("Component: ").append(globalMap.get("error_component")).append("\n");
emailBody.append("Message: ").append(globalMap.get("error_message")).append("\n\n");
 
if (globalMap.get("error_stack_trace") != null) {
    emailBody.append("Stack Trace:\n");
    emailBody.append(globalMap.get("error_stack_trace")).append("\n\n");
}
 
emailBody.append("Statistics:\n");
emailBody.append("-----------\n");
emailBody.append("Records Processed: ").append(globalMap.get("records_processed")).append("\n");
emailBody.append("Records Failed: ").append(globalMap.get("records_failed")).append("\n");
 
globalMap.put("email_body", emailBody.toString());

Slack Integration

// tJava - Send a Slack notification
 
import java.net.HttpURLConnection;
import java.net.URL;
 
public static void sendSlackAlert(String webhookUrl, String message,
        String channel, String severity) throws Exception {
 
    // Build the Slack message payload
    String color = severity.equals("ERROR") ? "danger" :
                   severity.equals("WARNING") ? "warning" : "good";
 
    String payload = String.format(
        "{\"channel\": \"%s\", \"attachments\": [{" +
        "\"color\": \"%s\", " +
        "\"title\": \"Talend ETL Alert\", " +
        "\"text\": \"%s\", " +
        "\"ts\": %d" +
        "}]}",
        channel, color, escapeJson(message), System.currentTimeMillis() / 1000
    );
 
    URL url = new URL(webhookUrl);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
 
    java.io.OutputStream os = conn.getOutputStream();
    os.write(payload.getBytes("UTF-8"));
    os.flush();
    os.close();
 
    int responseCode = conn.getResponseCode();
    if (responseCode != 200) {
        throw new RuntimeException("Slack notification failed: " + responseCode);
    }
}
 
private static String escapeJson(String text) {
    return text.replace("\\", "\\\\")
               .replace("\"", "\\\"")
               .replace("\n", "\\n")
               .replace("\r", "\\r")
               .replace("\t", "\\t");
}
 
// Usage in the error handler:
try {
    sendSlackAlert(
        context.slack_webhook_url,
        "Job: " + jobName + " failed\nError: " + globalMap.get("error_message"),
        "#data-alerts",
        "ERROR"
    );
} catch (Exception e) {
    System.err.println("Failed to send Slack alert: " + e.getMessage());
}

PagerDuty Integration

// tJava - Trigger a PagerDuty incident for critical failures
 
public static void triggerPagerDuty(String routingKey, String summary,
        String source, String severity, java.util.Map<String, Object> details)
        throws Exception {
 
    org.json.JSONObject payload = new org.json.JSONObject();
    payload.put("routing_key", routingKey);
    payload.put("event_action", "trigger");
 
    org.json.JSONObject eventPayload = new org.json.JSONObject();
    eventPayload.put("summary", summary);
    eventPayload.put("source", source);
    eventPayload.put("severity", severity); // critical, error, warning, info
 
    org.json.JSONObject customDetails = new org.json.JSONObject(details);
    eventPayload.put("custom_details", customDetails);
 
    payload.put("payload", eventPayload);
 
    java.net.URL url = new java.net.URL(
        "https://events.pagerduty.com/v2/enqueue");
    java.net.HttpURLConnection conn =
        (java.net.HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
 
    java.io.OutputStream os = conn.getOutputStream();
    os.write(payload.toString().getBytes("UTF-8"));
    os.flush();
    os.close();
 
    int responseCode = conn.getResponseCode();
    if (responseCode != 202) {
        java.io.BufferedReader reader = new java.io.BufferedReader(
            new java.io.InputStreamReader(conn.getErrorStream()));
        String line;
        StringBuilder response = new StringBuilder();
        while ((line = reader.readLine()) != null) {
            response.append(line);
        }
        throw new RuntimeException("PagerDuty failed: " + response.toString());
    }
 
    System.out.println("PagerDuty incident triggered");
}
 
// Usage for critical failures:
if (context.environment.equals("PRODUCTION")) {
    java.util.Map<String, Object> details = new java.util.HashMap<>();
    details.put("job_name", jobName);
    details.put("error_message", globalMap.get("error_message"));
    details.put("run_id", globalMap.get("run_id"));
 
    triggerPagerDuty(
        context.pagerduty_routing_key,
        "Critical ETL Failure: " + jobName,
        "talend-etl-" + context.environment.toLowerCase(),
        "critical",
        details
    );
}

Error Recovery Patterns

Retry Logic

// tLoop with retry logic for transient failures
 
// tJava_Init - Initialize the retry variables
int maxRetries = 3;
int currentRetry = 0;
int retryDelayMs = 1000; // Start with 1 second
boolean success = false;
 
globalMap.put("maxRetries", maxRetries);
globalMap.put("currentRetry", currentRetry);
globalMap.put("retryDelayMs", retryDelayMs);
globalMap.put("success", success);
 
// tLoop condition:
// !((Boolean) globalMap.get("success")) && ((Integer) globalMap.get("currentRetry")) < ((Integer) globalMap.get("maxRetries"))
 
// tJava - Retry wrapper (OnSubjobError from the main processing)
int currentRetry = (Integer) globalMap.get("currentRetry");
int maxRetries = (Integer) globalMap.get("maxRetries");
int retryDelayMs = (Integer) globalMap.get("retryDelayMs");
 
String errorMessage = (String) globalMap.get("error_message");
 
// Check whether the error is retryable
boolean isRetryable = isRetryableError(errorMessage);
 
if (isRetryable && currentRetry < maxRetries - 1) {
    currentRetry++;
    globalMap.put("currentRetry", currentRetry);
 
    System.out.println("Retry " + currentRetry + "/" + maxRetries +
        " after " + retryDelayMs + "ms - Error: " + errorMessage);
 
    // Exponential backoff
    Thread.sleep(retryDelayMs);
    globalMap.put("retryDelayMs", retryDelayMs * 2);
 
} else {
    // Max retries exceeded, or a non-retryable error
    System.err.println("Failed after " + (currentRetry + 1) + " attempts");
    throw new RuntimeException("Job failed: " + errorMessage);
}
 
// Helper method
private static boolean isRetryableError(String errorMessage) {
    if (errorMessage == null) return false;
    String lower = errorMessage.toLowerCase();
    return lower.contains("connection") ||
           lower.contains("timeout") ||
           lower.contains("deadlock") ||
           lower.contains("lock wait") ||
           lower.contains("temporarily unavailable");
}
 
// tJava - Mark success on successful completion
globalMap.put("success", true);
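
The doubling delay used in the retry wrapper is easier to reason about as a pure function, so the backoff schedule can be verified in isolation. A sketch; note the cap is an addition not present in the job above, and its value here is arbitrary:

```java
public class Backoff {
    // Exponential backoff: baseDelayMs doubled per attempt (0-based),
    // capped at maxDelayMs so long retry chains don't sleep forever.
    public static long delayMs(int attempt, long baseDelayMs, long maxDelayMs) {
        long delay = baseDelayMs;
        for (int i = 0; i < attempt && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }

    public static void main(String[] args) {
        // Schedule for base 1000ms, cap 30000ms: 1000, 2000, 4000, ...
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("attempt " + attempt + " -> "
                + delayMs(attempt, 1000, 30000) + "ms");
        }
    }
}
```

Inside the job, `globalMap.put("retryDelayMs", retryDelayMs * 2)` plays the same role; the function form just makes the schedule testable.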

Checkpoint and Resume

/*
 * Checkpoint pattern for long-running jobs:
 *
 * 1. Save progress to a checkpoint table
 * 2. On failure, read the last checkpoint
 * 3. Resume from the checkpoint
 */
 
// Checkpoint table structure:
/*
 * CREATE TABLE etl_checkpoints (
 *     job_name VARCHAR(100),
 *     run_id VARCHAR(50),
 *     checkpoint_key VARCHAR(200),
 *     checkpoint_value VARCHAR(500),
 *     created_at DATETIME DEFAULT GETDATE(),
 *     PRIMARY KEY (job_name, run_id, checkpoint_key)
 * );
 */
 
// tJava - Save a checkpoint
public static void saveCheckpoint(String jobName, String runId,
        String key, String value, java.sql.Connection conn) throws Exception {
 
    String sql = "MERGE INTO etl_checkpoints AS target " +
        "USING (SELECT ? as job_name, ? as run_id, ? as checkpoint_key, ? as checkpoint_value) AS source " +
        "ON target.job_name = source.job_name AND target.run_id = source.run_id AND target.checkpoint_key = source.checkpoint_key " +
        "WHEN MATCHED THEN UPDATE SET checkpoint_value = source.checkpoint_value, created_at = GETDATE() " +
        "WHEN NOT MATCHED THEN INSERT (job_name, run_id, checkpoint_key, checkpoint_value) " +
        "VALUES (source.job_name, source.run_id, source.checkpoint_key, source.checkpoint_value)";
 
    try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, jobName);
        ps.setString(2, runId);
        ps.setString(3, key);
        ps.setString(4, value);
        ps.executeUpdate();
    }
}
 
// tJava - Read a checkpoint
public static String getCheckpoint(String jobName, String runId,
        String key, java.sql.Connection conn) throws Exception {
 
    String sql = "SELECT checkpoint_value FROM etl_checkpoints " +
        "WHERE job_name = ? AND run_id = ? AND checkpoint_key = ?";
 
    try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, jobName);
        ps.setString(2, runId);
        ps.setString(3, key);
        try (java.sql.ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                return rs.getString("checkpoint_value");
            }
        }
    }
    return null;
}
 
// Usage in batch processing:
String lastProcessedId = getCheckpoint(jobName, runId, "last_processed_id", conn);
if (lastProcessedId != null) {
    System.out.println("Resuming from checkpoint: " + lastProcessedId);
    globalMap.put("resume_from_id", lastProcessedId);
} else {
    System.out.println("Starting fresh run");
    globalMap.put("resume_from_id", "0");
}
 
// After each batch:
saveCheckpoint(jobName, runId, "last_processed_id", currentBatchLastId, conn);

Best Practices Summary

error_handling_best_practices:
  design:
    - "Always handle OnSubjobError for the main processing flows"
    - "Use row-level reject flows for data quality issues"
    - "Implement retry logic for transient failures"
    - "Set Die on error appropriately based on component criticality"
 
  logging:
    - "Use structured logging with a consistent format"
    - "Include a run_id for traceability across components"
    - "Log at appropriate levels (INFO, WARN, ERROR)"
    - "Implement log rotation for file logging"
    - "Store critical logs in a database for querying"
 
  alerting:
    - "Alert immediately on job failures"
    - "Use different channels for different severities"
    - "Include actionable information in alerts"
    - "Don't over-alert - avoid alert fatigue"
 
  recovery:
    - "Implement checkpoint/resume for long-running jobs"
    - "Use exponential backoff for retries"
    - "Distinguish retryable from non-retryable errors"
    - "Test recovery procedures regularly"
 
  database:
    - "Use transactions for multi-step operations"
    - "Implement proper rollback on failure"
    - "Log all database errors with context"
    - "Handle deadlocks with retry logic"

Conclusion

Robust error handling turns Talend jobs from fragile scripts into production-grade data pipelines. Implement component-level handling, structured logging, and multi-channel alerting. Use retry patterns for resilience and checkpoints for recoverability. These practices ensure reliable data processing even under adverse conditions.

