
Talend Real-Time Streaming with Kafka: Complete Integration Guide

DeviDevs Team
13 min read
#talend #kafka #streaming #real-time #event-driven

Real-time streaming lets applications react to data as it arrives instead of waiting on batch windows. This guide covers integrating Apache Kafka with Talend to build event-driven data pipelines: connection setup, consumer and producer patterns, stream processing, and error handling.

Streaming Architecture Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                 TALEND REAL-TIME STREAMING ARCHITECTURE                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   DATA SOURCES                   KAFKA CLUSTER             CONSUMERS        │
│  ┌─────────────┐              ┌─────────────────┐      ┌─────────────┐     │
│  │ Applications│──produce────►│                 │      │ Analytics   │     │
│  │ (Events)    │              │   ┌─────────┐   │      │ Database    │     │
│  └─────────────┘              │   │ Topic 1 │   │──────►│             │     │
│                               │   │ ─────── │   │      └─────────────┘     │
│  ┌─────────────┐              │   │partition│   │                          │
│  │ Databases   │──CDC────────►│   │partition│   │      ┌─────────────┐     │
│  │ (Changes)   │              │   │partition│   │      │ Real-Time   │     │
│  └─────────────┘              │   └─────────┘   │──────►│ Dashboard   │     │
│                               │                 │      └─────────────┘     │
│  ┌─────────────┐              │   ┌─────────┐   │                          │
│  │ IoT Sensors │──stream─────►│   │ Topic 2 │   │      ┌─────────────┐     │
│  │             │              │   └─────────┘   │──────►│ Data Lake   │     │
│  └─────────────┘              │                 │      └─────────────┘     │
│                               └─────────────────┘                          │
│                                        │                                    │
│                                        │                                    │
│   ┌────────────────────────────────────┼────────────────────────────────┐  │
│   │            TALEND STREAMING JOBS   │                                 │  │
│   │                                    │                                 │  │
│   │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │  │
│   │  │ tKafkaInput  │───►│ Transform    │───►│ tKafkaOutput │          │  │
│   │  │ (Consumer)   │    │ (tMap/Java)  │    │ (Producer)   │          │  │
│   │  └──────────────┘    └──────────────┘    └──────────────┘          │  │
│   │                                                                      │  │
│   │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │  │
│   │  │ tKafkaInput  │───►│ Aggregate/   │───►│ tDBOutput    │          │  │
│   │  │              │    │ Enrich       │    │ (Real-time)  │          │  │
│   │  └──────────────┘    └──────────────┘    └──────────────┘          │  │
│   │                                                                      │  │
│   └──────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

Kafka Connection Setup

Kafka Connection Configuration

/*
 * tKafkaConnection Component Settings:
 *
 * Kafka Version: 2.x / 3.x
 * Bootstrap Servers: kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092
 *
 * Security Protocol Options:
 * - PLAINTEXT (development)
 * - SSL (encrypted)
 * - SASL_PLAINTEXT (authentication)
 * - SASL_SSL (authentication + encryption)
 */
 
/*
 * Context variables for Kafka configuration:
 *
 *   kafka_bootstrap_servers = "kafka-broker1:9092,kafka-broker2:9092"
 *   kafka_security_protocol = "SASL_SSL"
 *   kafka_sasl_mechanism    = "PLAIN"
 *   kafka_sasl_username     = (loaded from secure storage at runtime)
 *   kafka_sasl_password     = (loaded from secure storage at runtime)
 */
 
// Additional Kafka properties (Advanced Settings)
/*
 * ssl.truststore.location=/path/to/truststore.jks
 * ssl.truststore.password=${context.truststore_password}
 * ssl.keystore.location=/path/to/keystore.jks
 * ssl.keystore.password=${context.keystore_password}
 * ssl.key.password=${context.key_password}
 *
 * # SASL configuration
 * sasl.mechanism=PLAIN
 * sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
 *   username="${context.kafka_sasl_username}"
 *   password="${context.kafka_sasl_password}";
 */
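
If you drive a producer or consumer from custom code (tJava/tJavaFlex) instead of the component's Advanced Settings table, the same security settings map onto standard Kafka client properties. A minimal sketch, assuming the context variables above (the truststore path is a placeholder):

// Hedged sketch: security settings as plain Kafka client properties
java.util.Properties secProps = new java.util.Properties();
secProps.put("bootstrap.servers", context.kafka_bootstrap_servers);
secProps.put("security.protocol", context.kafka_security_protocol);   // e.g. SASL_SSL
secProps.put("sasl.mechanism", context.kafka_sasl_mechanism);          // e.g. PLAIN
secProps.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required"
    + " username=\"" + context.kafka_sasl_username + "\""
    + " password=\"" + context.kafka_sasl_password + "\";");
secProps.put("ssl.truststore.location", "/path/to/truststore.jks");
secProps.put("ssl.truststore.password", context.truststore_password);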

Schema Registry Integration

/*
 * Confluent Schema Registry Configuration:
 *
 * tKafkaInput/tKafkaOutput with Avro:
 * - Value Deserializer/Serializer: Confluent Avro
 * - Schema Registry URL: http://schema-registry:8081
 * - Schema Registry Authentication (if needed)
 */
 
// Additional properties for Schema Registry
/*
 * schema.registry.url=http://schema-registry:8081
 * schema.registry.basic.auth.credentials.source=USER_INFO
 * schema.registry.basic.auth.user.info=${context.sr_user}:${context.sr_password}
 *
 * # Auto-register schemas
 * auto.register.schemas=true
 *
 * # Use specific schema version
 * use.latest.version=true
 */
 
// Avro schema example for events
/*
 * {
 *   "type": "record",
 *   "name": "OrderEvent",
 *   "namespace": "com.company.events",
 *   "fields": [
 *     {"name": "order_id", "type": "string"},
 *     {"name": "customer_id", "type": "string"},
 *     {"name": "product_id", "type": "string"},
 *     {"name": "quantity", "type": "int"},
 *     {"name": "price", "type": "double"},
 *     {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
 *     {"name": "status", "type": {"type": "enum", "name": "OrderStatus",
 *       "symbols": ["CREATED", "PROCESSING", "COMPLETED", "CANCELLED"]}}
 *   ]
 * }
 */
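
For context, here is a hedged sketch of what producing one of these events looks like with the Confluent Avro serializer in plain Java. It assumes the kafka-avro-serializer dependency is on the classpath and that orderSchemaJson is a String holding the schema above; the field values are illustrative.

// Hedged sketch: producing an Avro OrderEvent through Schema Registry
org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(orderSchemaJson);

java.util.Properties props = new java.util.Properties();
props.put("bootstrap.servers", context.kafka_bootstrap_servers);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");

org.apache.avro.generic.GenericRecord event =
    new org.apache.avro.generic.GenericRecordBuilder(schema)
        .set("order_id", "ORD-1001")
        .set("customer_id", "CUST-42")
        .set("product_id", "SKU-7")
        .set("quantity", 2)
        .set("price", 19.99)
        .set("timestamp", System.currentTimeMillis())
        .set("status", new org.apache.avro.generic.GenericData.EnumSymbol(
            schema.getField("status").schema(), "CREATED"))
        .build();

try (org.apache.kafka.clients.producer.KafkaProducer<String, Object> producer =
        new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {
    producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>(
        "orders-events", "ORD-1001", event));
}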

Kafka Consumer Patterns

Basic Consumer Job

/*
 * Job Structure:
 * tKafkaConnection → tKafkaInput → tMap → tLogRow/tDBOutput
 *
 * tKafkaInput Settings:
 * - Use existing connection: tKafkaConnection_1
 * - Topic: orders-events
 * - Consumer Group ID: talend-consumer-group
 * - Key Deserializer: String
 * - Value Deserializer: String (JSON)
 *
 * Consumer Properties:
 * - auto.offset.reset: earliest/latest
 * - enable.auto.commit: true/false
 * - max.poll.records: 500
 * - session.timeout.ms: 30000
 */
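
For reference, the same tKafkaInput settings map directly onto standard Kafka consumer properties, which is useful when tuning the component's advanced properties or prototyping outside the Studio. A hedged sketch:

// Hedged sketch: the consumer settings above as plain Kafka properties
java.util.Properties consumerProps = new java.util.Properties();
consumerProps.put("bootstrap.servers", context.kafka_bootstrap_servers);
consumerProps.put("group.id", "talend-consumer-group");
consumerProps.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("auto.offset.reset", "earliest");   // or "latest"
consumerProps.put("enable.auto.commit", "true");      // false for manual commit (see below)
consumerProps.put("max.poll.records", "500");
consumerProps.put("session.timeout.ms", "30000");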
 
// tKafkaInput output schema:
/*
 * - key: String (message key)
 * - value: String (message payload)
 * - topic: String
 * - partition: Integer
 * - offset: Long
 * - timestamp: Long
 */
 
// tJavaRow - Parse JSON message
import org.json.JSONObject;
 
String jsonValue = input_row.value;
JSONObject event = new JSONObject(jsonValue);
 
output_row.order_id = event.getString("order_id");
output_row.customer_id = event.getString("customer_id");
output_row.product_id = event.getString("product_id");
output_row.quantity = event.getInt("quantity");
output_row.price = event.getDouble("price");
output_row.event_timestamp = event.getLong("timestamp");
output_row.status = event.getString("status");
 
// Keep Kafka metadata for tracking
output_row.kafka_partition = input_row.partition;
output_row.kafka_offset = input_row.offset;
output_row.kafka_timestamp = input_row.timestamp;

Batch Consumer with Commit Control

/*
 * Manual Offset Commit Pattern:
 *
 * tKafkaInput Settings:
 * - enable.auto.commit: false
 *
 * Job Flow:
 * tKafkaInput → Processing → tKafkaCommit (on success)
 *                         → Error handling (on failure, don't commit)
 */
 
// tJavaFlex - Batch processing with manual commit control
 
// Start Code
int batchSize = 1000;
int processedCount = 0;
java.util.List<String> failedMessages = new java.util.ArrayList<>();
boolean commitBatch = true;
 
// Main Code
try {
    // Process message
    processMessage(input_row);
    processedCount++;
 
} catch (Exception e) {
    failedMessages.add(input_row.key + ": " + e.getMessage());
    commitBatch = false; // Don't commit if any message fails
}
 
// Every batch, decide whether to commit
if (processedCount % batchSize == 0) {
    if (commitBatch) {
        globalMap.put("should_commit", true);
        System.out.println("Batch processed successfully, will commit");
    } else {
        globalMap.put("should_commit", false);
        System.out.println("Batch had failures, skipping commit");
        // Failed messages will be reprocessed
    }
    commitBatch = true; // Reset for next batch
}
 
// End Code
System.out.println("Total processed: " + processedCount);
System.out.println("Failed messages: " + failedMessages.size());
for (String fail : failedMessages) {
    System.err.println("  - " + fail);
}
 
// tKafkaCommit - Conditional commit
// "Run if" trigger condition: Boolean.TRUE.equals(globalMap.get("should_commit"))
// (null-safe; a bare Boolean cast would fail if the flag was never set)
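
tKafkaCommit commits the consumer group's offsets for the connection. For reference, the equivalent at-least-once pattern with a raw consumer looks roughly like this (a hedged sketch, not Talend's generated code):

// Hedged sketch: manual offset commits with a plain Kafka consumer
java.util.Properties props = new java.util.Properties();
props.put("bootstrap.servers", context.kafka_bootstrap_servers);
props.put("group.id", "talend-consumer-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
        new org.apache.kafka.clients.consumer.KafkaConsumer<>(props)) {
    consumer.subscribe(java.util.Collections.singletonList("orders-events"));
    while (true) {
        org.apache.kafka.clients.consumer.ConsumerRecords<String, String> records =
            consumer.poll(java.time.Duration.ofMillis(500));
        try {
            for (org.apache.kafka.clients.consumer.ConsumerRecord<String, String> rec : records) {
                // process rec.key() / rec.value() here
            }
            consumer.commitSync();   // commit only after the whole batch succeeded
        } catch (Exception e) {
            // No commit: after a restart or rebalance the group resumes from the last
            // committed offset, so the failed batch is reprocessed
            System.err.println("Batch failed, offsets not committed: " + e.getMessage());
        }
    }
}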

Multi-Topic Consumer

/*
 * Subscribe to multiple topics with single consumer
 *
 * tKafkaInput Settings:
 * - Topic: orders-events,inventory-events,shipping-events
 *   (comma-separated list)
 *
 * OR use topic pattern:
 * - Topic Pattern: .*-events (regex)
 */
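
With a raw consumer, both variants are a single subscribe call (hedged sketch, reusing a consumer configured as in the earlier sketches):

// Hedged sketch: multi-topic and pattern subscription
consumer.subscribe(java.util.Arrays.asList("orders-events", "inventory-events", "shipping-events"));
// or by pattern (also picks up newly created matching topics after a metadata refresh):
consumer.subscribe(java.util.regex.Pattern.compile(".*-events"));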
 
// tJavaRow - Route messages by topic
String topic = input_row.topic;
String value = input_row.value;
 
// Route to appropriate processor based on topic
switch (topic) {
    case "orders-events":
        processOrderEvent(value);
        output_row.event_type = "ORDER";
        break;
 
    case "inventory-events":
        processInventoryEvent(value);
        output_row.event_type = "INVENTORY";
        break;
 
    case "shipping-events":
        processShippingEvent(value);
        output_row.event_type = "SHIPPING";
        break;
 
    default:
        System.out.println("Unknown topic: " + topic);
        output_row.event_type = "UNKNOWN";
}
 
// Use tReplicate or tMap with multiple outputs to route
// to different downstream processors

Kafka Producer Patterns

Basic Producer Job

/*
 * Job Structure:
 * tDBInput → tMap → tKafkaOutput
 *
 * tKafkaOutput Settings:
 * - Use existing connection: tKafkaConnection_1
 * - Topic: processed-events
 * - Key: (expression for message key)
 * - Value: (expression for message payload)
 * - Compression: SNAPPY/LZ4/GZIP
 */
 
// tMap - Build Kafka message
/*
 * Input: row1 (from database)
 * Output: kafka_message
 *
 * Schema:
 * - key (String): row1.order_id
 * - value (String): Build JSON payload
 */
 
// Expression for value (JSON):
"{" +
    "\"order_id\": \"" + row1.order_id + "\"," +
    "\"customer_id\": \"" + row1.customer_id + "\"," +
    "\"total_amount\": " + row1.total_amount + "," +
    "\"status\": \"" + row1.status + "\"," +
    "\"processed_at\": " + System.currentTimeMillis() +
"}"
 
// Safer: build the payload with a JSON library (handles escaping and numeric types)
// tJavaRow before tKafkaOutput
org.json.JSONObject message = new org.json.JSONObject();
message.put("order_id", input_row.order_id);
message.put("customer_id", input_row.customer_id);
message.put("total_amount", input_row.total_amount);
message.put("status", input_row.status);
message.put("processed_at", System.currentTimeMillis());
message.put("source", "talend-etl");
 
output_row.key = input_row.order_id;
output_row.value = message.toString();

Producer with Partitioning

/*
 * Kafka Partitioning Strategies:
 *
 * 1. Default (key-based): Messages with same key go to same partition
 * 2. Round-robin: null keys distributed evenly
 * 3. Custom partitioner: Implement custom logic
 */
 
// tJavaRow - Prepare message with partition key
// Use customer_id as key for customer-related ordering
output_row.key = input_row.customer_id;
 
// For custom partitioning, set the partition column directly
// (numPartitions = the topic's partition count):
// output_row.partition = calculatePartition(input_row.region, numPartitions);
 
// Custom partition calculation
private int calculatePartition(String region, int numPartitions) {
    // Route by region
    switch (region) {
        case "US": return 0;
        case "EU": return 1;
        case "APAC": return 2;
        default: return Math.abs(region.hashCode()) % numPartitions;
    }
}
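
When producing from custom code, a computed partition is passed straight into the ProducerRecord; with tKafkaOutput the usual approach is to rely on the message key instead. A hedged sketch (producer created as in the async example below; column names and partition count are illustrative):

// Hedged sketch: targeting an explicit partition with a raw producer
int partition = calculatePartition(input_row.region, 3);   // 3 = partition count of the topic (example)
producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>(
    "processed-events", partition, input_row.customer_id, input_row.value));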
 
/*
 * Producer Properties for guaranteed ordering:
 *
 * # Ensure messages are written in order
 * enable.idempotence=true
 * acks=all
 * max.in.flight.requests.per.connection=1
 *
 * # Retry configuration
 * retries=3
 * retry.backoff.ms=100
 */

Async Producer with Callbacks

// tJavaFlex - Async producer with delivery tracking
 
// Start Code
java.util.concurrent.atomic.AtomicInteger successCount =
    new java.util.concurrent.atomic.AtomicInteger(0);
java.util.concurrent.atomic.AtomicInteger failCount =
    new java.util.concurrent.atomic.AtomicInteger(0);
java.util.List<String> failedKeys = java.util.Collections.synchronizedList(
    new java.util.ArrayList<>());
 
org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
    createProducer(context.kafka_bootstrap_servers);
 
// Main Code
// Capture row values in final locals: Talend reuses the input_row object, and the
// async callback may fire after the row has been overwritten by the next record
final String msgKey = input_row.key;
final String msgValue = input_row.value;
 
org.apache.kafka.clients.producer.ProducerRecord<String, String> record =
    new org.apache.kafka.clients.producer.ProducerRecord<>(
        context.kafka_topic,
        msgKey,
        msgValue
    );
 
// Add headers if needed
record.headers().add("source", "talend".getBytes());
record.headers().add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
 
producer.send(record, new org.apache.kafka.clients.producer.Callback() {
    @Override
    public void onCompletion(
            org.apache.kafka.clients.producer.RecordMetadata metadata,
            Exception exception) {
        if (exception != null) {
            failCount.incrementAndGet();
            failedKeys.add(msgKey);
            System.err.println("Failed to send: " + msgKey +
                " - " + exception.getMessage());
        } else {
            successCount.incrementAndGet();
            // Optional: Log successful delivery
            // System.out.println("Sent to partition " + metadata.partition() +
            //     " offset " + metadata.offset());
        }
    }
});
 
// End Code
producer.flush(); // Wait for all messages to be sent
producer.close();
 
System.out.println("Kafka producer summary:");
System.out.println("  Successful: " + successCount.get());
System.out.println("  Failed: " + failCount.get());
 
if (failCount.get() > 0) {
    System.err.println("Failed keys: " + failedKeys);
    // Optionally throw exception or handle failed messages
}
 
// Helper method (define in a Talend routine or the job's custom code)
private org.apache.kafka.clients.producer.KafkaProducer<String, String> createProducer(
        String bootstrapServers) {
    java.util.Properties props = new java.util.Properties();
    props.put("bootstrap.servers", bootstrapServers);
    // add security properties (see Kafka Connection Setup) when not using PLAINTEXT
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("acks", "all");
    props.put("retries", 3);
    props.put("linger.ms", 5); // Small batching
    props.put("batch.size", 16384);
 
    return new org.apache.kafka.clients.producer.KafkaProducer<>(props);
}

Stream Processing Patterns

Event Enrichment Pipeline

/*
 * Job Structure:
 * tKafkaInput (raw events)
 *     │
 *     ▼
 * tMap (extract fields)
 *     │
 *     ├──► tHashInput (customer cache) ──► tMap (enrich)
 *     │                                         │
 *     │                                         ▼
 *     │                                   tKafkaOutput (enriched events)
 *     │
 *     └──► tDBInput (product lookup) ──► tMap (enrich)
 */
 
// Step 1: Cache lookup tables at job start
// tDBInput → tJavaRow (load lookup rows into globalMap, as sketched below)
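
A hedged sketch of that cache-loading tJavaRow (column names are illustrative; the array indices match the lookups below):

// tJavaRow on the customer lookup flow - cache one row under a predictable key
globalMap.put("customer_cache_" + input_row.customer_id,
    new Object[] {
        input_row.customer_id,     // [0]
        input_row.customer_name,   // [1]
        input_row.segment,         // [2]
        input_row.region           // [3]
    });
// Repeat the same pattern for product rows under "product_cache_" + product_id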
 
// Step 2: Process stream and enrich
// tJavaRow - Enrich event with cached data
org.json.JSONObject event = new org.json.JSONObject(input_row.value);
String customerId = event.getString("customer_id");
String productId = event.getString("product_id");
 
// Lookup customer from cache
Object[] customerData = (Object[]) globalMap.get("customer_cache_" + customerId);
if (customerData != null) {
    event.put("customer_name", customerData[1]);
    event.put("customer_segment", customerData[2]);
    event.put("customer_region", customerData[3]);
} else {
    event.put("customer_name", "Unknown");
    event.put("customer_segment", "Unknown");
}
 
// Lookup product from cache
Object[] productData = (Object[]) globalMap.get("product_cache_" + productId);
if (productData != null) {
    event.put("product_name", productData[1]);
    event.put("product_category", productData[2]);
    event.put("product_price", productData[3]);
}
 
// Add enrichment metadata
event.put("enriched_at", System.currentTimeMillis());
event.put("enrichment_version", "1.0");
 
output_row.key = input_row.key;
output_row.value = event.toString();

Windowed Aggregation

/*
 * Time-windowed aggregation for streaming data
 * (Simple implementation - for production use Kafka Streams or Flink)
 */
 
// tJavaFlex - Time-window aggregation
 
// Start Code
// Window configuration
long windowSizeMs = 60000; // 1 minute windows
long currentWindowStart = (System.currentTimeMillis() / windowSizeMs) * windowSizeMs;
 
// Aggregation storage
java.util.Map<String, Double> windowAggregates = new java.util.HashMap<>();
java.util.Map<String, Integer> windowCounts = new java.util.HashMap<>();
 
// Main Code
org.json.JSONObject event = new org.json.JSONObject(input_row.value);
long eventTime = event.getLong("timestamp");
String groupKey = event.getString("product_category");
double amount = event.getDouble("amount");
 
// Check if event belongs to current window
long eventWindowStart = (eventTime / windowSizeMs) * windowSizeMs;
 
if (eventWindowStart != currentWindowStart) {
    // Window changed - emit current aggregates
    emitWindowResults(currentWindowStart, windowSizeMs, windowAggregates, windowCounts);
 
    // Reset for new window
    windowAggregates.clear();
    windowCounts.clear();
    currentWindowStart = eventWindowStart;
}
 
// Aggregate
windowAggregates.merge(groupKey, amount, Double::sum);
windowCounts.merge(groupKey, 1, Integer::sum);
 
// End Code
// Emit final window
emitWindowResults(currentWindowStart, windowSizeMs, windowAggregates, windowCounts);
 
// Helper method (define in a Talend routine or the job's custom code;
// windowSizeMs is passed in because locals from the Start code are not
// visible inside a separate method)
private void emitWindowResults(long windowStart, long windowSizeMs,
        java.util.Map<String, Double> sums,
        java.util.Map<String, Integer> counts) {
 
    for (String key : sums.keySet()) {
        org.json.JSONObject result = new org.json.JSONObject();
        result.put("window_start", windowStart);
        result.put("window_end", windowStart + windowSizeMs);
        result.put("category", key);
        result.put("total_amount", sums.get(key));
        result.put("transaction_count", counts.get(key));
        result.put("avg_amount", sums.get(key) / counts.get(key));
 
        // Send to output topic or database
        System.out.println("Window result: " + result.toString());
    }
}
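
As noted above, a dedicated stream processor handles windowing more robustly (event-time semantics, state stores, fault tolerance). A minimal Kafka Streams sketch of the same idea, counting events per key in one-minute windows and running outside Talend as its own service (assumes Kafka Streams 3.x; topic names are illustrative):

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-windowed-counts");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders-events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count()
            .toStream()
            // the key carries the window; flatten it for the output topic
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key() + "@" + windowedKey.window().startTime(), count.toString()))
            .to("orders-1m-counts", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }
}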

Event Filtering and Routing

/*
 * Job Structure:
 * tKafkaInput
 *     │
 *     ▼
 * tJavaRow (parse & classify)
 *     │
 *     ▼
 * tReplicate ──┬──► Filter: priority == "HIGH" ──► tKafkaOutput (high-priority-topic)
 *              │
 *              ├──► Filter: type == "ORDER"    ──► tKafkaOutput (orders-topic)
 *              │
 *              └──► Default                     ──► tKafkaOutput (default-topic)
 */
 
// tJavaRow - Classify events
org.json.JSONObject event = new org.json.JSONObject(input_row.value);
 
// Extract classification fields
output_row.event_type = event.optString("type", "UNKNOWN");
output_row.priority = event.optString("priority", "NORMAL");
output_row.source = event.optString("source", "UNKNOWN");
 
// Add routing metadata
if (event.optDouble("amount", 0) > 10000) {
    output_row.priority = "HIGH"; // Override for high-value transactions
}
 
if (event.optString("customer_segment", "").equals("VIP")) {
    output_row.priority = "HIGH"; // VIP customers
}
 
// Pass through the original message
output_row.key = input_row.key;
output_row.value = input_row.value;
 
// Filter expressions in tFilterRow components:
// High priority: row.priority.equals("HIGH")
// Orders: row.event_type.equals("ORDER")
// Default: true (catch-all)

Error Handling for Streams

Dead Letter Queue Pattern

/*
 * Job Structure:
 * tKafkaInput
 *     │
 *     ▼
 * tJavaRow (process with try-catch)
 *     │
 *     ├──► Success ──► tKafkaOutput (processed-topic)
 *     │
 *     └──► Error ──► tKafkaOutput (dead-letter-topic)
 */
 
// tJavaFlex - Process with DLQ routing
 
// Start Code
int successCount = 0;
int dlqCount = 0;
 
// Main Code
try {
    // Parse and validate
    org.json.JSONObject event = new org.json.JSONObject(input_row.value);
 
    // Validate required fields
    if (!event.has("order_id") || !event.has("customer_id")) {
        throw new IllegalArgumentException("Missing required fields");
    }
 
    // Process event
    processEvent(event);
 
    // Route to success output
    output_row.destination = "SUCCESS";
    output_row.key = input_row.key;
    output_row.value = event.toString();
    successCount++;
 
} catch (Exception e) {
    // Route to DLQ
    org.json.JSONObject dlqMessage = new org.json.JSONObject();
    dlqMessage.put("original_key", input_row.key);
    dlqMessage.put("original_value", input_row.value);
    dlqMessage.put("original_topic", input_row.topic);
    dlqMessage.put("original_partition", input_row.partition);
    dlqMessage.put("original_offset", input_row.offset);
    dlqMessage.put("error_message", e.getMessage());
    dlqMessage.put("error_class", e.getClass().getName());
    dlqMessage.put("failed_at", System.currentTimeMillis());
 
    output_row.destination = "DLQ";
    output_row.key = input_row.key;
    output_row.value = dlqMessage.toString();
    dlqCount++;
 
    System.err.println("Sending to DLQ: " + input_row.key + " - " + e.getMessage());
}
 
// End Code
System.out.println("Processing complete: " + successCount + " success, " + dlqCount + " to DLQ");
 
// Use tFilterRow after to route:
// Success: row.destination.equals("SUCCESS") → tKafkaOutput_Success
// DLQ: row.destination.equals("DLQ") → tKafkaOutput_DLQ
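
Because the DLQ message preserves the original key, value, and topic, replay is straightforward once the root cause is fixed. A hedged sketch of a tJavaRow in a separate replay job that unwraps a DLQ record for re-publication (the target_topic column is illustrative):

// Hedged sketch: unwrap a DLQ record so the original payload can be reprocessed
org.json.JSONObject dlq = new org.json.JSONObject(input_row.value);
output_row.key = dlq.optString("original_key", null);
output_row.value = dlq.getString("original_value");
output_row.target_topic = dlq.getString("original_topic");   // route with tKafkaOutput
// Fix the root cause first, otherwise the message will just land back in the DLQ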

Consumer Lag Monitoring

// tJava - Monitor consumer lag
 
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
 
java.util.Properties props = new java.util.Properties();
props.put("bootstrap.servers", context.kafka_bootstrap_servers);
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
// Add security properties (SASL/SSL) if needed
 
AdminClient admin = AdminClient.create(props);
 
// Get consumer group offsets
String groupId = context.consumer_group_id;
ListConsumerGroupOffsetsResult offsetsResult =
    admin.listConsumerGroupOffsets(groupId);
 
java.util.Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    offsetsResult.partitionsToOffsetAndMetadata().get(); // blocks; wrap in try/catch for the checked exceptions
 
// Get end offsets (latest)
java.util.Set<TopicPartition> partitions = currentOffsets.keySet();
java.util.Map<TopicPartition, Long> endOffsets;
 
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    endOffsets = consumer.endOffsets(partitions);
}
 
// Calculate lag
long totalLag = 0;
System.out.println("Consumer lag for group: " + groupId);
 
for (TopicPartition tp : partitions) {
    long currentOffset = currentOffsets.get(tp).offset();
    long endOffset = endOffsets.get(tp);
    long lag = endOffset - currentOffset;
    totalLag += lag;
 
    System.out.println(String.format("  %s-%d: current=%d, end=%d, lag=%d",
        tp.topic(), tp.partition(), currentOffset, endOffset, lag));
}
 
System.out.println("Total lag: " + totalLag + " messages");
 
// Alert if lag exceeds threshold
// (sendAlert = user-defined routine, e.g. a child job with tSendMail or a webhook call)
if (totalLag > context.lag_alert_threshold) {
    sendAlert("Consumer lag alert: " + totalLag + " messages behind");
}
 
admin.close();

Best Practices

kafka_streaming_best_practices:
  consumer:
    - "Use consumer groups for scalability"
    - "Set appropriate auto.offset.reset policy"
    - "Implement idempotent processing"
    - "Monitor consumer lag continuously"
    - "Handle rebalances gracefully"
 
  producer:
    - "Use appropriate acks setting (all for critical data)"
    - "Enable idempotence for exactly-once semantics"
    - "Choose partition key wisely for ordering"
    - "Configure retries and backoff appropriately"
    - "Monitor producer metrics"
 
  error_handling:
    - "Implement dead letter queue for failed messages"
    - "Log errors with full context"
    - "Design for message replay capability"
    - "Set up alerting for processing failures"
 
  performance:
    - "Batch messages when possible"
    - "Use compression (LZ4 or SNAPPY)"
    - "Tune buffer sizes based on throughput"
    - "Monitor and alert on processing latency"
 
  operations:
    - "Use Schema Registry for schema management"
    - "Implement proper health checks"
    - "Plan for partition rebalancing"
    - "Document topic schemas and contracts"

Conclusion

Real-time streaming with Kafka enables event-driven data processing in Talend. Implement proper consumer group management, producer acknowledgments, and dead letter queues for production reliability. Monitor consumer lag and processing latency to ensure timely data delivery. These patterns form the foundation for modern event-driven architectures.
