Robust error handling separates production-grade ETL from fragile data pipelines. This guide covers complete patterns for exception management, logging, and building fault-tolerant Talend jobs.
Error Handling Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ TALEND ERROR HANDLING ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ JOB EXECUTION FLOW │
│ ┌────────────────────────────────────────────────────────────────────────┐│
│ │ ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ Source │───►│ Transform │───►│ Target │ ││
│ │ │ Input │ │ (tMap) │ │ Output │ ││
│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ ││
│ │ │ │ │ ││
│ │ │ onComponentError │ Row Reject │ onSubjobError ││
│ │ ▼ ▼ ▼ ││
│ │ ┌──────────────────────────────────────────────────────────────┐ ││
│ │ │ ERROR HANDLING LAYER │ ││
│ │ │ │ ││
│ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ ││
│ │ │ │ Log │ │ Alert │ │ Reject │ │ ││
│ │ │ │ Error │ │ (Email/ │ │ Records │ │ ││
│ │ │ │ Details │ │ Slack) │ │ to File │ │ ││
│ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ ││
│ │ │ │ ││
│ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ ││
│ │ │ │ Update │ │ Retry │ │ Rollback │ │ ││
│ │ │ │ Status │ │ Logic │ │ Changes │ │ ││
│ │ │ │ Table │ │ │ │ │ │ ││
│ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ ││
│ │ │ │ ││
│ │ └──────────────────────────────────────────────────────────────┘ ││
│ │ ││
│ └────────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────────┘
Component-Level Error Handling
The Die on Error Setting
/*
* Component behavior on error:
*
* Die on error: true (default)
* - The job stops immediately on error
* - Use for critical components where failure means the job must stop
*
* Die on error: false
* - The job continues despite the error
* - Use when you want to handle errors gracefully
* - Connect the OnComponentError trigger for custom handling
*/
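The two behaviors can be illustrated outside Talend with a minimal sketch (hypothetical helper, not Talend-generated code): with die-on-error the exception propagates and halts the run, without it the error is recorded and the flow continues, mimicking the OnComponentError trigger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two "Die on error" behaviors (not real Talend code).
class DieOnErrorDemo {
    static List<String> errorLog = new ArrayList<>();

    // dieOnError=true: rethrow and stop; dieOnError=false: record and continue.
    static void runStep(String name, Runnable step, boolean dieOnError) {
        try {
            step.run();
        } catch (RuntimeException e) {
            if (dieOnError) {
                throw e; // the "job" stops here
            }
            // mimic the OnComponentError trigger: record and keep going
            errorLog.add(name + ": " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        // With dieOnError=false the failing step is logged and execution continues
        runStep("tDBInput_1", () -> { throw new RuntimeException("connection refused"); }, false);
        runStep("tMap_1", () -> { /* succeeds */ }, false);
        System.out.println(errorLog); // prints "[tDBInput_1: connection refused]"
    }
}
```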
// Example: tDBInput with error handling
/*
* tDBInput_1 Settings:
* - Die on error: false
*
* Connect: OnComponentError → tLogRow (error details)
* Connect: OnSubjobOk → Continue processing
*/
// Accessing error details in the handling flow:
// globalMap.get("tDBInput_1_ERROR_MESSAGE")
// globalMap.get("tDBInput_1_ERRORCODE")
OnSubjobError vs OnComponentError
/*
* OnSubjobError:
* - Fires after the entire subjob fails
* - Use for cleanup, logging, alerts
* - Fires once per subjob failure
*
* OnComponentError:
* - Fires when a specific component fails
* - More granular control
* - Fires for every component error
*/
// Job structure with error handling:
/*
* [Main Processing]
* │
* ├── OnSubjobOk → [Success Flow]
* │ │
* │ └── tJava: log success, update status
* │
* └── OnSubjobError → [Error Flow]
* │
* ├── tJava: log error details
* ├── tSendMail: alert the team
* └── tDBOutput: log to the error table
*/
// tJava in the error flow - capture error details
String errorMessage = "";
String errorComponent = "";
// Check each component for errors
if (globalMap.get("tDBInput_1_ERROR_MESSAGE") != null) {
errorMessage = (String) globalMap.get("tDBInput_1_ERROR_MESSAGE");
errorComponent = "tDBInput_1";
}
if (globalMap.get("tMap_1_ERROR_MESSAGE") != null) {
errorMessage = (String) globalMap.get("tMap_1_ERROR_MESSAGE");
errorComponent = "tMap_1";
}
if (globalMap.get("tDBOutput_1_ERROR_MESSAGE") != null) {
errorMessage = (String) globalMap.get("tDBOutput_1_ERROR_MESSAGE");
errorComponent = "tDBOutput_1";
}
// Store for logging
globalMap.put("error_message", errorMessage);
globalMap.put("error_component", errorComponent);
globalMap.put("error_timestamp", new java.util.Date());
System.err.println("ERROR in " + errorComponent + ": " + errorMessage);
Row-Level Error Handling
/*
* tMap Reject Flow:
*
* Main output → Valid records
* Reject output → Invalid records (null keys, failed lookups, etc.)
*/
// tMap configuration for reject handling:
/*
* Output: main_output
* - Filter: row1.customer_id != null && row2 != null (lookup found)
*
* Output: reject_output
* - Catch lookup inner join reject: true
* - Catch expression filter reject: true
*/
// The reject flow schema should include error details:
/*
* reject_output schema:
* - original_record (String) - Serialized original data
* - reject_reason (String) - Reason for rejection
* - reject_timestamp (Date) - When it was rejected
* - source_file (String) - Where it came from
*/
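The same reject classification used in the tMap expression can be kept in plain Java, for example as a Talend routine, so it can be unit-tested; this is a hedged sketch with assumed field names (`customer_id`, lookup row, `amount`):

```java
// Hypothetical routine mirroring the tMap reject checks: null key,
// failed lookup join, null amount. Only meaningful for rejected rows.
class RejectClassifier {
    public static String rejectReason(Integer customerId, Object lookupRow, Double amount) {
        if (customerId == null) return "Missing customer_id";
        if (lookupRow == null) return "Customer lookup failed";
        if (amount == null) return "Missing amount";
        return "Unknown rejection reason";
    }

    public static void main(String[] args) {
        System.out.println(rejectReason(null, new Object(), 10.0)); // prints "Missing customer_id"
        System.out.println(rejectReason(42, null, 10.0));           // prints "Customer lookup failed"
    }
}
```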
// Expression for reject_reason:
Var.reject_reason = row1.customer_id == null ? "Missing customer_id" :
row2 == null ? "Customer lookup failed" :
row1.amount == null ? "Missing amount" :
"Unknown rejection reason";
Try-Catch Pattern
Using tJavaFlex for Try-Catch
// tJavaFlex - Wrap processing in try-catch
// START CODE
int successCount = 0;
int errorCount = 0;
java.util.List<String> errorMessages = new java.util.ArrayList<>();
// MAIN CODE
try {
// Process the row
output_row.id = input_row.id;
output_row.processed_value = processValue(input_row.raw_value);
output_row.status = "SUCCESS";
successCount++;
} catch (NumberFormatException nfe) {
// Handle the specific exception
output_row.id = input_row.id;
output_row.processed_value = null;
output_row.status = "ERROR";
output_row.error_message = "Invalid number format: " + nfe.getMessage();
errorCount++;
errorMessages.add("Row " + input_row.id + ": " + nfe.getMessage());
} catch (Exception e) {
// Handle the generic exception
output_row.id = input_row.id;
output_row.processed_value = null;
output_row.status = "ERROR";
output_row.error_message = "Processing error: " + e.getMessage();
errorCount++;
errorMessages.add("Row " + input_row.id + ": " + e.getMessage());
// Log the stack trace for debugging
java.io.StringWriter sw = new java.io.StringWriter();
e.printStackTrace(new java.io.PrintWriter(sw));
System.err.println("Stack trace for row " + input_row.id + ":\n" + sw.toString());
}
// END CODE
System.out.println("Processing complete: " + successCount + " success, " + errorCount + " errors");
globalMap.put("success_count", successCount);
globalMap.put("error_count", errorCount);
globalMap.put("error_messages", errorMessages);
// Fail the job if the error threshold is exceeded
int totalRows = successCount + errorCount;
double errorRate = totalRows == 0 ? 0.0 : (double) errorCount / totalRows; // guard against division by zero
if (errorRate > 0.05) { // 5% threshold
throw new RuntimeException("Error rate exceeded threshold: " +
String.format("%.2f%%", errorRate * 100));
}
Database Transaction Management
/*
* Job structure for transaction management:
*
* tDBConnection (AutoCommit: false)
* │
* ├── [Processing Flow]
* │ │
* │ ├── tDBInput
* │ ├── tMap
* │ └── tDBOutput (commit: 0 - no auto commit)
* │
* ├── OnSubjobOk → tDBCommit
* │
* └── OnSubjobError → tDBRollback → Error handling
*/
// tDBConnection settings:
/*
* Auto Commit: false
* Connection pool: Use existing or create
*/
// tDBOutput settings:
/*
* Use existing connection: tDBConnection_1
* Commit every: 0 (manual commit)
*/
// tJava - Manual transaction control
java.sql.Connection conn =
(java.sql.Connection) globalMap.get("conn_tDBConnection_1");
try {
// Processing happens here...
// Commit on success
conn.commit();
System.out.println("Transaction committed successfully");
} catch (Exception e) {
// Roll back on error
try {
conn.rollback();
System.err.println("Transaction rolled back due to: " + e.getMessage());
} catch (java.sql.SQLException rollbackEx) {
System.err.println("Rollback failed: " + rollbackEx.getMessage());
}
throw e; // Re-throw to trigger the job's error handling
}
Logging Strategies
Structured Logging
// tJava - Initialize the logging framework (declare the imports below in the component's Advanced settings > Import tab)
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.MDC;
// Configure log4j
String log4jConfig = context.log4j_config_path;
PropertyConfigurator.configure(log4jConfig);
Logger logger = Logger.getLogger("TalendETL." + jobName);
// Set MDC entries for structured logging
MDC.put("job_name", jobName);
MDC.put("job_id", (String) globalMap.get("JOB_ID"));
MDC.put("project", projectName);
MDC.put("run_id", java.util.UUID.randomUUID().toString());
MDC.put("environment", context.environment);
logger.info("Job started");
/*
* log4j.properties example:
*
* log4j.rootLogger=INFO, console, file
*
* log4j.appender.console=org.apache.log4j.ConsoleAppender
* log4j.appender.console.layout=org.apache.log4j.PatternLayout
* log4j.appender.console.layout.ConversionPattern=%d{ISO8601} [%X{job_name}] [%X{run_id}] %-5p %c - %m%n
*
* log4j.appender.file=org.apache.log4j.RollingFileAppender
* log4j.appender.file.File=/var/log/talend/${job_name}.log
* log4j.appender.file.MaxFileSize=100MB
* log4j.appender.file.MaxBackupIndex=10
* log4j.appender.file.layout=org.apache.log4j.PatternLayout
* log4j.appender.file.layout.ConversionPattern=%d{ISO8601} [%X{job_name}] [%X{run_id}] %-5p %c - %m%n
*/
Database Logging
// Log table structure
/*
* CREATE TABLE etl_job_logs (
* log_id BIGINT IDENTITY PRIMARY KEY,
* job_name VARCHAR(100),
* run_id VARCHAR(50),
* log_level VARCHAR(20),
* component_name VARCHAR(100),
* message TEXT,
* record_count INT,
* error_details TEXT,
* stack_trace TEXT,
* context_data TEXT,
* created_at DATETIME DEFAULT GETDATE()
* );
*
* CREATE INDEX idx_job_logs_job ON etl_job_logs(job_name, created_at);
* CREATE INDEX idx_job_logs_run ON etl_job_logs(run_id);
*/
// tJavaRow - Log to the database (a static helper like this belongs in a Talend routine)
public static void logToDatabase(String jobName, String runId, String level,
String component, String message, Integer recordCount,
String errorDetails, java.sql.Connection conn) throws Exception {
String sql = "INSERT INTO etl_job_logs " +
"(job_name, run_id, log_level, component_name, message, record_count, error_details) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)";
try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, jobName);
ps.setString(2, runId);
ps.setString(3, level);
ps.setString(4, component);
ps.setString(5, message);
if (recordCount != null) {
ps.setInt(6, recordCount);
} else {
ps.setNull(6, java.sql.Types.INTEGER);
}
ps.setString(7, errorDetails);
ps.executeUpdate();
}
}
// Usage in the job:
java.sql.Connection logConn =
(java.sql.Connection) globalMap.get("conn_tDBConnection_Log");
String runId = (String) globalMap.get("run_id");
logToDatabase(jobName, runId, "INFO", "tDBInput_1",
"Started reading from source", null, null, logConn);
// After processing:
int rowCount = ((Integer) globalMap.get("tDBInput_1_NB_LINE"));
logToDatabase(jobName, runId, "INFO", "tDBInput_1",
"Completed reading", rowCount, null, logConn);Logging in Fisiere cu Rotatie
// tJava - Custom file logger with rotation
public class ETLFileLogger {
private java.io.PrintWriter writer;
private String logDir;
private String jobName;
private String currentLogFile;
private long maxFileSize = 100 * 1024 * 1024; // 100MB
private int maxBackups = 10;
public ETLFileLogger(String logDir, String jobName) throws Exception {
this.logDir = logDir;
this.jobName = jobName;
this.currentLogFile = logDir + "/" + jobName + ".log";
rotateIfNeeded();
this.writer = new java.io.PrintWriter(
new java.io.FileWriter(currentLogFile, true));
}
public synchronized void log(String level, String message) {
String timestamp = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
.format(new java.util.Date());
String logLine = String.format("%s [%s] %s - %s%n",
timestamp, Thread.currentThread().getName(), level, message);
writer.print(logLine);
writer.flush();
rotateIfNeeded();
}
public void info(String message) { log("INFO", message); }
public void warn(String message) { log("WARN", message); }
public void error(String message) { log("ERROR", message); }
public void error(String message, Throwable t) {
log("ERROR", message);
java.io.StringWriter sw = new java.io.StringWriter();
t.printStackTrace(new java.io.PrintWriter(sw));
log("ERROR", sw.toString());
}
private void rotateIfNeeded() {
try {
java.io.File logFile = new java.io.File(currentLogFile);
if (logFile.exists() && logFile.length() > maxFileSize) {
if (writer != null) writer.close();
// Rotate the files
for (int i = maxBackups - 1; i >= 0; i--) {
java.io.File src = new java.io.File(
currentLogFile + (i == 0 ? "" : "." + i));
java.io.File dest = new java.io.File(
currentLogFile + "." + (i + 1));
if (src.exists()) {
if (i == maxBackups - 1) src.delete();
else src.renameTo(dest);
}
}
this.writer = new java.io.PrintWriter(
new java.io.FileWriter(currentLogFile, false));
}
} catch (Exception e) {
System.err.println("Log rotation failed: " + e.getMessage());
}
}
public void close() {
if (writer != null) writer.close();
}
}
// Initialize in tJava (Start):
ETLFileLogger logger = new ETLFileLogger(context.log_directory, jobName);
globalMap.put("etl_logger", logger);
logger.info("Job started - Run ID: " + globalMap.get("run_id"));
// Use throughout the job:
((ETLFileLogger) globalMap.get("etl_logger")).info("Processing batch 1");
// Close in tJava (End):
((ETLFileLogger) globalMap.get("etl_logger")).close();
Alerting Integration
Email Alerts
/*
* tSendMail configuration for error alerts:
*
* SMTP Settings:
* - Host: smtp.company.com
* - Port: 587
* - TLS: true
* - Username: context.smtp_user
* - Password: context.smtp_password
*
* Email Settings:
* - From: talend-alerts@company.com
* - To: data-team@company.com
* - Subject: "[ALERT] ETL Job Failed: " + jobName
* - Message: Built from the error details
*/
// tJava - Build the error email body
StringBuilder emailBody = new StringBuilder();
emailBody.append("ETL Job Failure Alert\n");
emailBody.append("====================\n\n");
emailBody.append("Job Name: ").append(jobName).append("\n");
emailBody.append("Project: ").append(projectName).append("\n");
emailBody.append("Run ID: ").append(globalMap.get("run_id")).append("\n");
emailBody.append("Timestamp: ").append(new java.util.Date()).append("\n");
emailBody.append("Environment: ").append(context.environment).append("\n\n");
emailBody.append("Error Details:\n");
emailBody.append("--------------\n");
emailBody.append("Component: ").append(globalMap.get("error_component")).append("\n");
emailBody.append("Message: ").append(globalMap.get("error_message")).append("\n\n");
if (globalMap.get("error_stack_trace") != null) {
emailBody.append("Stack Trace:\n");
emailBody.append(globalMap.get("error_stack_trace")).append("\n\n");
}
emailBody.append("Statistics:\n");
emailBody.append("-----------\n");
emailBody.append("Records Processed: ").append(globalMap.get("records_processed")).append("\n");
emailBody.append("Records Failed: ").append(globalMap.get("records_failed")).append("\n");
globalMap.put("email_body", emailBody.toString());
Slack Integration
// tJava - Send a Slack notification
import java.net.HttpURLConnection;
import java.net.URL;
public static void sendSlackAlert(String webhookUrl, String message,
String channel, String severity) throws Exception {
// Build the Slack message payload
String color = severity.equals("ERROR") ? "danger" :
severity.equals("WARNING") ? "warning" : "good";
String payload = String.format(
"{\"channel\": \"%s\", \"attachments\": [{" +
"\"color\": \"%s\", " +
"\"title\": \"Talend ETL Alert\", " +
"\"text\": \"%s\", " +
"\"ts\": %d" +
"}]}",
channel, color, escapeJson(message), System.currentTimeMillis() / 1000
);
URL url = new URL(webhookUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setDoOutput(true);
java.io.OutputStream os = conn.getOutputStream();
os.write(payload.getBytes("UTF-8"));
os.flush();
os.close();
int responseCode = conn.getResponseCode();
if (responseCode != 200) {
throw new RuntimeException("Slack notification failed: " + responseCode);
}
}
private static String escapeJson(String text) {
return text.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t");
}
// Usage in the error handler:
try {
sendSlackAlert(
context.slack_webhook_url,
"Job: " + jobName + " failed\nError: " + globalMap.get("error_message"),
"#data-alerts",
"ERROR"
);
} catch (Exception e) {
System.err.println("Failed to send Slack alert: " + e.getMessage());
}
PagerDuty Integration
// tJava - Trigger a PagerDuty incident for critical failures
public static void triggerPagerDuty(String routingKey, String summary,
String source, String severity, java.util.Map<String, Object> details)
throws Exception {
org.json.JSONObject payload = new org.json.JSONObject();
payload.put("routing_key", routingKey);
payload.put("event_action", "trigger");
org.json.JSONObject eventPayload = new org.json.JSONObject();
eventPayload.put("summary", summary);
eventPayload.put("source", source);
eventPayload.put("severity", severity); // critical, error, warning, info
org.json.JSONObject customDetails = new org.json.JSONObject(details);
eventPayload.put("custom_details", customDetails);
payload.put("payload", eventPayload);
java.net.URL url = new java.net.URL(
"https://events.pagerduty.com/v2/enqueue");
java.net.HttpURLConnection conn =
(java.net.HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setDoOutput(true);
java.io.OutputStream os = conn.getOutputStream();
os.write(payload.toString().getBytes("UTF-8"));
os.flush();
os.close();
int responseCode = conn.getResponseCode();
if (responseCode != 202) {
java.io.BufferedReader reader = new java.io.BufferedReader(
new java.io.InputStreamReader(conn.getErrorStream()));
String line;
StringBuilder response = new StringBuilder();
while ((line = reader.readLine()) != null) {
response.append(line);
}
throw new RuntimeException("PagerDuty failed: " + response.toString());
}
System.out.println("PagerDuty incident triggered");
}
// Usage for critical failures:
if (context.environment.equals("PRODUCTION")) {
java.util.Map<String, Object> details = new java.util.HashMap<>();
details.put("job_name", jobName);
details.put("error_message", globalMap.get("error_message"));
details.put("run_id", globalMap.get("run_id"));
triggerPagerDuty(
context.pagerduty_routing_key,
"Critical ETL Failure: " + jobName,
"talend-etl-" + context.environment.toLowerCase(),
"critical",
details
);
}
Error Recovery Patterns
Retry Logic
// tLoop with retry logic for transient failures
// tJava_Init - Initialize the retry variables
int maxRetries = 3;
int currentRetry = 0;
int retryDelayMs = 1000; // Start with 1 second
boolean success = false;
globalMap.put("maxRetries", maxRetries);
globalMap.put("currentRetry", currentRetry);
globalMap.put("retryDelayMs", retryDelayMs);
globalMap.put("success", success);
// tLoop condition:
// !((Boolean) globalMap.get("success")) && ((Integer) globalMap.get("currentRetry")) < ((Integer) globalMap.get("maxRetries"))
// tJava - Retry wrapper (OnSubjobError from the main processing)
int currentRetry = (Integer) globalMap.get("currentRetry");
int maxRetries = (Integer) globalMap.get("maxRetries");
int retryDelayMs = (Integer) globalMap.get("retryDelayMs");
String errorMessage = (String) globalMap.get("error_message");
// Check whether the error is retryable
boolean isRetryable = isRetryableError(errorMessage);
if (isRetryable && currentRetry < maxRetries - 1) {
currentRetry++;
globalMap.put("currentRetry", currentRetry);
System.out.println("Retry " + currentRetry + "/" + maxRetries +
" after " + retryDelayMs + "ms - Error: " + errorMessage);
// Exponential backoff
Thread.sleep(retryDelayMs);
globalMap.put("retryDelayMs", retryDelayMs * 2);
} else {
// Maximum retries exceeded or non-retryable error
System.err.println("Failed after " + (currentRetry + 1) + " attempts");
throw new RuntimeException("Job failed: " + errorMessage);
}
// Helper method (define in a Talend routine so it can be called from tJava)
private static boolean isRetryableError(String errorMessage) {
if (errorMessage == null) return false;
String lower = errorMessage.toLowerCase();
return lower.contains("connection") ||
lower.contains("timeout") ||
lower.contains("deadlock") ||
lower.contains("lock wait") ||
lower.contains("temporarily unavailable");
}
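The retry pieces above (attempt counter, retryable-error check, exponential backoff) can be condensed into one standalone helper; this is a hedged sketch of that idea as plain Java, e.g. for a Talend routine, with names of my own choosing:

```java
// Hypothetical retry helper: runs a task, retrying with exponential backoff,
// but only for errors classified as retryable.
class RetryHelper {
    // Functional interface so callers can pass lambdas that throw.
    public interface Task { void run() throws Exception; }

    public static void runWithRetry(Task task, int maxRetries, long initialDelayMs)
            throws Exception {
        long delay = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                task.run();
                return; // success
            } catch (Exception e) {
                boolean retryable = isRetryableError(e.getMessage());
                if (!retryable || attempt >= maxRetries) {
                    throw e; // give up: non-retryable or out of attempts
                }
                System.out.println("Retry " + attempt + "/" + maxRetries
                        + " after " + delay + "ms - " + e.getMessage());
                Thread.sleep(delay);
                delay *= 2; // exponential backoff
            }
        }
    }

    // Same keyword-based classification as in the section above.
    static boolean isRetryableError(String msg) {
        if (msg == null) return false;
        String lower = msg.toLowerCase();
        return lower.contains("connection") || lower.contains("timeout")
                || lower.contains("deadlock");
    }
}
```

A transient failure such as "connection reset" is then retried until it succeeds or the attempt budget runs out, while something like a SQL syntax error fails immediately.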
// tJava - Mark success on successful completion
globalMap.put("success", true);
Checkpoint and Resume
/*
* Checkpoint pattern for long-running jobs:
*
* 1. Save progress to a checkpoint table
* 2. On failure, read the last checkpoint
* 3. Resume from the checkpoint
*/
// Checkpoint table structure:
/*
* CREATE TABLE etl_checkpoints (
* job_name VARCHAR(100),
* run_id VARCHAR(50),
* checkpoint_key VARCHAR(200),
* checkpoint_value VARCHAR(500),
* created_at DATETIME DEFAULT GETDATE(),
* PRIMARY KEY (job_name, run_id, checkpoint_key)
* );
*/
// tJava - Save a checkpoint
public static void saveCheckpoint(String jobName, String runId,
String key, String value, java.sql.Connection conn) throws Exception {
String sql = "MERGE INTO etl_checkpoints AS target " +
"USING (SELECT ? as job_name, ? as run_id, ? as checkpoint_key, ? as checkpoint_value) AS source " +
"ON target.job_name = source.job_name AND target.run_id = source.run_id AND target.checkpoint_key = source.checkpoint_key " +
"WHEN MATCHED THEN UPDATE SET checkpoint_value = source.checkpoint_value, created_at = GETDATE() " +
"WHEN NOT MATCHED THEN INSERT (job_name, run_id, checkpoint_key, checkpoint_value) " +
"VALUES (source.job_name, source.run_id, source.checkpoint_key, source.checkpoint_value)";
try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, jobName);
ps.setString(2, runId);
ps.setString(3, key);
ps.setString(4, value);
ps.executeUpdate();
}
}
// tJava - Read a checkpoint
public static String getCheckpoint(String jobName, String runId,
String key, java.sql.Connection conn) throws Exception {
String sql = "SELECT checkpoint_value FROM etl_checkpoints " +
"WHERE job_name = ? AND run_id = ? AND checkpoint_key = ?";
try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, jobName);
ps.setString(2, runId);
ps.setString(3, key);
try (java.sql.ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
return rs.getString("checkpoint_value");
}
}
}
return null;
}
// Usage in batch processing:
String lastProcessedId = getCheckpoint(jobName, runId, "last_processed_id", conn);
if (lastProcessedId != null) {
System.out.println("Resuming from checkpoint: " + lastProcessedId);
globalMap.put("resume_from_id", lastProcessedId);
} else {
System.out.println("Starting fresh run");
globalMap.put("resume_from_id", "0");
}
// After each batch:
saveCheckpoint(jobName, runId, "last_processed_id", currentBatchLastId, conn);
Best Practices Summary
error_handling_best_practices:
  design:
    - "Always handle OnSubjobError for the main processing flows"
    - "Use row-level reject flows for data quality issues"
    - "Implement retry logic for transient failures"
    - "Set Die on error appropriately based on component criticality"
  logging:
    - "Use structured logging with a consistent format"
    - "Include a run_id for traceability across components"
    - "Log at appropriate levels (INFO, WARN, ERROR)"
    - "Implement log rotation for file logging"
    - "Store critical logs in a database so they can be queried"
  alerting:
    - "Alert immediately on job failures"
    - "Use different channels for different severities"
    - "Include actionable information in alerts"
    - "Do not over-alert - avoid alert fatigue"
  recovery:
    - "Implement checkpoint/resume for long jobs"
    - "Use exponential backoff for retries"
    - "Distinguish retryable from non-retryable errors"
    - "Test recovery procedures regularly"
  database:
    - "Use transactions for multi-step operations"
    - "Implement proper rollback on failure"
    - "Log all database errors with context"
    - "Handle deadlocks with retry logic"
Conclusion
Robust error handling turns Talend jobs from fragile scripts into production-grade data pipelines. Implement component-level handling, structured logging, and alerting across multiple channels. Use retry patterns for resilience and checkpoints for recoverability. These practices ensure reliable data processing even under adverse conditions.