API integration is essential for modern data workflows. This guide covers patterns for consuming and producing REST APIs in Talend: authentication, pagination, error handling, JSON transformation, and batch operations, each with a worked example.
API Integration Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ TALEND API INTEGRATION PATTERNS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ INBOUND (Consuming APIs) OUTBOUND (Producing APIs) │
│ ┌────────────────────────────┐ ┌────────────────────────────┐ │
│ │ │ │ │ │
│ │ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │ │
│ │ │ tRESTClient │ │ │ │ tRESTRequest │ │ │
│ │ │ • GET/POST/PUT/DEL │ │ │ │ (ESB/Route) │ │ │
│ │ │ • OAuth/JWT/API Key│ │ │ │ • Expose REST API │ │ │
│ │ └──────────┬──────────┘ │ │ └──────────┬──────────┘ │ │
│ │ │ │ │ │ │ │
│ │ ┌──────────▼──────────┐ │ │ ┌──────────▼──────────┐ │ │
│ │ │ tExtractJSONFields │ │ │ │ Data Processing │ │ │
│ │ │ tXMLMap │ │ │ │ tMap, tJava │ │ │
│ │ └──────────┬──────────┘ │ │ └──────────┬──────────┘ │ │
│ │ │ │ │ │ │ │
│ │ ┌──────────▼──────────┐ │ │ ┌──────────▼──────────┐ │ │
│ │ │ Data Processing │ │ │ │ tRESTResponse │ │ │
│ │ │ tMap, Database │ │ │ │ (JSON/XML output) │ │ │
│ │ └─────────────────────┘ │ │ └─────────────────────┘ │ │
│ │ │ │ │ │
│ └────────────────────────────┘ └────────────────────────────┘ │
│ │
│ INTEGRATION PATTERNS: │
│ • Request/Response (synchronous) │
│ • Polling (scheduled fetch) │
│ • Webhook (event-driven) │
│ • Batch API (bulk operations) │
│ • Pagination (large datasets) │
└─────────────────────────────────────────────────────────────────────────────┘
Basic REST Client Usage
Simple GET Request
/*
* Job Structure:
* tRESTClient_1 → tExtractJSONFields_1 → tLogRow_1
*
* tRESTClient_1 Configuration:
* - URL: "https://api.example.com/users"
* - HTTP Method: GET
* - Accept Type: APPLICATION/JSON
*/
// tRESTClient settings using context variables and custom headers
/*
* Component: tRESTClient
*
* Basic Settings:
* - URL: context.api_base_url + "/users"
* - HTTP Method: GET
*
* Headers:
* - Accept: application/json
* - X-API-Key: (String) context.api_key
*
* Response:
* - Response content type: JSON
*/
// tExtractJSONFields Configuration:
/*
* Loop JSON Path: "$.data[*]"
* (Emits one row per item in the data array)
*
* JSON Path Queries (relative to each looped item):
* - user_id: "id"
* - username: "username"
* - email: "email"
* - created_at: "created_at"
*/
POST Request with JSON Body
/*
* Job Structure:
* tFileInputDelimited_1 → tMap_1 → tRESTClient_1 → tLogRow_1
*
* Sending data to API endpoint
*/
// tMap_1: Build JSON request body
/*
* Input: row1 (from file)
* Output: api_request
*
* Output Schema:
* - request_body (String): JSON body
*
* Expression for request_body:
*/
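// Build the body with a JSON library so that quotes, backslashes, and newlines
// in the input are escaped (plain string concatenation would emit invalid JSON).
// Assumes org.json is on the job classpath, as in the other examples in this guide.
new org.json.JSONObject()
    .put("name", row1.name)
    .put("email", row1.email)
    .put("phone", row1.phone)
    .put("company", row1.company)
    .toString()
// Or wrap the same logic in a user routine. JSONBuilder below is a custom
// routine you would write yourself, not a built-in:
// requestBody = routines.JSONBuilder.build()
// .add("name", row1.name)
// .add("email", row1.email)
// .add("phone", row1.phone)
// .add("company", row1.company)
// .toString();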
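Hand-assembled JSON is only valid if every value is escaped first. As a minimal sketch of what that escaping involves when no JSON library is available, the helper below quotes a string for use as a JSON value. The class and method names (`JsonStrings`, `quote`) are illustrative and not part of Talend; a JSON library remains the safer choice.

```java
final class JsonStrings {
    private JsonStrings() {}

    /** Returns the value as a quoted JSON string literal, or the JSON literal null. */
    static String quote(String value) {
        if (value == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder(value.length() + 2);
        sb.append('"');
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                case '\b': sb.append("\\b"); break;
                case '\f': sb.append("\\f"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must be written as \\uXXXX escapes
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        // A name containing quotes would break naive concatenation
        String body = "{\"name\": " + quote("Ann \"AJ\" Lee")
                + ", \"phone\": " + quote(null) + "}";
        System.out.println(body);
    }
}
```

In a tMap expression the same idea would read `"{\"name\": " + JsonStrings.quote(row1.name) + ...`, assuming the helper has been added as a user routine.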
/*
* tRESTClient Configuration:
* - URL: context.api_base_url + "/contacts"
* - HTTP Method: POST
* - Content Type: APPLICATION/JSON
* - HTTP Body: api_request.request_body
*
* Headers:
* - Authorization: "Bearer " + context.access_token
* - Content-Type: application/json
*/
Authentication Patterns
OAuth 2.0 Client Credentials
// tJava - Get OAuth token before API calls
// Store token globally for reuse
String accessToken = (String) globalMap.get("oauth_access_token");
Long tokenExpiry = (Long) globalMap.get("oauth_token_expiry");
// Check if token needs refresh
if (accessToken == null || tokenExpiry == null ||
System.currentTimeMillis() > tokenExpiry) {
System.out.println("Obtaining new OAuth token...");
// Build token request
String tokenUrl = context.oauth_token_url;
String clientId = context.oauth_client_id;
String clientSecret = context.oauth_client_secret;
java.net.URL url = new java.net.URL(tokenUrl);
java.net.HttpURLConnection conn =
(java.net.HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setDoOutput(true);
conn.setRequestProperty("Content-Type",
"application/x-www-form-urlencoded");
// Build request body
String body = "grant_type=client_credentials" +
"&client_id=" + java.net.URLEncoder.encode(clientId, "UTF-8") +
"&client_secret=" + java.net.URLEncoder.encode(clientSecret, "UTF-8") +
"&scope=" + java.net.URLEncoder.encode(context.oauth_scope, "UTF-8");
// Send request
java.io.OutputStream os = conn.getOutputStream();
os.write(body.getBytes("UTF-8"));
os.flush();
os.close();
// Read response
int responseCode = conn.getResponseCode();
if (responseCode == 200) {
java.io.BufferedReader reader = new java.io.BufferedReader(
new java.io.InputStreamReader(conn.getInputStream(), "UTF-8"));
StringBuilder response = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
response.append(line);
}
reader.close();
// Parse JSON response
org.json.JSONObject json = new org.json.JSONObject(response.toString());
accessToken = json.getString("access_token");
int expiresIn = json.getInt("expires_in");
// Store token with buffer (refresh 5 min before expiry)
globalMap.put("oauth_access_token", accessToken);
globalMap.put("oauth_token_expiry",
System.currentTimeMillis() + (expiresIn - 300) * 1000L);
System.out.println("OAuth token obtained, expires in " + expiresIn + "s");
} else {
throw new RuntimeException("Failed to get OAuth token: " + responseCode);
}
}
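The caching rule used in this tJava code (reuse the token until shortly before it expires, then fetch a new one) can be isolated from the HTTP call so that it is easy to test. The following is a minimal sketch under that assumption; `CachedToken`, `Grant`, and the injected clock are hypothetical names, and the fetcher stands in for the token request.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class CachedToken {
    /** Token value plus lifetime in seconds, as a token endpoint typically returns them. */
    static final class Grant {
        final String accessToken;
        final long expiresInSeconds;

        Grant(String accessToken, long expiresInSeconds) {
            this.accessToken = accessToken;
            this.expiresInSeconds = expiresInSeconds;
        }
    }

    private final Supplier<Grant> fetcher;      // performs the actual token request
    private final LongSupplier clockMillis;     // injectable clock for testing
    private final long refreshBufferMillis;
    private String token;
    private long refreshAtMillis;

    CachedToken(Supplier<Grant> fetcher, LongSupplier clockMillis, long refreshBufferSeconds) {
        this.fetcher = fetcher;
        this.clockMillis = clockMillis;
        this.refreshBufferMillis = refreshBufferSeconds * 1000L;
    }

    /** Returns the cached token, fetching a new one when none exists or the buffer is reached. */
    synchronized String get() {
        long now = clockMillis.getAsLong();
        if (token == null || now >= refreshAtMillis) {
            Grant grant = fetcher.get();
            token = grant.accessToken;
            long lifetimeMillis = grant.expiresInSeconds * 1000L;
            // Tokens shorter-lived than the buffer are refreshed on every call
            refreshAtMillis = now + Math.max(0L, lifetimeMillis - refreshBufferMillis);
        }
        return token;
    }

    public static void main(String[] args) {
        CachedToken cache = new CachedToken(
                () -> new Grant("demo-token", 3600), System::currentTimeMillis, 300);
        System.out.println(cache.get());
    }
}
```

Injecting the clock is a design choice made only so the expiry logic can be exercised without waiting; in a job, `System::currentTimeMillis` would be the natural argument.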
// Token is now available as: (String) globalMap.get("oauth_access_token")
JWT Authentication
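Where adding a JWT library to a job is not an option, an HS256 token can also be assembled from JDK classes alone: base64url-encode the header and payload, then sign the two joined parts with HMAC-SHA256. The sketch below shows only that mechanism; the class name `Hs256Jwt` and the sample payload are illustrative, and the caller is assumed to supply an already valid JSON payload.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

final class Hs256Jwt {
    private static final Base64.Encoder B64URL = Base64.getUrlEncoder().withoutPadding();

    private Hs256Jwt() {}

    /** Builds header.payload.signature for the given JSON payload and shared secret. */
    static String sign(String payloadJson, byte[] secret) throws GeneralSecurityException {
        String header = "{\"alg\":\"HS256\",\"typ\":\"JWT\"}";
        String signingInput =
                B64URL.encodeToString(header.getBytes(StandardCharsets.UTF_8))
                + "."
                + B64URL.encodeToString(payloadJson.getBytes(StandardCharsets.UTF_8));

        // The signature covers the encoded header and payload, joined by a dot
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secret, "HmacSHA256"));
        byte[] signature = mac.doFinal(signingInput.getBytes(StandardCharsets.US_ASCII));

        return signingInput + "." + B64URL.encodeToString(signature);
    }

    public static void main(String[] args) throws Exception {
        long nowSeconds = System.currentTimeMillis() / 1000L;
        // Sample claims only; real claims depend on what the target API requires
        String payload = "{\"iss\":\"demo-issuer\",\"sub\":\"api-client\",\"exp\":"
                + (nowSeconds + 3600) + "}";
        byte[] secret = "replace-with-a-long-random-secret".getBytes(StandardCharsets.UTF_8);
        System.out.println(sign(payload, secret));
    }
}
```

A maintained library is still preferable in production because it also covers claim validation and key handling, which this sketch leaves out.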
// tJavaRow - Generate JWT token for API authentication
// Place these imports in the component's Advanced settings > Import field, not in the main code:
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import java.util.Date;
// JWT configuration from context
String secretKey = context.jwt_secret_key;
String issuer = context.jwt_issuer;
String audience = context.jwt_audience;
int expirationMinutes = 60;
// Build JWT token
Date now = new Date();
Date expiration = new Date(now.getTime() + expirationMinutes * 60_000L); // long arithmetic avoids int overflow for long lifetimes
String jwtToken = Jwts.builder()
.setIssuer(issuer)
.setSubject("api-client")
.setAudience(audience)
.setIssuedAt(now)
.setExpiration(expiration)
.claim("client_id", context.client_id)
.claim("permissions", "read,write")
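// signWith(SignatureAlgorithm, byte[]) is the jjwt 0.9.x form; later jjwt releases deprecate it in favor of signWith(Key, ...)
.signWith(SignatureAlgorithm.HS256, secretKey.getBytes("UTF-8"))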
.compact();
// Store for use in subsequent API calls
globalMap.put("jwt_token", jwtToken);
output_row.authorization_header = "Bearer " + jwtToken;
API Key Authentication
/*
* tRESTClient with API Key Authentication
*
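* Option 1: API Key in Header (send whichever header the API expects)
* Headers:
* - X-API-Key: context.api_key
* - Authorization: "ApiKey " + context.api_key
*
* Option 2: API Key in Query Parameter
* (URLs tend to end up in logs, so prefer a header when the API allows it)
* URL: context.api_base_url + "/endpoint?api_key=" + context.api_key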
*
* Option 3: Basic Auth (API Key as password)
* - Use tRESTClient Basic Auth settings
* - Username: context.api_username
* - Password: context.api_key
*/
// Secure handling of API keys:
// - Always use context variables (never hardcode)
// - Store them as secrets in Talend Cloud (TMC)
// - Use environment-specific configurations
Pagination Handling
Offset-Based Pagination
/*
* Job Structure for Paginated API:
* tJava_Init → tLoop → tRESTClient → tExtractJSONFields → tFlowToIterate → tJava_Process
*/
// tJava_Init - Initialize pagination
int pageSize = 100;
int currentPage = 0;
boolean hasMoreData = true;
globalMap.put("pageSize", pageSize);
globalMap.put("currentPage", currentPage);
globalMap.put("hasMoreData", hasMoreData);
globalMap.put("totalRecords", 0);
// tLoop condition: ((Boolean) globalMap.get("hasMoreData"))
// In tRESTClient, use dynamic URL:
// URL: context.api_base_url + "/records?offset=" +
// ((Integer) globalMap.get("currentPage") * (Integer) globalMap.get("pageSize")) +
// "&limit=" + globalMap.get("pageSize")
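Outside of Talend components, the offset arithmetic in this URL (offset = page number × page size) and a short-page stop condition can be sketched as a plain Java loop. `OffsetPager` and its `fetchPage` callback are hypothetical names; the callback stands in for the REST call, and the `maxPages` guard is an added safety limit rather than something the job above defines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class OffsetPager {
    private OffsetPager() {}

    /**
     * Calls fetchPage(offset, limit) until a page comes back with fewer than
     * pageSize records, and returns everything collected.
     */
    static <T> List<T> fetchAll(BiFunction<Integer, Integer, List<T>> fetchPage,
                                int pageSize, int maxPages) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<T> all = new ArrayList<>();
        for (int page = 0; page < maxPages; page++) {
            int offset = page * pageSize;
            List<T> records = fetchPage.apply(offset, pageSize);
            all.addAll(records);
            if (records.size() < pageSize) {
                return all; // a short (or empty) page marks the end of the data
            }
        }
        // Guard against an API that never returns a short page
        throw new IllegalStateException(
                "Stopped after " + maxPages + " pages without reaching the last page");
    }

    public static void main(String[] args) {
        // In-memory stand-in for the API: 250 records served in slices
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            source.add(i);
        }
        List<Integer> all = fetchAll(
                (offset, limit) -> source.subList(
                        Math.min(offset, source.size()),
                        Math.min(offset + limit, source.size())),
                100, 50);
        System.out.println("Fetched " + all.size() + " records");
    }
}
```

Note that when the total is an exact multiple of the page size, this stop condition costs one extra request that returns an empty page.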
// tJava (after tExtractJSONFields) - Update pagination state
int currentPage = (Integer) globalMap.get("currentPage");
int pageSize = (Integer) globalMap.get("pageSize");
int totalRecords = (Integer) globalMap.get("totalRecords");
// Get record count from this page
int recordsThisPage = ((Integer) globalMap.get("tExtractJSONFields_1_NB_LINE"));
totalRecords += recordsThisPage;
System.out.println("Page " + currentPage + ": Retrieved " + recordsThisPage + " records");
// Check if more pages exist
if (recordsThisPage < pageSize) {
globalMap.put("hasMoreData", false);
System.out.println("Last page reached. Total records: " + totalRecords);
} else {
globalMap.put("currentPage", currentPage + 1);
}
globalMap.put("totalRecords", totalRecords);
Cursor-Based Pagination
// tJava_Init - Initialize cursor pagination
globalMap.put("nextCursor", null); // null = first page
globalMap.put("hasMorePages", true);
// tLoop condition: ((Boolean) globalMap.get("hasMorePages"))
// Dynamic URL construction
// tJavaRow before tRESTClient:
String baseUrl = context.api_base_url + "/items";
String cursor = (String) globalMap.get("nextCursor");
if (cursor != null && !cursor.isEmpty()) {
output_row.api_url = baseUrl + "?cursor=" +
java.net.URLEncoder.encode(cursor, "UTF-8") +
"&limit=100";
} else {
output_row.api_url = baseUrl + "?limit=100";
}
// After tExtractJSONFields - Update cursor
/*
* Extract next_cursor from response:
* JSON Path: "$.pagination.next_cursor"
* Store as: next_cursor field
*/
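As a compact view of the whole cursor flow (request with the current cursor, read `next_cursor`, stop when it is absent), the loop can be sketched in plain Java. `CursorPager`, `Page`, and the `fetchPage` callback are hypothetical names; the repeated-cursor check is an added guard against runaway pagination and is not part of the job described here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

final class CursorPager {
    /** One API page: its items plus the cursor for the next page (null or empty when done). */
    static final class Page<T> {
        final List<T> items;
        final String nextCursor;

        Page(List<T> items, String nextCursor) {
            this.items = items;
            this.nextCursor = nextCursor;
        }
    }

    private CursorPager() {}

    /** Follows next cursors from the first page (cursor == null) until none is returned. */
    static <T> List<T> fetchAll(Function<String, Page<T>> fetchPage) {
        List<T> all = new ArrayList<>();
        Set<String> seenCursors = new HashSet<>();
        String cursor = null; // null requests the first page
        while (true) {
            Page<T> page = fetchPage.apply(cursor);
            all.addAll(page.items);
            String next = page.nextCursor;
            if (next == null || next.isEmpty()) {
                return all;
            }
            if (!seenCursors.add(next)) {
                // The same cursor twice would loop forever
                throw new IllegalStateException("Cursor repeated: " + next);
            }
            cursor = next;
        }
    }

    public static void main(String[] args) {
        // Three fake pages chained by cursors
        Function<String, Page<String>> fakeApi = c -> {
            if (c == null) return new Page<String>(Arrays.asList("a", "b"), "c1");
            if (c.equals("c1")) return new Page<String>(Arrays.asList("c"), "c2");
            return new Page<String>(Arrays.asList("d"), null);
        };
        System.out.println(fetchAll(fakeApi));
    }
}
```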
// tJava - Update pagination state
String nextCursor = row.next_cursor;
if (nextCursor == null || nextCursor.isEmpty() || nextCursor.equals("null")) {
globalMap.put("hasMorePages", false);
System.out.println("No more pages");
} else {
globalMap.put("nextCursor", nextCursor);
System.out.println("Next cursor: " + nextCursor);
}
Link Header Pagination
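A Link header has the form `<url>; rel="next", <url>; rel="last"`. Splitting it on commas is fragile because a URL may itself contain commas, so a pattern that first matches the bracketed URL is a little more tolerant. The following is a minimal sketch of that approach; `LinkHeader` is a hypothetical helper, and it does not handle commas inside quoted parameter values.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class LinkHeader {
    // A bracketed URL, then its parameters up to the next link
    private static final Pattern LINK = Pattern.compile("<([^>]*)>([^,<]*)");
    // rel="next", rel=next, or several space-separated relation names
    private static final Pattern REL =
            Pattern.compile("(?i)\\brel\\s*=\\s*\"?([^\";]+)\"?");

    private LinkHeader() {}

    /** Maps each relation name (lower-cased) to the first URL that declares it. */
    static Map<String, String> parse(String header) {
        Map<String, String> byRel = new LinkedHashMap<>();
        if (header == null) {
            return byRel;
        }
        Matcher link = LINK.matcher(header);
        while (link.find()) {
            Matcher rel = REL.matcher(link.group(2));
            if (rel.find()) {
                for (String name : rel.group(1).trim().split("\\s+")) {
                    byRel.putIfAbsent(name.toLowerCase(Locale.ROOT), link.group(1));
                }
            }
        }
        return byRel;
    }

    public static void main(String[] args) {
        String header = "<https://api.example.com/items?page=3>; rel=\"next\", "
                + "<https://api.example.com/items?page=50>; rel=\"last\"";
        System.out.println(parse(header).get("next"));
    }
}
```

With this helper, the pagination state update reduces to checking whether `parse(linkHeader).get("next")` is null.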
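// tJava - Parse Link header for pagination (GitHub style)
// After tRESTClient, access response headers.
// The key below assumes the Link header value was stored in globalMap under this name;
// check which header variables your tRESTClient version exposes before relying on it.
String linkHeader = (String) globalMap.get("tRESTClient_1_HEADERS_Link");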
if (linkHeader != null) {
// Parse Link header: <url>; rel="next", <url>; rel="last"
String nextUrl = null;
String[] links = linkHeader.split(",");
for (String link : links) {
String[] parts = link.trim().split(";");
if (parts.length == 2) {
String url = parts[0].trim();
String rel = parts[1].trim();
if (rel.contains("rel=\"next\"")) {
// Extract URL from < and >
nextUrl = url.substring(1, url.length() - 1);
break;
}
}
}
if (nextUrl != null) {
globalMap.put("nextPageUrl", nextUrl);
globalMap.put("hasMorePages", true);
System.out.println("Next page: " + nextUrl);
} else {
globalMap.put("hasMorePages", false);
System.out.println("Last page reached");
}
} else {
globalMap.put("hasMorePages", false);
}
Error Handling
HTTP Error Handling
/*
* tRESTClient Error Handling Configuration:
*
* Advanced Settings:
* - Die on error: false (to handle errors gracefully)
*
* Use OnSubjobError/OnComponentError for error flow
*/
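One detail worth handling when a rate-limited (429) response is retried: the Retry-After header may carry either a number of seconds or an HTTP-date. A minimal sketch that accepts both forms and falls back to a default is shown here; `RetryAfter` and `toWaitSeconds` are hypothetical names, and the current time is passed in so the logic can be tested.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

final class RetryAfter {
    private RetryAfter() {}

    /** Converts a Retry-After value (delta-seconds or HTTP-date) to a wait in seconds. */
    static long toWaitSeconds(String headerValue, Instant now, long defaultSeconds) {
        if (headerValue == null || headerValue.trim().isEmpty()) {
            return defaultSeconds;
        }
        String value = headerValue.trim();
        try {
            return Math.max(0L, Long.parseLong(value));
        } catch (NumberFormatException notSeconds) {
            // Not a plain number: try the HTTP-date form next
        }
        try {
            Instant retryAt =
                    ZonedDateTime.parse(value, DateTimeFormatter.RFC_1123_DATE_TIME).toInstant();
            // A date in the past means no wait is needed
            return Math.max(0L, Duration.between(now, retryAt).getSeconds());
        } catch (DateTimeParseException notADate) {
            return defaultSeconds;
        }
    }

    public static void main(String[] args) {
        System.out.println(toWaitSeconds("120", Instant.now(), 60));
        System.out.println(toWaitSeconds(null, Instant.now(), 60));
    }
}
```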
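// tJava - Check HTTP response and handle errors
// The two globalMap keys are assumed names; confirm them in the component's Outline view,
// or copy the status code and body from the tRESTClient output row instead.
int httpStatus = (Integer) globalMap.get("tRESTClient_1_HTTP_STATUS");
String responseBody = (String) globalMap.get("tRESTClient_1_RESPONSE_BODY");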
if (httpStatus >= 200 && httpStatus < 300) {
// Success
System.out.println("API call successful: " + httpStatus);
globalMap.put("api_success", true);
} else if (httpStatus == 401) {
// Authentication error - refresh token and retry
System.out.println("Authentication failed - refreshing token");
globalMap.put("needs_token_refresh", true);
globalMap.put("api_success", false);
} else if (httpStatus == 429) {
// Rate limited - wait and retry
System.out.println("Rate limited - waiting before retry");
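// Check Retry-After header (seconds); fall back to 60s if it is absent or not a number
String retryAfter = (String) globalMap.get("tRESTClient_1_HEADERS_Retry-After");
int waitSeconds = 60;
try {
    if (retryAfter != null) waitSeconds = Integer.parseInt(retryAfter.trim());
} catch (NumberFormatException nfe) {
    // Retry-After may also be an HTTP-date; keep the default in that case
}
Thread.sleep(waitSeconds * 1000L);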
globalMap.put("should_retry", true);
globalMap.put("api_success", false);
} else if (httpStatus >= 500) {
// Server error - log and potentially retry
System.err.println("Server error " + httpStatus + ": " + responseBody);
globalMap.put("api_success", false);
// Implement exponential backoff retry
int retryCount = (Integer) globalMap.getOrDefault("retry_count", 0);
if (retryCount < 3) {
int waitTime = (int) Math.pow(2, retryCount) * 1000;
Thread.sleep(waitTime);
globalMap.put("retry_count", retryCount + 1);
globalMap.put("should_retry", true);
} else {
throw new RuntimeException("Max retries exceeded for API call");
}
} else {
// Client error (4xx)
System.err.println("API error " + httpStatus + ": " + responseBody);
globalMap.put("api_success", false);
// Parse error response for details
try {
org.json.JSONObject errorJson = new org.json.JSONObject(responseBody);
String errorMessage = errorJson.optString("message", "Unknown error");
String errorCode = errorJson.optString("code", "UNKNOWN");
globalMap.put("api_error_message", errorMessage);
globalMap.put("api_error_code", errorCode);
} catch (Exception e) {
globalMap.put("api_error_message", responseBody);
}
}
Retry with Exponential Backoff
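As a minimal sketch of retry with exponential backoff in plain Java (for example inside a user routine), the helper below retries a callable and doubles the wait after each failure. `Retry`, `Sleeper`, and `backoffMillis` are hypothetical names; the sleeper is injected only so the schedule can be verified without real delays.

```java
import java.util.concurrent.Callable;

final class Retry {
    /** Abstraction over Thread.sleep so tests can record waits instead of sleeping. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    private Retry() {}

    /** Wait before the given retry (1-based): base, 2 x base, 4 x base, ... */
    static long backoffMillis(int retryNumber, long baseMillis) {
        return baseMillis << (retryNumber - 1);
    }

    /** Runs the action once, then up to maxRetries more times after failures. */
    static <T> T call(Callable<T> action, int maxRetries, long baseMillis, Sleeper sleeper)
            throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                try {
                    sleeper.sleep(backoffMillis(attempt, baseMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw ie;
                }
            }
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if attempts remain
            }
        }
        throw new Exception("API call failed after " + maxRetries + " retries", last);
    }

    public static void main(String[] args) throws Exception {
        // Succeeds immediately here; a real action would perform the HTTP request
        String result = Retry.call(() -> "ok", 3, 1000L, Thread::sleep);
        System.out.println(result);
    }
}
```

This version retries on every exception; in practice it is usually worth retrying only on failures that can succeed later (timeouts, 429, 5xx) and failing fast on the rest.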
// tJavaFlex - Retry logic wrapper
// Start Code
int maxRetries = 3;
int retryCount = 0;
boolean success = false;
Exception lastException = null;
while (!success && retryCount <= maxRetries) {
try {
// Main Code (the actual API call logic goes here)
// This represents the flow between Start and End
// End Code
success = true;
} catch (Exception e) {
lastException = e;
retryCount++;
if (retryCount <= maxRetries) {
// Exponential backoff: 1s, 2s, 4s
int waitTime = (int) Math.pow(2, retryCount - 1) * 1000;
System.out.println("Retry " + retryCount + "/" + maxRetries +
" after " + waitTime + "ms");
System.out.println("Error: " + e.getMessage());
try {
Thread.sleep(waitTime);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Retry interrupted", ie);
}
}
}
}
if (!success) {
throw new RuntimeException("API call failed after " + maxRetries +
" retries", lastException);
}
Complex Transformations
Nested JSON Extraction
/*
* Sample API Response:
* {
* "data": {
* "users": [
* {
* "id": 1,
* "profile": {
* "name": "John",
* "contact": {
* "email": "john@example.com",
* "phones": [
* {"type": "mobile", "number": "555-1234"},
* {"type": "work", "number": "555-5678"}
* ]
* }
* },
* "orders": [
* {"id": "ORD-001", "total": 150.00},
* {"id": "ORD-002", "total": 275.50}
* ]
* }
* ]
* }
* }
*/
// tExtractJSONFields Configuration:
// For user data (one row per user):
/*
* Loop JSON Path: "$.data.users"
*
* Mappings:
* - user_id: "id" (Integer)
* - user_name: "profile.name" (String)
* - email: "profile.contact.email" (String)
* - phones_json: "profile.contact.phones" (String - keep as JSON array)
* - orders_json: "orders" (String - keep as JSON array)
*/
// For flattened phone numbers (one row per phone):
/*
* Loop JSON Path: "$.data.users[*].profile.contact.phones"
* (Use tExtractJSONFields in loop mode)
*
* Mappings:
* - phone_type: "type"
* - phone_number: "number"
*/
// For flattened orders:
/*
* Loop JSON Path: "$.data.users[*].orders"
*
* Mappings:
* - order_id: "id"
* - order_total: "total" (Double)
*/
// tJavaRow - Parse nested JSON manually for complex cases
// Place these imports in Advanced settings > Import, not in the main code:
import org.json.JSONArray;
import org.json.JSONObject;
String phonesJson = input_row.phones_json;
if (phonesJson != null && !phonesJson.isEmpty()) {
JSONArray phones = new JSONArray(phonesJson);
// Get first mobile phone
for (int i = 0; i < phones.length(); i++) {
JSONObject phone = phones.getJSONObject(i);
if ("mobile".equals(phone.getString("type"))) {
output_row.mobile_phone = phone.getString("number");
break;
}
}
// Concatenate all phone numbers
StringBuilder allPhones = new StringBuilder();
for (int i = 0; i < phones.length(); i++) {
if (i > 0) allPhones.append(", ");
JSONObject phone = phones.getJSONObject(i);
allPhones.append(phone.getString("type"))
.append(": ")
.append(phone.getString("number"));
}
output_row.all_phones = allPhones.toString();
}
Building Complex JSON Requests
// tJavaRow - Build complex JSON for API request
// Place these imports in Advanced settings > Import, not in the main code:
import org.json.JSONObject;
import org.json.JSONArray;
// Build structured JSON request
JSONObject request = new JSONObject();
// Basic fields
request.put("transaction_id", input_row.trans_id);
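// Take the time in UTC so that the trailing 'Z' is accurate whatever the server time zone
request.put("timestamp", java.time.Instant.now()
.truncatedTo(java.time.temporal.ChronoUnit.SECONDS).toString());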
// Nested object
JSONObject customer = new JSONObject();
customer.put("id", input_row.customer_id);
customer.put("name", input_row.customer_name);
customer.put("email", input_row.customer_email);
JSONObject address = new JSONObject();
address.put("street", input_row.street);
address.put("city", input_row.city);
address.put("state", input_row.state);
address.put("zip", input_row.zip);
address.put("country", input_row.country);
customer.put("address", address);
request.put("customer", customer);
// Array of items
JSONArray items = new JSONArray();
// Assuming items come from a delimited string or previous aggregation
String[] itemParts = input_row.items_csv.split("\\|");
for (String itemStr : itemParts) {
String[] fields = itemStr.split(",");
JSONObject item = new JSONObject();
item.put("sku", fields[0]);
item.put("quantity", Integer.parseInt(fields[1]));
item.put("unit_price", Double.parseDouble(fields[2]));
items.put(item);
}
request.put("items", items);
// Metadata
JSONObject metadata = new JSONObject();
metadata.put("source", "talend-etl");
metadata.put("version", "1.0");
metadata.put("job_name", jobName);
request.put("metadata", metadata);
output_row.request_body = request.toString(2); // Pretty print with indent
output_row.request_body_compact = request.toString(); // Compact for API
Batch API Operations
Bulk Data Upload
/*
* Job Structure for Batch API:
* tFileInputDelimited → tFlowToIterate (batch size) → tJava_BuildBatch →
* tRESTClient → tJava_ProcessResponse
*/
// tFlowToIterate Configuration:
// - Group by: batch_number (calculated field)
// - Or use "Row count" option with batch size
// tMap before tFlowToIterate - Add batch number
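// Expression: (Numeric.sequence("batch", 1, 1) - 1) / Integer.parseInt(context.batch_size)
// (integer division; batch_size is parsed because it is treated as a String context variable below)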
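The batch-number expression relies on integer division: with a zero-based row index and a batch size of 3, rows 0 to 2 fall in batch 0, rows 3 to 5 in batch 1, and so on. A small sketch of that arithmetic, together with an equivalent list partitioning, is given here; `Batches` is a hypothetical helper and not a Talend routine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class Batches {
    private Batches() {}

    /** Batch number for a zero-based row index, using integer division. */
    static int batchNumber(int rowIndex, int batchSize) {
        return rowIndex / batchSize;
    }

    /** Splits rows into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            int end = Math.min(start + batchSize, rows.size());
            // Copy the slice so each batch is independent of the source list
            batches.add(new ArrayList<>(rows.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("r0", "r1", "r2", "r3", "r4", "r5", "r6");
        for (List<String> batch : partition(rows, 3)) {
            System.out.println(batch);
        }
    }
}
```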
// tJavaFlex - Accumulate batch and send
// Start Code
java.util.List<org.json.JSONObject> batchItems = new java.util.ArrayList<>();
int batchSize = Integer.parseInt(context.batch_size);
int totalSent = 0;
// Main Code
org.json.JSONObject item = new org.json.JSONObject();
item.put("id", input_row.id);
item.put("name", input_row.name);
item.put("value", input_row.value);
batchItems.add(item);
// Send when batch is full
if (batchItems.size() >= batchSize) {
    routines.BatchApi.sendBatch(batchItems, context.api_base_url, context.access_token);
    totalSent += batchItems.size();
    batchItems.clear();
    System.out.println("Sent batch, total: " + totalSent);
}
// End Code
// Send remaining items
if (!batchItems.isEmpty()) {
    routines.BatchApi.sendBatch(batchItems, context.api_base_url, context.access_token);
    totalSent += batchItems.size();
    System.out.println("Sent final batch, total: " + totalSent);
}
globalMap.put("total_records_sent", totalSent);
// Helper method: Java does not allow a method to be declared inside tJava code,
// so define it in a user routine (Code > Routines). The routine name BatchApi is illustrative.
package routines;

public class BatchApi {
    public static void sendBatch(java.util.List<org.json.JSONObject> items,
            String apiBaseUrl, String accessToken) throws Exception {
        org.json.JSONObject request = new org.json.JSONObject();
        request.put("records", new org.json.JSONArray(items));
        java.net.URL url = new java.net.URL(apiBaseUrl + "/batch");
        java.net.HttpURLConnection conn =
                (java.net.HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setRequestProperty("Authorization", "Bearer " + accessToken);
        // try-with-resources closes the stream even if the write fails
        try (java.io.OutputStream os = conn.getOutputStream()) {
            os.write(request.toString().getBytes("UTF-8"));
        }
        int responseCode = conn.getResponseCode();
        if (responseCode < 200 || responseCode >= 300) {
            throw new RuntimeException("Batch API failed: " + responseCode);
        }
    }
}
Best Practices
api_integration_best_practices:
authentication:
- "Use OAuth 2.0 for modern APIs"
- "Implement token refresh before expiration"
- "Store credentials in context/secrets"
- "Never hardcode API keys in jobs"
error_handling:
- "Implement retry with exponential backoff"
- "Handle rate limiting (429) gracefully"
- "Log all API errors with context"
- "Use circuit breaker for failing APIs"
performance:
- "Use batch endpoints when available"
- "Implement parallel API calls carefully"
- "Cache frequently accessed data"
- "Monitor API response times"
pagination:
- "Always handle pagination for list endpoints"
- "Use cursor-based when available (more reliable)"
- "Track total records for validation"
- "Implement timeout for runaway pagination"
data_handling:
- "Validate JSON structure before processing"
- "Handle null/missing fields gracefully"
- "Use appropriate data types"
- "Log sample records for debugging"
monitoring:
- "Track API call counts and latencies"
- "Alert on error rate thresholds"
- "Monitor rate limit usage"
- "Log request/response for debugging"
Conclusion
Effective API integration in Talend requires proper authentication handling, robust error management, and efficient pagination. Use tRESTClient with tExtractJSONFields for consuming APIs, implement retry logic for resilience, and batch operations for performance. These patterns enable reliable data integration with modern REST APIs.