Real-time data streaming enables immediate processing of data for modern applications. This guide covers integrating Kafka with Talend to build event-driven data pipelines.
Streaming Architecture Overview
                  TALEND REAL-TIME STREAMING ARCHITECTURE

  DATA SOURCES              KAFKA CLUSTER              CONSUMERS
  ┌─────────────┐           ┌─────────────────┐        ┌─────────────┐
  │ Applications│──produce─►│                 │───────►│ Analytics   │
  │ (Events)    │           │  ┌─────────┐    │        │ Database    │
  └─────────────┘           │  │ Topic 1 │    │        └─────────────┘
                            │  ├─────────┤    │
  ┌─────────────┐           │  │partition│    │        ┌─────────────┐
  │ Databases   │──CDC─────►│  │partition│    │───────►│ Real-Time   │
  └─────────────┘           │  │partition│    │        │ Dashboard   │
                            │  └─────────┘    │        └─────────────┘
  ┌─────────────┐           │  ┌─────────┐    │
  │ IoT Sensors │──stream──►│  │ Topic 2 │    │        ┌─────────────┐
  └─────────────┘           │  └─────────┘    │───────►│ Data Lake   │
                            └─────────────────┘        └─────────────┘
                                     │
  ┌──────────────────────────────────┼──────────────────────────────────┐
  │                      TALEND STREAMING JOBS                          │
  │                                                                     │
  │  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐         │
  │  │ tKafkaInput  │────►│  Transform   │────►│ tKafkaOutput │         │
  │  │ (Consumer)   │     │ (tMap/Java)  │     │ (Producer)   │         │
  │  └──────────────┘     └──────────────┘     └──────────────┘         │
  │                                                                     │
  │  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐         │
  │  │ tKafkaInput  │────►│  Aggregate/  │────►│  tDBOutput   │         │
  │  │              │     │   Enrich     │     │ (Real-time)  │         │
  │  └──────────────┘     └──────────────┘     └──────────────┘         │
  └─────────────────────────────────────────────────────────────────────┘
Kafka Connection Setup
/*
* tKafkaConnection component settings:
*
* Kafka Version: 2.x / 3.x
* Bootstrap Servers: kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092
*
* Security Protocol options:
* - PLAINTEXT (development only)
* - SSL (encrypted)
* - SASL_PLAINTEXT (authenticated)
* - SASL_SSL (authenticated + encrypted)
*/
// Context variables for the Kafka configuration
context.kafka_bootstrap_servers = "kafka-broker1:9092,kafka-broker2:9092"
context.kafka_security_protocol = "SASL_SSL"
context.kafka_sasl_mechanism = "PLAIN"
context.kafka_sasl_username = (String) // from secure storage
context.kafka_sasl_password = (String) // from secure storage
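As a minimal sketch of how these context values map onto Kafka client properties, the helper below assembles a `java.util.Properties` object, including the standard `PlainLoginModule` JAAS string for the PLAIN mechanism. The class and method names are illustrative, not a Talend API:

```java
import java.util.Properties;

// Sketch: assembling Kafka client properties from context-style values.
public class KafkaConnectionProps {

    public static Properties build(String bootstrapServers, String securityProtocol,
                                   String saslMechanism, String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", securityProtocol);
        props.setProperty("sasl.mechanism", saslMechanism);
        // Standard JAAS config for the PLAIN mechanism; credentials come from secure storage
        props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties p = build("kafka-broker1:9092,kafka-broker2:9092",
                             "SASL_SSL", "PLAIN", "etl-user", "secret");
        System.out.println(p.getProperty("security.protocol"));
    }
}
```

In a Talend job the equivalent values would come from `context.*` variables rather than method arguments.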
// Additional Kafka properties (Advanced Settings)
/*
* ssl.truststore.location=/path/to/truststore.jks
* ssl.truststore.password=${context.truststore_password}
* ssl.keystore.location=/path/to/keystore.jks
* ssl.keystore.password=${context.keystore_password}
* ssl.key.password=${context.key_password}
*
* # SASL configuration
* sasl.mechanism=PLAIN
* sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
* username="${context.kafka_sasl_username}"
* password="${context.kafka_sasl_password}";
*/
Schema Registry Integration
/*
* Confluent Schema Registry configuration:
*
* tKafkaInput/tKafkaOutput with Avro:
* - Value Deserializer/Serializer: Confluent Avro
* - Schema Registry URL: http://schema-registry:8081
* - Schema Registry authentication (if required)
*/
// Additional properties for Schema Registry
/*
* schema.registry.url=http://schema-registry:8081
* schema.registry.basic.auth.credentials.source=USER_INFO
* schema.registry.basic.auth.user.info=${context.sr_user}:${context.sr_password}
*
* # Auto-register schemas
* auto.register.schemas=true
*
* # Use the latest registered schema version
* use.latest.version=true
*/
// Example Avro schema for events
/*
* {
* "type": "record",
* "name": "OrderEvent",
* "namespace": "com.company.events",
* "fields": [
* {"name": "order_id", "type": "string"},
* {"name": "customer_id", "type": "string"},
* {"name": "product_id", "type": "string"},
* {"name": "quantity", "type": "int"},
* {"name": "price", "type": "double"},
* {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
* {"name": "status", "type": {"type": "enum", "name": "OrderStatus",
* "symbols": ["CREATED", "PROCESSING", "COMPLETED", "CANCELLED"]}}
* ]
* }
*/
Kafka Consumer Patterns
Basic Consumer Job
/*
* Job structure:
* tKafkaConnection → tKafkaInput → tMap → tLogRow/tDBOutput
*
* tKafkaInput settings:
* - Use existing connection: tKafkaConnection_1
* - Topic: orders-events
* - Consumer Group ID: talend-consumer-group
* - Key Deserializer: String
* - Value Deserializer: String (JSON)
*
* Consumer properties:
* - auto.offset.reset: earliest/latest
* - enable.auto.commit: true/false
* - max.poll.records: 500
* - session.timeout.ms: 30000
*/
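The consumer properties listed above can be sketched as plain client configuration. This is an illustrative stdlib-only fragment; the values are examples, and the `manualCommit` flag is a hypothetical parameter showing how auto-commit relates to job-controlled offsets:

```java
import java.util.Properties;

// Sketch of the consumer tuning options listed above.
public class ConsumerTuning {

    public static Properties consumerProps(String groupId, boolean manualCommit) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // earliest: start from the beginning when no committed offset exists;
        // latest: only read messages produced after the consumer starts
        props.setProperty("auto.offset.reset", "earliest");
        // disable auto-commit when the job manages offsets itself
        props.setProperty("enable.auto.commit", String.valueOf(!manualCommit));
        props.setProperty("max.poll.records", "500");
        props.setProperty("session.timeout.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("talend-consumer-group", true));
    }
}
```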
// tKafkaInput output schema:
/*
* - key: String (message key)
* - value: String (message payload)
* - topic: String
* - partition: Integer
* - offset: Long
* - timestamp: Long
*/
// tJavaRow - Parse the JSON message
import org.json.JSONObject;
String jsonValue = input_row.value;
JSONObject event = new JSONObject(jsonValue);
output_row.order_id = event.getString("order_id");
output_row.customer_id = event.getString("customer_id");
output_row.product_id = event.getString("product_id");
output_row.quantity = event.getInt("quantity");
output_row.price = event.getDouble("price");
output_row.event_timestamp = event.getLong("timestamp");
output_row.status = event.getString("status");
// Keep the Kafka metadata for tracking
output_row.kafka_partition = input_row.partition;
output_row.kafka_offset = input_row.offset;
output_row.kafka_timestamp = input_row.timestamp;
Batch Consumer with Commit Control
/*
* Manual offset commit pattern:
*
* tKafkaInput settings:
* - enable.auto.commit: false
*
* Job flow:
* tKafkaInput → processing → tKafkaCommit (on success)
*             → error handling (on failure, no commit)
*/
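The commit decision at the heart of this pattern (commit a batch only if every message in it succeeded) can be sketched as a standalone, testable function. This is a simplification of the tJavaFlex logic; the class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: commit a batch only when every message in it processed successfully.
public class BatchCommitDecision {

    public static List<Boolean> commitDecisions(List<Boolean> messageResults, int batchSize) {
        List<Boolean> decisions = new ArrayList<>();
        boolean batchOk = true;
        int processed = 0;
        for (boolean ok : messageResults) {
            if (!ok) batchOk = false;      // one failure poisons the whole batch
            processed++;
            if (processed % batchSize == 0) {
                decisions.add(batchOk);    // commit only a fully successful batch
                batchOk = true;            // reset for the next batch
            }
        }
        return decisions;
    }

    public static void main(String[] args) {
        // Two batches of 3: the first is clean, the second contains a failure
        List<Boolean> results = List.of(true, true, true, true, false, true);
        System.out.println(commitDecisions(results, 3)); // [true, false]
    }
}
```

Skipping the commit means the failed batch is redelivered on restart, so downstream processing must be idempotent.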
// tJavaFlex - Batch processing with manual commit control
// Start Code
int batchSize = 1000;
int processedCount = 0;
java.util.List<String> failedMessages = new java.util.ArrayList<>();
boolean commitBatch = true;
// Main Code
try {
// Process the message
processMessage(input_row);
processedCount++;
} catch (Exception e) {
failedMessages.add(input_row.key + ": " + e.getMessage());
commitBatch = false; // Do not commit if any message fails
}
// At each batch boundary, decide whether to commit
if (processedCount % batchSize == 0) {
if (commitBatch) {
globalMap.put("should_commit", true);
System.out.println("Batch processed successfully, will commit");
} else {
globalMap.put("should_commit", false);
System.out.println("Batch had failures, skipping commit");
// Failed messages will be reprocessed
}
commitBatch = true; // Reset for the next batch
}
// End Code
System.out.println("Total processed: " + processedCount);
System.out.println("Failed messages: " + failedMessages.size());
for (String fail : failedMessages) {
System.err.println(" - " + fail);
}
Multi-Topic Consumer
/*
* Subscribe to multiple topics with a single consumer
*
* tKafkaInput settings:
* - Topic: orders-events,inventory-events,shipping-events
*   (comma-separated list)
*
* OR use a topic pattern:
* - Topic Pattern: .*-events (regex)
*/
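The topic-based routing used below reduces to a simple mapping from topic name to event type, which can be sketched and tested on its own (the per-type processing calls are omitted here, since they are job-specific):

```java
// Sketch of topic-based routing: map a Kafka topic name to an event type.
public class TopicRouter {

    public static String eventType(String topic) {
        switch (topic) {
            case "orders-events":    return "ORDER";
            case "inventory-events": return "INVENTORY";
            case "shipping-events":  return "SHIPPING";
            default:                 return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        System.out.println(eventType("orders-events"));  // ORDER
        System.out.println(eventType("metrics-stream")); // UNKNOWN
    }
}
```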
// tJavaRow - Route messages by topic
String topic = input_row.topic;
String value = input_row.value;
// Route to the appropriate processor based on the topic
switch (topic) {
case "orders-events":
processOrderEvent(value);
output_row.event_type = "ORDER";
break;
case "inventory-events":
processInventoryEvent(value);
output_row.event_type = "INVENTORY";
break;
case "shipping-events":
processShippingEvent(value);
output_row.event_type = "SHIPPING";
break;
default:
System.out.println("Unknown topic: " + topic);
output_row.event_type = "UNKNOWN";
}
Kafka Producer Patterns
Basic Producer Job
/*
* Job structure:
* tDBInput → tMap → tKafkaOutput
*
* tKafkaOutput settings:
* - Use existing connection: tKafkaConnection_1
* - Topic: processed-events
* - Key: (expression for the message key)
* - Value: (expression for the message payload)
* - Compression: SNAPPY/LZ4/GZIP
*/
// tMap - Build the Kafka message
/*
* Input: row1 (from the database)
* Output: kafka_message
*
* Schema:
* - key (String): row1.order_id
* - value (String): builds the JSON payload
*/
// Expression for the value (JSON):
"{" +
"\"order_id\": \"" + row1.order_id + "\"," +
"\"customer_id\": \"" + row1.customer_id + "\"," +
"\"total_amount\": " + row1.total_amount + "," +
"\"status\": \"" + row1.status + "\"," +
"\"processed_at\": " + System.currentTimeMillis() +
"}"
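Hand-concatenated JSON like this silently breaks as soon as a field value contains a quote, backslash, or newline. A minimal escaping helper (illustrative only) shows what a library handles for you:

```java
// Minimal JSON string escaping - illustrative sketch, not a full implementation
// (it omits control-character \\uXXXX escapes). Prefer a JSON library in real jobs.
public class JsonEscape {

    public static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("{\"note\": \"" + escape("he said \"hi\"") + "\"}");
    }
}
```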
// Or use a JSON library:
// tJavaRow before tKafkaOutput
org.json.JSONObject message = new org.json.JSONObject();
message.put("order_id", input_row.order_id);
message.put("customer_id", input_row.customer_id);
message.put("total_amount", input_row.total_amount);
message.put("status", input_row.status);
message.put("processed_at", System.currentTimeMillis());
message.put("source", "talend-etl");
output_row.key = input_row.order_id;
output_row.value = message.toString();
Producer with Partitioning
/*
* Kafka partitioning strategies:
*
* 1. Default (key-based): messages with the same key land in the same partition
* 2. Round-robin: null keys are distributed evenly
* 3. Custom partitioner: implement your own routing logic
*/
// tJavaRow - Prepare the message with a partition key
// Use customer_id as the key to preserve per-customer ordering
output_row.key = input_row.customer_id;
// For custom partitioning, set the partition directly:
// output_row.partition = calculatePartition(input_row.region);
// Custom partition calculation
private int calculatePartition(String region, int numPartitions) {
// Route by region
switch (region) {
case "US": return 0;
case "EU": return 1;
case "APAC": return 2;
default: return (region.hashCode() & 0x7fffffff) % numPartitions; // mask keeps the result non-negative (Math.abs fails for Integer.MIN_VALUE)
}
}
/*
* Producer properties for guaranteed ordering:
*
* # Ensure messages are written in order
* enable.idempotence=true
* acks=all
* max.in.flight.requests.per.connection=1
*
* # Retry configuration
* retries=3
* retry.backoff.ms=100
*/
Async Producer with Callbacks
// tJavaFlex - Async producer with delivery tracking
// Start Code
java.util.concurrent.atomic.AtomicInteger successCount =
new java.util.concurrent.atomic.AtomicInteger(0);
java.util.concurrent.atomic.AtomicInteger failCount =
new java.util.concurrent.atomic.AtomicInteger(0);
java.util.List<String> failedKeys = java.util.Collections.synchronizedList(
new java.util.ArrayList<>());
org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
createProducer(context);
// Main Code
org.apache.kafka.clients.producer.ProducerRecord<String, String> record =
new org.apache.kafka.clients.producer.ProducerRecord<>(
context.kafka_topic,
input_row.key,
input_row.value
);
// Add headers if needed
record.headers().add("source", "talend".getBytes());
record.headers().add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
producer.send(record, new org.apache.kafka.clients.producer.Callback() {
@Override
public void onCompletion(
org.apache.kafka.clients.producer.RecordMetadata metadata,
Exception exception) {
if (exception != null) {
failCount.incrementAndGet();
failedKeys.add(input_row.key);
System.err.println("Failed to send: " + input_row.key +
" - " + exception.getMessage());
} else {
successCount.incrementAndGet();
}
}
});
// End Code
producer.flush(); // Wait for all in-flight messages to be sent
producer.close();
System.out.println("Kafka producer summary:");
System.out.println(" Successful: " + successCount.get());
System.out.println(" Failed: " + failCount.get());
if (failCount.get() > 0) {
System.err.println("Failed keys: " + failedKeys);
}
Stream Processing Patterns
Event Enrichment Pipeline
/*
* Job structure:
* tKafkaInput (raw events)
*      │
*      ▼
* tMap (extract fields)
*      │
*      ├──► tHashInput (customer cache) ──► tMap (enrichment)
*      │                                        │
*      │                                        ▼
*      │                              tKafkaOutput (enriched events)
*      │
*      └──► tDBInput (product lookup) ──► tMap (enrichment)
*/
// Step 1: cache the lookup tables at job start
// tDBInput → tHashOutput (customer_cache)
// Step 2: process the stream and enrich
// tJavaRow - Enrich the event with cached data
org.json.JSONObject event = new org.json.JSONObject(input_row.value);
String customerId = event.getString("customer_id");
String productId = event.getString("product_id");
// Customer lookup from the cache
Object[] customerData = (Object[]) globalMap.get("customer_cache_" + customerId);
if (customerData != null) {
event.put("customer_name", customerData[1]);
event.put("customer_segment", customerData[2]);
event.put("customer_region", customerData[3]);
} else {
event.put("customer_name", "Unknown");
event.put("customer_segment", "Unknown");
}
// Product lookup from the cache
Object[] productData = (Object[]) globalMap.get("product_cache_" + productId);
if (productData != null) {
event.put("product_name", productData[1]);
event.put("product_category", productData[2]);
event.put("product_price", productData[3]);
}
// Add enrichment metadata
event.put("enriched_at", System.currentTimeMillis());
event.put("enrichment_version", "1.0");
output_row.key = input_row.key;
output_row.value = event.toString();
Time-Windowed Aggregation
/*
* Time-windowed aggregation for streaming data
* (simple illustration - for production use Kafka Streams or Flink)
*/
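The core tumbling-window idea (truncate the event timestamp to the window size, then aggregate per window and group key) can be sketched as a standalone Java program. The array-based signature and key format are illustrative simplifications:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: sum amounts per group key within fixed-size (tumbling) windows.
public class TumblingWindow {

    // Key format "windowStart/group" is an illustrative convention.
    public static Map<String, Double> aggregate(long[] timestamps, String[] keys,
                                                double[] amounts, long windowSizeMs) {
        Map<String, Double> result = new HashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            // Window start = timestamp truncated to the window size
            long windowStart = (timestamps[i] / windowSizeMs) * windowSizeMs;
            result.merge(windowStart + "/" + keys[i], amounts[i], Double::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 30_000, 70_000};   // first two fall in window 0, third in window 60000
        String[] keys = {"books", "books", "books"};
        double[] amounts = {10.0, 5.0, 7.0};
        // Expected buckets: 0/books -> 15.0, 60000/books -> 7.0
        System.out.println(aggregate(ts, keys, amounts, 60_000));
    }
}
```

The tJavaFlex version that follows streams events one at a time and emits a window's aggregates as soon as a later window arrives; this batch sketch shows only the bucketing arithmetic.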
// tJavaFlex - Time-windowed aggregation
// Start Code
// Window configuration
long windowSizeMs = 60000; // 1-minute windows
long currentWindowStart = (System.currentTimeMillis() / windowSizeMs) * windowSizeMs;
// Aggregation state
java.util.Map<String, Double> windowAggregates = new java.util.HashMap<>();
java.util.Map<String, Integer> windowCounts = new java.util.HashMap<>();
// Main Code
org.json.JSONObject event = new org.json.JSONObject(input_row.value);
long eventTime = event.getLong("timestamp");
String groupKey = event.getString("product_category");
double amount = event.getDouble("amount");
// Check whether the event belongs to the current window
long eventWindowStart = (eventTime / windowSizeMs) * windowSizeMs;
if (eventWindowStart != currentWindowStart) {
// The window has rolled over - emit the current aggregates
emitWindowResults(currentWindowStart, windowAggregates, windowCounts);
// Reset for the new window
windowAggregates.clear();
windowCounts.clear();
currentWindowStart = eventWindowStart;
}
// Aggregate
windowAggregates.merge(groupKey, amount, Double::sum);
windowCounts.merge(groupKey, 1, Integer::sum);
// End Code
// Emit the final window
emitWindowResults(currentWindowStart, windowAggregates, windowCounts);
Event Filtering and Routing
/*
* Job structure:
* tKafkaInput
*      │
*      ▼
* tJavaRow (parse and classify)
*      │
*      ▼
* tReplicate ──┬──► Filter: priority == "HIGH" ──► tKafkaOutput (high-priority-topic)
*              │
*              ├──► Filter: type == "ORDER" ──► tKafkaOutput (orders-topic)
*              │
*              └──► Default ──► tKafkaOutput (default-topic)
*/
// tJavaRow - Classify events
org.json.JSONObject event = new org.json.JSONObject(input_row.value);
// Extract classification fields
output_row.event_type = event.optString("type", "UNKNOWN");
output_row.priority = event.optString("priority", "NORMAL");
output_row.source = event.optString("source", "UNKNOWN");
// Add routing metadata
if (event.optDouble("amount", 0) > 10000) {
output_row.priority = "HIGH"; // Override for high-value transactions
}
if (event.optString("customer_segment", "").equals("VIP")) {
output_row.priority = "HIGH"; // VIP customers
}
// Pass the original message through
output_row.key = input_row.key;
output_row.value = input_row.value;
Error Handling for Streams
Dead Letter Queue Pattern
/*
* Job structure:
* tKafkaInput
*      │
*      ▼
* tJavaRow (process with try-catch)
*      │
*      ├──► Success ──► tKafkaOutput (processed-topic)
*      │
*      └──► Error ──► tKafkaOutput (dead-letter-topic)
*/
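The DLQ message is just an envelope around the failed record plus error context. The sketch below builds the same fields as the job code that follows, but with a plain `Map` instead of `org.json` so the shape is easy to see in isolation; the class and method names are illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: wrap a failed record and its error context into a DLQ envelope.
public class DlqEnvelope {

    public static Map<String, Object> wrap(String key, String value, String topic,
                                           int partition, long offset, Exception error) {
        Map<String, Object> dlq = new LinkedHashMap<>();
        // Original record, preserved verbatim so it can be replayed later
        dlq.put("original_key", key);
        dlq.put("original_value", value);
        dlq.put("original_topic", topic);
        dlq.put("original_partition", partition);
        dlq.put("original_offset", offset);
        // Error context for triage
        dlq.put("error_message", error.getMessage());
        dlq.put("error_class", error.getClass().getName());
        dlq.put("failed_at", System.currentTimeMillis());
        return dlq;
    }

    public static void main(String[] args) {
        Map<String, Object> m = wrap("k1", "{bad json", "orders-events", 2, 4711L,
                new IllegalArgumentException("Missing required fields"));
        System.out.println(m.get("error_class"));
    }
}
```

Keeping topic, partition, and offset in the envelope is what makes targeted replay possible later.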
// tJavaFlex - Processing with DLQ routing
// Start Code
int successCount = 0;
int dlqCount = 0;
// Main Code
try {
// Parse and validate
org.json.JSONObject event = new org.json.JSONObject(input_row.value);
// Validate required fields
if (!event.has("order_id") || !event.has("customer_id")) {
throw new IllegalArgumentException("Missing required fields");
}
// Process the event
processEvent(event);
// Route to the success output
output_row.destination = "SUCCESS";
output_row.key = input_row.key;
output_row.value = event.toString();
successCount++;
} catch (Exception e) {
// Route to the DLQ
org.json.JSONObject dlqMessage = new org.json.JSONObject();
dlqMessage.put("original_key", input_row.key);
dlqMessage.put("original_value", input_row.value);
dlqMessage.put("original_topic", input_row.topic);
dlqMessage.put("original_partition", input_row.partition);
dlqMessage.put("original_offset", input_row.offset);
dlqMessage.put("error_message", e.getMessage());
dlqMessage.put("error_class", e.getClass().getName());
dlqMessage.put("failed_at", System.currentTimeMillis());
output_row.destination = "DLQ";
output_row.key = input_row.key;
output_row.value = dlqMessage.toString();
dlqCount++;
System.err.println("Sending to DLQ: " + input_row.key + " - " + e.getMessage());
}
// End Code
System.out.println("Processing complete: " + successCount + " success, " + dlqCount + " to DLQ");
Consumer Lag Monitoring
// tJava - Monitor consumer lag
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
java.util.Properties props = new java.util.Properties();
props.put("bootstrap.servers", context.kafka_bootstrap_servers);
AdminClient admin = AdminClient.create(props);
// Get the consumer group's committed offsets
String groupId = context.consumer_group_id;
ListConsumerGroupOffsetsResult offsetsResult =
admin.listConsumerGroupOffsets(groupId);
java.util.Map<TopicPartition, OffsetAndMetadata> currentOffsets =
offsetsResult.partitionsToOffsetAndMetadata().get();
// Get the end offsets (most recent)
java.util.Set<TopicPartition> partitions = currentOffsets.keySet();
java.util.Map<TopicPartition, Long> endOffsets;
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
endOffsets = consumer.endOffsets(partitions);
}
// Calculate the lag
long totalLag = 0;
System.out.println("Consumer lag for group: " + groupId);
for (TopicPartition tp : partitions) {
long currentOffset = currentOffsets.get(tp).offset();
long endOffset = endOffsets.get(tp);
long lag = endOffset - currentOffset;
totalLag += lag;
System.out.println(String.format(" %s-%d: current=%d, end=%d, lag=%d",
tp.topic(), tp.partition(), currentOffset, endOffset, lag));
}
System.out.println("Total lag: " + totalLag + " messages");
// Alert if the lag exceeds the threshold
if (totalLag > context.lag_alert_threshold) {
sendAlert("Consumer lag alert: " + totalLag + " messages behind");
}
admin.close();
Best Practices
kafka_streaming_best_practices:
  consumer:
    - "Use consumer groups for scalability"
    - "Set an appropriate auto.offset.reset policy"
    - "Implement idempotent processing"
    - "Monitor consumer lag continuously"
    - "Handle rebalances gracefully"
  producer:
    - "Use an appropriate acks setting (all for critical data)"
    - "Enable idempotence for exactly-once semantics"
    - "Choose the partition key wisely for ordering"
    - "Configure retries and backoff appropriately"
    - "Monitor producer metrics"
  error_handling:
    - "Implement a dead letter queue for failed messages"
    - "Log errors with full context"
    - "Design for message replay capability"
    - "Configure alerts for processing failures"
  performance:
    - "Batch messages where possible"
    - "Use compression (LZ4 or SNAPPY)"
    - "Tune buffer sizes based on throughput"
    - "Monitor and alert on processing latency"
  operations:
    - "Use Schema Registry for schema management"
    - "Implement proper health checks"
    - "Plan for partition rebalancing"
    - "Document topic schemas and contracts"
Conclusion
Real-time streaming with Kafka enables event-driven data processing in Talend. Implement proper consumer group management, producer acknowledgements, and dead letter queues for production reliability, and monitor consumer lag and processing latency to ensure timely delivery. These patterns form the foundation of modern event-driven architectures.