Effective job scheduling transforms individual ETL jobs into reliable automated pipelines. This guide covers Talend Administration Center (TAC) configuration, scheduling patterns, and enterprise orchestration strategies.
TAC Architecture Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ TALEND ADMINISTRATION CENTER (TAC) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ WEB CONSOLE │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Dashboard│ │ Jobs │ │Execution│ │ Logs │ │ Admin │ │ │
│ │ │ │ │ Manager │ │ Plans │ │ Viewer │ │ Settings│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ TAC DATABASE │ │
│ │ • Job metadata • Execution history • User permissions │ │
│ │ • Schedules/triggers • Audit logs • Server configs │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ EXECUTION SERVERS │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ JobServer 1 │ │ JobServer 2 │ │ JobServer 3 │ │ │
│ │ │ (Primary) │ │ (ETL) │ │ (Spark) │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ • Standard │ │ • Batch │ │ • Big Data │ │ │
│ │ │ Jobs │ │ Processing│ │ Jobs │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Job Deployment
Publishing Jobs to TAC
#!/bin/bash
# deploy_job.sh - Deploy Talend job to TAC
# Configuration
TAC_URL="http://tac-server:8080/org.talend.administrator"
TAC_USER="admin"
TAC_PASSWORD="password" # Use secure method in production
PROJECT_NAME="MY_PROJECT"
JOB_NAME="Daily_ETL_Job"
JOB_VERSION="1.0"
# Build job archive (from Talend Studio or CI/CD)
# This creates a .zip file with the compiled job
# Deploy using TAC REST API
# Note: curl supplies the multipart Content-Type (with boundary) automatically for -F uploads
curl -X POST "${TAC_URL}/rest/deployment/deployJob" \
-u "${TAC_USER}:${TAC_PASSWORD}" \
-F "file=@${JOB_NAME}_${JOB_VERSION}.zip" \
-F "project=${PROJECT_NAME}" \
-F "jobName=${JOB_NAME}" \
-F "jobVersion=${JOB_VERSION}" \
-F "context=Production" \
-F "overwrite=true"
echo "Job deployed: ${JOB_NAME} v${JOB_VERSION}"Job Server Configuration
# Talend JobServer configuration
# Location: {TALEND_HOME}/conf/TalendJobServer.properties
# Server identification
talend.job.server.name=JobServer-ETL-01
talend.job.server.description=Primary ETL Job Server
# Connection settings
talend.job.server.port=8888
talend.job.server.host=0.0.0.0
# Execution settings
talend.job.server.concurrent.jobs=10
talend.job.server.timeout=3600000
# Memory settings
talend.job.server.java.options=-Xms1g -Xmx4g -XX:+UseG1GC
# Log settings
talend.job.server.log.level=INFO
talend.job.server.log.path=/var/log/talend/jobserver
# Security
talend.job.server.ssl.enabled=true
talend.job.server.ssl.keystore=/path/to/keystore.jks
talend.job.server.ssl.keystore.password=${KEYSTORE_PASSWORD}
Trigger Configuration
Time-Based Triggers (Cron)
/*
* TAC Cron Expression Syntax:
* Seconds Minutes Hours DayOfMonth Month DayOfWeek Year
*
* Examples:
* 0 0 6 * * ? - Every day at 6:00 AM
* 0 0 */4 * * ? - Every 4 hours
* 0 30 8 ? * MON-FRI - Weekdays at 8:30 AM
* 0 0 0 1 * ? - First day of every month at midnight
* 0 0 22 ? * SUN - Every Sunday at 10:00 PM
*/
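The expressions above follow Quartz cron syntax, so they can be validated and their next fire times previewed with the Quartz library before being entered in TAC; a minimal sketch, assuming org.quartz is on the classpath:
import org.quartz.CronExpression;
import java.util.Date;

public class CronPreview {
    public static void main(String[] args) throws Exception {
        String expression = "0 0 2 * * ?"; // daily ETL at 2 AM
        if (!CronExpression.isValidExpression(expression)) {
            throw new IllegalArgumentException("Invalid cron expression: " + expression);
        }
        CronExpression cron = new CronExpression(expression);
        Date next = new Date();
        for (int i = 0; i < 3; i++) {
            // Print the next three fire times to confirm the schedule behaves as intended
            next = cron.getNextValidTimeAfter(next);
            System.out.println("Next fire time: " + next);
        }
    }
}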
// Common scheduling patterns:
// Daily ETL at 2 AM (off-peak)
String dailyEtlCron = "0 0 2 * * ?";
// Hourly incremental loads (except midnight)
String hourlyCron = "0 0 1-23 * * ?";
// Every 15 minutes during business hours
String frequentCron = "0 0/15 8-18 ? * MON-FRI";
// End of day processing (weekdays at 11 PM)
String eodCron = "0 0 23 ? * MON-FRI";
// Weekly report on Monday morning
String weeklyCron = "0 0 7 ? * MON";
// Monthly close processing (last day of month at 11 PM)
String monthlyCron = "0 0 23 L * ?";
// Quarterly processing (first day of Jan, Apr, Jul, Oct at 1 AM)
String quarterlyCron = "0 0 1 1 1,4,7,10 ?";
File-Based Triggers
/*
* TAC File Trigger Configuration:
*
* Trigger Type: File
* Directory: /data/incoming/sales
* File Pattern: sales_*.csv
* Polling Interval: 60 seconds
* File Age (seconds): 30 (ensure file is completely written)
* Move processed files: /data/archive/sales
*/
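If the polling interval and file-age setting alone do not guarantee a complete file, the job can also verify that the file has stopped changing before reading it; a minimal sketch (the 30-second threshold mirrors the configuration above):
import java.io.File;

// Returns true when the file exists and has not been modified for at least minAgeSeconds
public static boolean isFileStable(String path, int minAgeSeconds) {
    File f = new File(path);
    if (!f.exists() || !f.isFile()) {
        return false;
    }
    long ageMillis = System.currentTimeMillis() - f.lastModified();
    return ageMillis >= minAgeSeconds * 1000L;
}

// Usage in the job, before reading the triggered file:
// if (!isFileStable(context.TRIGGER_FILE_PATH, 30)) {
//     throw new RuntimeException("Trigger file appears to still be written");
// }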
// File trigger parameters passed to job:
// - ${TRIGGER_FILE_PATH} : Full path of trigger file
// - ${TRIGGER_FILE_NAME} : Name of trigger file
// - ${TRIGGER_FILE_DIR} : Directory containing file
// - ${TRIGGER_TIMESTAMP} : Timestamp of trigger
// In job, access trigger parameters:
String inputFile = context.TRIGGER_FILE_PATH;
String fileName = context.TRIGGER_FILE_NAME;
System.out.println("Processing file: " + inputFile);
// After processing, file is automatically moved to archive
// based on trigger configuration
Event-Based Triggers
/*
* TAC Event Triggers:
*
* 1. Web Service Trigger
* - Expose REST endpoint
* - External systems call to trigger job
*
* 2. JMS/Kafka Trigger
* - Listen for messages
* - Trigger job on message receipt
*
* 3. Database Trigger
* - Poll table for new records
* - Trigger job when records found
*/
// Web Service Trigger endpoint:
// POST http://tac-server:8080/org.talend.administrator/rest/execution/trigger
// Body: {"jobName": "ProcessOrder", "context": "Production", "params": {...}}
// Database poll trigger query:
String pollQuery = "SELECT id, payload FROM trigger_queue " +
"WHERE status = 'PENDING' ORDER BY created_at LIMIT 10";
// After job execution, update trigger records:
String updateQuery = "UPDATE trigger_queue SET status = 'PROCESSED', " +
"processed_at = GETDATE() WHERE id IN (?)";Execution Plans
Creating Execution Plans
/*
* Execution Plan: Daily_Sales_Pipeline
*
* Purpose: End-to-end daily sales data processing
*
* Steps:
* 1. Extract_Sales_Data (parallel)
* 2. Extract_Customer_Data (parallel with step 1)
* 3. Transform_Sales (depends on 1 & 2)
* 4. Load_Data_Warehouse (depends on 3)
* 5. Generate_Reports (depends on 4)
* 6. Send_Notifications (depends on 5)
*/
// Execution Plan XML structure (for API/import)
/*
<?xml version="1.0" encoding="UTF-8"?>
<executionPlan name="Daily_Sales_Pipeline" description="Daily sales processing pipeline">
<step id="1" name="Extract_Sales">
<job name="Extract_Sales_Data" version="1.0" context="Production"/>
<parallelWith>2</parallelWith>
</step>
<step id="2" name="Extract_Customers">
<job name="Extract_Customer_Data" version="1.0" context="Production"/>
<parallelWith>1</parallelWith>
</step>
<step id="3" name="Transform">
<job name="Transform_Sales" version="1.0" context="Production"/>
<dependsOn>1,2</dependsOn>
<onError>STOP_PLAN</onError>
</step>
<step id="4" name="Load_DW">
<job name="Load_Data_Warehouse" version="1.0" context="Production"/>
<dependsOn>3</dependsOn>
<onError>STOP_PLAN</onError>
</step>
<step id="5" name="Reports">
<job name="Generate_Reports" version="1.0" context="Production"/>
<dependsOn>4</dependsOn>
<onError>CONTINUE</onError>
</step>
<step id="6" name="Notify">
<job name="Send_Notifications" version="1.0" context="Production"/>
<dependsOn>5</dependsOn>
<onError>CONTINUE</onError>
</step>
<trigger type="CRON" expression="0 0 2 * * ?"/>
</executionPlan>
*/
Dependency Management
/*
* Dependency Types in TAC:
*
* 1. Sequential: Job B runs after Job A completes
* 2. Parallel: Jobs A and B run simultaneously
* 3. Conditional: Job B runs only if Job A succeeds/fails
* 4. Data-based: Job B runs when Job A produces specific data
*/
// Using tRunJob for embedded dependencies:
/*
* Master_Job:
* tRunJob_1 (Extract) → OnSubjobOk → tRunJob_2 (Transform) → OnSubjobOk → tRunJob_3 (Load)
* → OnSubjobError → tRunJob_Error (Alert)
*/
// tRunJob configuration:
/*
* Job: Extract_Sales_Data
* Context: Production
* Pass context to child: true
* Wait for job to complete: true
*
* Context parameters override:
* - process_date: context.process_date
* - batch_id: context.batch_id
*/
// Check child job status:
int childExitCode = (Integer) globalMap.get("tRunJob_1_CHILD_RETURN_CODE");
String childJobId = (String) globalMap.get("tRunJob_1_CHILD_JOB_PID");
if (childExitCode != 0) {
System.err.println("Child job failed with code: " + childExitCode);
// Handle failure
}
Parallel Execution
/*
* Parallel execution patterns in TAC:
*
* 1. Plan-level parallelism: Multiple steps run concurrently
* 2. Job-level parallelism: Single job runs multiple instances
* 3. Data-level parallelism: Job processes data in parallel threads
*/
// Plan-level: Steps 1 & 2 marked as parallel
// TAC UI: Check "Run in parallel with" option
// Job-level: Multiple instances of same job
// Use different parameters for each instance:
/*
* Instance 1: region=US
* Instance 2: region=EU
* Instance 3: region=APAC
*/
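One way to launch those instances is a small wrapper that calls the exported job's shell launcher once per region and waits for all of them to finish; a sketch in which the launcher path and the region context parameter are assumptions:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public static void runRegionInstances() throws Exception {
    List<String> regions = Arrays.asList("US", "EU", "APAC");
    List<Process> running = new ArrayList<>();
    for (String region : regions) {
        // Illustrative launcher path; --context and --context_param follow the exported-job script format
        ProcessBuilder pb = new ProcessBuilder(
            "/opt/talend/jobs/Load_Sales/Load_Sales_run.sh",
            "--context=Production",
            "--context_param", "region=" + region);
        pb.inheritIO();
        running.add(pb.start());
    }
    for (Process p : running) {
        int exitCode = p.waitFor(); // wait for every instance to complete
        System.out.println("Instance exited with code " + exitCode);
    }
}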
// tParallelize in job for parallel data flows:
/*
* tParallelize
* ├── Branch 1: Process file type A
* ├── Branch 2: Process file type B
* └── Branch 3: Process file type C
*
* All branches execute concurrently
*/
// Synchronization point after parallel execution:
/*
 * tParallelize branches ──► tParallelize (synchronize) ──► tRunJob (next step)
 *
 * The synchronizing tParallelize waits for all parallel branches to complete
 * before the next step starts
 */
Monitoring and Alerting
Job Execution Monitoring
// TAC REST API - Get execution status
/*
* GET /rest/execution/status/{taskId}
*
* Response:
* {
* "taskId": "12345",
* "jobName": "Daily_ETL",
* "status": "RUNNING",
* "startTime": "2024-01-15T02:00:00Z",
* "currentStep": "Transform_Sales",
* "progress": 65,
* "rowsProcessed": 1500000
* }
*/
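The status endpoint above can be polled from a lightweight monitor; a minimal sketch that authenticates with basic auth and returns the raw JSON response (parsing is left to the caller):
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Base64;

public static String getExecutionStatus(String tacUrl, String taskId,
        String user, String password) throws Exception {
    URL url = new URL(tacUrl + "/rest/execution/status/" + taskId);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    String auth = Base64.getEncoder().encodeToString((user + ":" + password).getBytes("UTF-8"));
    conn.setRequestProperty("Authorization", "Basic " + auth);
    StringBuilder response = new StringBuilder();
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
        String line;
        while ((line = reader.readLine()) != null) {
            response.append(line);
        }
    }
    return response.toString(); // JSON as shown above, e.g. {"status": "RUNNING", ...}
}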
// Custom monitoring dashboard queries:
// Jobs currently running:
String runningJobsQuery = "SELECT j.name, e.start_time, e.status, s.name as server " +
"FROM executions e " +
"JOIN jobs j ON e.job_id = j.id " +
"JOIN servers s ON e.server_id = s.id " +
"WHERE e.status = 'RUNNING'";
// Job execution history:
String historyQuery = "SELECT j.name, e.start_time, e.end_time, e.status, " +
"DATEDIFF(SECOND, e.start_time, e.end_time) as duration_sec " +
"FROM executions e " +
"JOIN jobs j ON e.job_id = j.id " +
"WHERE e.start_time >= DATEADD(DAY, -7, GETDATE()) " +
"ORDER BY e.start_time DESC";
// Failed jobs in last 24 hours:
String failedJobsQuery = "SELECT j.name, e.start_time, e.error_message " +
"FROM executions e " +
"JOIN jobs j ON e.job_id = j.id " +
"WHERE e.status = 'ERROR' " +
"AND e.start_time >= DATEADD(DAY, -1, GETDATE())";Alert Configuration
// TAC Alert configuration via API
/*
* POST /rest/alerts/create
*
* {
* "name": "Critical Job Failure Alert",
* "type": "JOB_FAILURE",
* "jobs": ["Daily_ETL", "Monthly_Close"],
* "severity": "CRITICAL",
* "notification": {
* "email": ["data-team@company.com", "oncall@company.com"],
* "slack": "#data-alerts",
* "pagerduty": "P12345"
* },
* "conditions": {
* "consecutiveFailures": 1,
* "timeWindow": "1h"
* }
* }
*/
// Email alert template:
/*
* Subject: [ALERT] Talend Job Failed: ${jobName}
*
* Job Failure Alert
* =================
*
* Job Name: ${jobName}
* Execution ID: ${executionId}
* Start Time: ${startTime}
* End Time: ${endTime}
* Duration: ${duration}
* Status: FAILED
*
* Error Message:
* ${errorMessage}
*
* Server: ${serverName}
* Environment: ${environment}
*
* Link to logs: ${tacUrl}/execution/${executionId}/logs
*/
// Slack webhook alert:
public static void sendSlackAlert(String webhookUrl, String jobName,
String errorMessage, String executionId) throws Exception {
String payload = String.format(
"{\"attachments\": [{" +
"\"color\": \"danger\"," +
"\"title\": \"Talend Job Failed: %s\"," +
"\"text\": \"%s\"," +
"\"fields\": [" +
" {\"title\": \"Execution ID\", \"value\": \"%s\", \"short\": true}," +
" {\"title\": \"Time\", \"value\": \"%s\", \"short\": true}" +
"]" +
"}]}",
jobName, escapeJson(errorMessage), executionId,
new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date())
);
java.net.URL url = new java.net.URL(webhookUrl);
java.net.HttpURLConnection conn = (java.net.HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setDoOutput(true);
conn.getOutputStream().write(payload.getBytes("UTF-8"));
conn.getResponseCode(); // read the response code so the request is actually sent
}
// Minimal JSON escaping used above for quotes, backslashes and newlines
private static String escapeJson(String s) {
    return s == null ? "" :
        s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
}
SLA Monitoring
// Monitor job SLAs (Service Level Agreements)
/*
* SLA Configuration:
* - Job: Daily_ETL
* - Expected completion: By 6:00 AM
* - Max duration: 4 hours
* - Alert if: Not completed by 5:30 AM (30 min warning)
*/
// tJava - SLA check job
import java.time.*;
String jobName = "Daily_ETL";
LocalTime slaDeadline = LocalTime.of(6, 0); // 6:00 AM
LocalTime warningTime = LocalTime.of(5, 30); // 5:30 AM warning
Duration maxDuration = Duration.ofHours(4);
// Check if job completed today
boolean jobCompleted = checkJobCompletedToday(jobName);
LocalTime now = LocalTime.now();
if (!jobCompleted) {
if (now.isAfter(slaDeadline)) {
// SLA breached!
sendAlert("CRITICAL", "SLA BREACH: " + jobName + " not completed by deadline");
} else if (now.isAfter(warningTime)) {
// Warning
sendAlert("WARNING", "SLA at risk: " + jobName + " not yet completed");
}
}
// Check duration of running job
Duration currentDuration = getJobRunningDuration(jobName);
if (currentDuration != null && currentDuration.compareTo(maxDuration) > 0) {
sendAlert("WARNING", jobName + " running longer than expected: " +
currentDuration.toHours() + " hours");
}
// Helper methods
private boolean checkJobCompletedToday(String jobName) throws Exception {
// Query TAC database or API
return false; // Implementation
}
private Duration getJobRunningDuration(String jobName) throws Exception {
// Query TAC for running job start time
return null; // Implementation
}
Advanced Patterns
Dynamic Job Generation
// Generate jobs dynamically based on data
// Master job reads configuration and spawns child jobs
/*
* tDBInput (job_config table)
* │
* ▼
* tFlowToIterate
* │
* ▼
* tRunJob (dynamic job name from config)
*/
// Job configuration table:
/*
* CREATE TABLE etl_job_config (
* config_id INT PRIMARY KEY,
* job_name VARCHAR(100),
* source_system VARCHAR(50),
* source_table VARCHAR(100),
* target_table VARCHAR(100),
* enabled BIT,
* priority INT,
* parameters TEXT
* );
*/
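For testing outside Talend, the same configuration read can be reproduced with plain JDBC against the table above; a sketch in which the connection URL and credentials are placeholders:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public static void listEnabledJobs() throws Exception {
    // Placeholder connection details; point this at the ETL metadata database in practice
    String jdbcUrl = "jdbc:sqlserver://dbserver:1433;databaseName=etl_meta";
    try (Connection conn = DriverManager.getConnection(jdbcUrl, "etl_user", "secret");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(
             "SELECT job_name, source_table, target_table, parameters " +
             "FROM etl_job_config WHERE enabled = 1 ORDER BY priority")) {
        while (rs.next()) {
            // Each row drives one tRunJob call in the master job
            System.out.println(rs.getString("job_name") + ": "
                + rs.getString("source_table") + " -> " + rs.getString("target_table"));
        }
    }
}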
// tRunJob with dynamic job name:
/*
* Job: (String) globalMap.get("row1.job_name")
* Context: Production
*
* Context parameters:
* - source_system: (String) globalMap.get("row1.source_system")
* - source_table: (String) globalMap.get("row1.source_table")
* - target_table: (String) globalMap.get("row1.target_table")
*/
Cross-Environment Promotion
#!/bin/bash
# promote_job.sh - Promote job from DEV to PROD
DEV_TAC_URL="http://tac-dev:8080"
PROD_TAC_URL="http://tac-prod:8080"
JOB_NAME="$1"
JOB_VERSION="$2"
# Export from DEV
echo "Exporting ${JOB_NAME} v${JOB_VERSION} from DEV..."
curl -G "${DEV_TAC_URL}/rest/deployment/exportJob" \
-u "admin:password" \
-d "jobName=${JOB_NAME}" \
-d "version=${JOB_VERSION}" \
-o "${JOB_NAME}_${JOB_VERSION}.zip"
# Import to PROD (with approval workflow in real scenario)
echo "Importing to PROD..."
curl -X POST "${PROD_TAC_URL}/rest/deployment/deployJob" \
-u "admin:password" \
-F "file=@${JOB_NAME}_${JOB_VERSION}.zip" \
-F "overwrite=false"
# Update context to Production
echo "Updating context..."
curl -X PUT "${PROD_TAC_URL}/rest/tasks/updateContext" \
-u "admin:password" \
-H "Content-Type: application/json" \
-d "{\"jobName\": \"${JOB_NAME}\", \"context\": \"Production\"}"
echo "Promotion complete"Recovery and Restart
// Implement checkpoint-based restart capability
// At start of job, check for incomplete previous run
String jobId = jobName + "_" + context.batch_date;
String checkpointKey = "checkpoint_" + jobId;
// Read last checkpoint
String lastCheckpoint = readCheckpoint(checkpointKey);
if (lastCheckpoint != null) {
System.out.println("Resuming from checkpoint: " + lastCheckpoint);
globalMap.put("resume_from", lastCheckpoint);
globalMap.put("is_restart", true);
} else {
System.out.println("Starting fresh run");
globalMap.put("is_restart", false);
}
// In main processing, save checkpoints periodically
// tJavaRow - Save checkpoint every N records
int checkpointInterval = 10000;
Integer rowCounter = (Integer) globalMap.get("row_counter");
int currentRow = (rowCounter == null ? 0 : rowCounter) + 1;
globalMap.put("row_counter", currentRow);
if (currentRow % checkpointInterval == 0) {
saveCheckpoint(checkpointKey, String.valueOf(input_row.id));
System.out.println("Checkpoint saved at row " + currentRow);
}
// At end of job, clear checkpoint on success
// tJava (OnSubjobOk)
deleteCheckpoint(checkpointKey);
System.out.println("Job completed, checkpoint cleared");
// Helper methods
private void saveCheckpoint(String key, String value) throws Exception {
// Save to database or file
}
private String readCheckpoint(String key) throws Exception {
// Read from database or file
return null;
}
private void deleteCheckpoint(String key) throws Exception {
// Delete from database or file
}
Best Practices
scheduling_best_practices:
design:
- "Use execution plans for multi-job pipelines"
- "Implement proper dependencies between jobs"
- "Design for restart/recovery capability"
- "Use meaningful job and plan names"
scheduling:
- "Schedule resource-intensive jobs during off-peak hours"
- "Stagger job start times to avoid contention"
- "Leave buffer time between dependent jobs"
- "Consider timezone differences for global operations"
monitoring:
- "Set up alerts for all critical jobs"
- "Monitor SLAs proactively"
- "Track job duration trends"
- "Review failed job logs promptly"
maintenance:
- "Archive old execution logs regularly"
- "Review and clean up unused jobs"
- "Test recovery procedures periodically"
- "Document scheduling decisions"
security:
- "Use role-based access control"
- "Encrypt sensitive context parameters"
- "Audit all job executions"
- "Separate environments properly"Conclusion
Effective job scheduling with TAC transforms individual jobs into reliable automated pipelines. Implement proper triggers, dependencies, and execution plans for complex workflows. Monitor SLAs, set up comprehensive alerting, and design for recovery. These patterns ensure consistent, reliable ETL operations at enterprise scale.