Salesforce integration is one of the most common MuleSoft use cases. This guide covers everything from basic CRUD operations to advanced patterns like Change Data Capture and Platform Events.
Authentication Configuration
Configure Salesforce connector authentication:
<!--
  Salesforce connector configuration using the OAuth 2.0 JWT bearer flow.
  Consumer key, key store location/password and principal come from property
  placeholders; the token endpoint and audience are fixed to login.salesforce.com.
  NOTE(review): confirm the connection element name and the key store attribute
  name against the XSD of the Salesforce connector version in use.
-->
<salesforce:sfdc-config doc:name="Salesforce Config" name="Salesforce_Config">
    <salesforce:oauth-jwt-connection
        principal="${salesforce.username}"
        consumerKey="${salesforce.consumerKey}"
        keyStorePath="${salesforce.keystore.path}"
        storePassword="${salesforce.keystore.password}"
        audienceUrl="https://login.salesforce.com"
        tokenEndpoint="https://login.salesforce.com/services/oauth2/token">
        <!-- On connection failure: up to 3 reconnection attempts, 3000 ms apart -->
        <reconnection>
            <reconnect count="3" frequency="3000"/>
        </reconnection>
    </salesforce:oauth-jwt-connection>
</salesforce:sfdc-config>
<!--
  Alternative for dev/test only: username + password + security token login
  against the SOAP endpoint. Credentials are read from property placeholders.
-->
<salesforce:sfdc-config doc:name="Salesforce Config Basic" name="Salesforce_Config_Basic">
    <salesforce:basic-connection
        url="https://login.salesforce.com/services/Soap/u/59.0"
        username="${salesforce.username}"
        password="${salesforce.password}"
        securityToken="${salesforce.securityToken}"/>
</salesforce:sfdc-config>
Secure Property Configuration
# src/main/resources/config-dev.yaml
# Properties consumed by the Salesforce connector configurations.
# Values written as ${secure::...} are resolved from the secure properties store.
salesforce:
  consumerKey: "${secure::salesforce.consumerKey}"
  username: "integration@company.com"
  # Required by the username-password (dev/test) connection, which references
  # ${salesforce.password} and ${salesforce.securityToken}.
  password: "${secure::salesforce.password}"
  securityToken: "${secure::salesforce.securityToken}"
  keystore:
    path: "salesforce-keystore.jks"
    password: "${secure::salesforce.keystore.password}"

# API configuration
api:
  salesforce:
    baseUrl: "https://yourinstance.salesforce.com"
    apiVersion: "v59.0"
CRUD Operations
Create Records
<!--
  POST /accounts: maps the JSON request body to a single Salesforce Account,
  creates it, and returns the outcome of the create call as JSON.
-->
<flow name="create-salesforce-account">
    <!-- http:listener restricts verbs with allowedMethods; "method" is not a listener attribute -->
    <http:listener config-ref="HTTP_Listener_config" path="/accounts" allowedMethods="POST"/>

    <!-- Map the request body to a one-element list of Account fields.
         No validation happens here; missing fields are passed through as null. -->
    <ee:transform doc:name="Validate and Transform">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
var address = payload.address
---
[{
    Name: payload.companyName,
    Industry: payload.industry default "Technology",
    Website: payload.website,
    Phone: payload.phone,
    BillingStreet: address.street,
    BillingCity: address.city,
    BillingState: address.state,
    BillingPostalCode: address.postalCode,
    BillingCountry: address.country,
    Description: payload.description,
    // Custom fields
    Integration_Source__c: "MuleSoft API",
    External_ID__c: payload.externalId
}]
]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Create in Salesforce -->
    <salesforce:create config-ref="Salesforce_Config" type="Account" doc:name="Create Account"/>

    <!-- Report the result of the first (only) record -->
    <ee:transform doc:name="Build Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var result = payload[0]
---
{
    success: result.success,
    salesforceId: result.id,
    message: if (result.success)
            "Account created successfully"
        else
            result.errors[0].message
}
]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
Query Records with SOQL
<!--
  GET /contacts?accountId=...&search=...&limit=...
  Returns the contacts of one account, optionally filtered by a name fragment.
  All query parameters are validated or escaped before they reach the SOQL text,
  because they are caller-controlled input.
-->
<flow name="query-salesforce-contacts">
    <!-- http:listener restricts verbs with allowedMethods; "method" is not a listener attribute -->
    <http:listener config-ref="HTTP_Listener_config" path="/contacts" allowedMethods="GET"/>

    <!-- Reject a missing or malformed accountId instead of concatenating it into the query -->
    <choice doc:name="Validate accountId">
        <when expression="#[!((attributes.queryParams.accountId default '') matches /[a-zA-Z0-9]{15}([a-zA-Z0-9]{3})?/)]">
            <raise-error type="APP:INVALID_ACCOUNT_ID"
                         description="Query parameter 'accountId' must be a 15- or 18-character Salesforce ID"/>
        </when>
    </choice>

    <!-- Build dynamic SOQL query -->
    <ee:transform doc:name="Build SOQL Query">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output text/plain

// Escape the characters that terminate or alter a SOQL string literal
fun escapeSoql(value: String): String =
    (value replace "\\" with "\\\\") replace "'" with "\\'"

// Also escape LIKE wildcards so the search term is matched literally
fun escapeLike(value: String): String =
    (escapeSoql(value) replace "%" with "\\%") replace "_" with "\\_"

// Already checked against the Salesforce ID pattern by the preceding choice
var accountId = attributes.queryParams.accountId
var searchTerm = escapeLike(attributes.queryParams.search default "")
var rawLimit = (attributes.queryParams.limit default "100") as String
// Accept only plain digits; anything else falls back to the default of 100
var rowLimit = if (rawLimit matches /[0-9]+/) rawLimit else "100"
---
"SELECT Id, FirstName, LastName, Email, Phone, Title, Account.Name, CreatedDate, LastModifiedDate"
    ++ " FROM Contact"
    ++ " WHERE AccountId = '" ++ accountId ++ "'"
    ++ (if (searchTerm != "")
            " AND (FirstName LIKE '%" ++ searchTerm ++ "%' OR LastName LIKE '%" ++ searchTerm ++ "%')"
        else "")
    ++ " ORDER BY LastModifiedDate DESC"
    ++ " LIMIT " ++ rowLimit
]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Execute query -->
    <salesforce:query config-ref="Salesforce_Config" doc:name="Query Contacts">
        <salesforce:salesforce-query>#[payload]</salesforce:salesforce-query>
    </salesforce:query>

    <!-- Transform results -->
    <ee:transform doc:name="Transform Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    totalRecords: sizeOf(payload),
    contacts: payload map ((contact) -> {
        id: contact.Id,
        firstName: contact.FirstName,
        lastName: contact.LastName,
        fullName: (contact.FirstName default "") ++ " " ++ (contact.LastName default ""),
        email: contact.Email,
        phone: contact.Phone,
        title: contact.Title,
        accountName: contact.Account.Name,
        createdDate: contact.CreatedDate,
        lastModified: contact.LastModifiedDate
    })
}
]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
Upsert with External ID
<!--
  PUT /accounts/upsert: upserts the accounts in the request body, matching on
  External_ID__c, and returns per-record and aggregate results.
-->
<flow name="upsert-salesforce-records">
    <!-- http:listener restricts verbs with allowedMethods; "method" is not a listener attribute -->
    <http:listener config-ref="HTTP_Listener_config" path="/accounts/upsert" allowedMethods="PUT"/>

    <ee:transform doc:name="Prepare Upsert Data">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
(payload.accounts default []) map ((account) -> {
    External_ID__c: account.externalId,
    Name: account.name,
    Industry: account.industry,
    Website: account.website,
    Phone: account.phone,
    BillingStreet: account.address.street,
    BillingCity: account.address.city,
    BillingState: account.address.state,
    BillingCountry: account.address.country,
    NumberOfEmployees: account.employeeCount,
    AnnualRevenue: account.revenue
})
]]></ee:set-payload>
        </ee:message>
        <ee:variables>
            <!-- Keep the submitted external IDs: the upsert result carries only the Salesforce id -->
            <ee:set-variable variableName="externalIds"><![CDATA[%dw 2.0
output application/java
---
(payload.accounts default []) map $.externalId
]]></ee:set-variable>
        </ee:variables>
    </ee:transform>

    <!-- Upsert using external ID field -->
    <salesforce:upsert
        config-ref="Salesforce_Config"
        objectType="Account"
        externalIdFieldName="External_ID__c"
        doc:name="Upsert Accounts"/>

    <ee:transform doc:name="Process Results">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var results = payload default []
---
{
    totalProcessed: sizeOf(results),
    // A failed record has created == false, so success must be checked as well;
    // otherwise failures are counted as updates.
    created: sizeOf(results filter ($.success and ($.created default false))),
    updated: sizeOf(results filter ($.success and !($.created default false))),
    failed: sizeOf(results filter !$.success),
    // Results are matched to the submitted records by position.
    results: results map ((result, index) -> {
        externalId: vars.externalIds[index],
        salesforceId: result.id,
        success: result.success,
        created: result.created,
        (errors: result.errors map $.message) if (!result.success)
    })
}
]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
Bulk Data Operations
Handle large data volumes efficiently:
<!--
  POST /bulk/accounts: upserts a large set of accounts through a Salesforce bulk
  job. Steps: map records, create job, submit records in batches of 10000,
  close job, poll the job state, fetch results.
-->
<flow name="bulk-sync-accounts">
    <!-- http:listener restricts verbs with allowedMethods; "method" is not a listener attribute -->
    <http:listener config-ref="HTTP_Listener_config" path="/bulk/accounts" allowedMethods="POST"/>

    <ee:transform doc:name="Prepare Bulk Records">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
(payload.records default []) map ((record) -> {
    External_ID__c: record.externalId,
    Name: record.name,
    Industry: record.industry,
    Website: record.website,
    BillingStreet: record.address.street,
    BillingCity: record.address.city,
    BillingState: record.address.state,
    BillingCountry: record.address.country
})
]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- create-job replaces the payload with the job info, so keep the records in a variable -->
    <set-variable variableName="records" value="#[payload]" doc:name="Store Records"/>

    <!-- Create Bulk Job -->
    <salesforce:create-job
        config-ref="Salesforce_Config"
        operation="UPSERT"
        type="Account"
        externalIdFieldName="External_ID__c"
        doc:name="Create Bulk Job">
        <salesforce:content-type>CSV</salesforce:content-type>
    </salesforce:create-job>
    <set-variable variableName="jobId" value="#[payload.id]" doc:name="Store Job ID"/>

    <!-- batchSize makes foreach hand over sub-lists of up to 10000 records.
         (splitBy splits strings and cannot chunk an array.) -->
    <foreach collection="#[vars.records]" batchSize="10000" doc:name="Process Batches">
        <salesforce:create-batch
            config-ref="Salesforce_Config"
            jobInfoId="#[vars.jobId]"
            doc:name="Create Batch">
            <salesforce:objects>#[payload]</salesforce:objects>
        </salesforce:create-batch>
        <logger level="INFO" message="Batch created: #[payload.id]" doc:name="Log Batch"/>
    </foreach>

    <!-- Close job -->
    <salesforce:close-job
        config-ref="Salesforce_Config"
        jobId="#[vars.jobId]"
        doc:name="Close Job"/>

    <!-- Poll for completion: up to 10 retries, 30 s apart. Only "still processing"
         raises an error here, so a failed or aborted job is not polled again. -->
    <until-successful maxRetries="10" millisBetweenRetries="30000">
        <salesforce:job-info
            config-ref="Salesforce_Config"
            jobId="#[vars.jobId]"
            doc:name="Get Job Status"/>
        <choice doc:name="Check Status">
            <!-- NOTE(review): the Closed/no-pending-batches condition assumes the job info
                 exposes numberBatchesQueued and numberBatchesInProgress; confirm against
                 the job-info output of the connector version in use. -->
            <when expression="#[payload.state == 'JobComplete' or (payload.state == 'Closed' and payload.numberBatchesQueued == 0 and payload.numberBatchesInProgress == 0)]">
                <logger level="INFO" message="Bulk job completed" doc:name="Log Complete"/>
            </when>
            <when expression="#[payload.state == 'Failed' or payload.state == 'Aborted']">
                <logger level="ERROR" message="Bulk job ended in state #[payload.state]" doc:name="Log Failure"/>
            </when>
            <otherwise>
                <!-- Custom namespace: raise-error may not use MULE or a connector namespace -->
                <raise-error type="APP:BULK_JOB_IN_PROGRESS" description="Job still processing"/>
            </otherwise>
        </choice>
    </until-successful>

    <!-- Payload is still the last job info: fail once, outside the retry scope -->
    <choice doc:name="Fail on Terminal State">
        <when expression="#[payload.state == 'Failed' or payload.state == 'Aborted']">
            <raise-error type="APP:BULK_JOB_FAILED" description="Bulk job failed"/>
        </when>
    </choice>

    <!-- Get job results -->
    <salesforce:batch-result
        config-ref="Salesforce_Config"
        jobId="#[vars.jobId]"
        doc:name="Get Batch Results"/>

    <!-- NOTE(review): the field names read below are taken as-is from the original
         mapping; confirm they match the output of the results operation. -->
    <ee:transform doc:name="Format Results">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    jobId: vars.jobId,
    status: "completed",
    recordsProcessed: payload.numberRecordsProcessed,
    recordsFailed: payload.numberRecordsFailed,
    batches: (payload.batchInfo default []) map ((batch) -> {
        batchId: batch.id,
        state: batch.state,
        recordsProcessed: batch.numberRecordsProcessed,
        recordsFailed: batch.numberRecordsFailed
    })
}
]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
Change Data Capture (CDC)
Subscribe to real-time Salesforce changes:
<!--
  Listens on the Account change event channel, flattens the change event header
  into a JSON summary, and dispatches to a handler flow per change type.
-->
<flow name="salesforce-cdc-listener">
    <!-- Subscribe to Account changes -->
    <salesforce:subscribe-channel
        config-ref="Salesforce_Config"
        streamingChannel="/data/AccountChangeEvent"
        replayOption="ONLY_NEW"
        doc:name="Subscribe to Account CDC"/>

    <!-- Process change events -->
    <ee:transform doc:name="Parse Change Event">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var header = payload.ChangeEventHeader
---
{
    eventType: header.changeType,
    recordIds: header.recordIds,
    changeOrigin: header.changeOrigin,
    transactionKey: header.transactionKey,
    sequenceNumber: header.sequenceNumber,
    commitTimestamp: header.commitTimestamp,
    changedFields: header.changedFields,
    // Everything except the header is the record data itself
    recordData: payload - "ChangeEventHeader"
}
]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <logger level="INFO" message="CDC Event: #[payload.eventType] for #[payload.recordIds]" doc:name="Log Event"/>

    <!-- Route based on change type -->
    <choice doc:name="Route by Change Type">
        <when expression="#[payload.eventType == 'CREATE']">
            <flow-ref name="handle-account-created" doc:name="Handle Create"/>
        </when>
        <when expression="#[payload.eventType == 'UPDATE']">
            <flow-ref name="handle-account-updated" doc:name="Handle Update"/>
        </when>
        <when expression="#[payload.eventType == 'DELETE']">
            <flow-ref name="handle-account-deleted" doc:name="Handle Delete"/>
        </when>
        <when expression="#[payload.eventType == 'UNDELETE']">
            <flow-ref name="handle-account-undeleted" doc:name="Handle Undelete"/>
        </when>
        <otherwise>
            <!-- Any other change type (for example gap events) has no handler;
                 log it so a missed change is visible instead of silently dropped. -->
            <logger level="WARN"
                    message="Unhandled CDC change type #[payload.eventType] for #[payload.recordIds]"
                    doc:name="Log Unhandled Change Type"/>
        </otherwise>
    </choice>
</flow>
<!--
  Builds an update message from a parsed change event (expects the payload to
  carry commitTimestamp, recordIds, recordData and changedFields) and sends it
  to the external system with PUT /api/accounts/sync.
-->
<sub-flow name="handle-account-updated">
    <!-- Sync changes to external system -->
    <ee:transform doc:name="Prepare External Update">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var changedFields = payload.changedFields default []
// Keep only the fields listed as changed. filterObject replaces the former
// "{(k): v} if (...)" inside mapObject, which is not valid DataWeave 2.0.
var changes = (payload.recordData default {}) filterObject ((value, key) ->
    changedFields contains (key as String))
---
{
    source: "salesforce",
    operation: "update",
    timestamp: payload.commitTimestamp,
    records: (payload.recordIds default []) map ((id) -> {
        salesforceId: id,
        changes: changes
    })
}
]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Send to external system -->
    <http:request
        method="PUT"
        config-ref="External_System_Config"
        path="/api/accounts/sync"
        doc:name="Sync to External System"/>
</sub-flow>
Platform Events
Publish and subscribe to custom Platform Events:
<!--
  Publisher Flow
  POST /events/order: publishes an Order_Event__e platform event built from the
  request body and returns the publish result as JSON.
-->
<flow name="publish-platform-event">
    <!-- http:listener restricts verbs with allowedMethods; "method" is not a listener attribute -->
    <http:listener config-ref="HTTP_Listener_config" path="/events/order" allowedMethods="POST"/>

    <ee:transform doc:name="Create Platform Event">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
[{
    Order_Number__c: payload.orderNumber,
    Customer_Id__c: payload.customerId,
    Order_Total__c: payload.total,
    Order_Status__c: payload.status,
    Event_Type__c: "ORDER_CREATED",
    // The format appends a literal 'Z', so the instant must be shifted to UTC first;
    // otherwise a local time would be labelled as UTC.
    Event_Timestamp__c: (now() >> "UTC") as String {format: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"}
}]
]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Publish to Platform Event -->
    <salesforce:create
        config-ref="Salesforce_Config"
        type="Order_Event__e"
        doc:name="Publish Order Event"/>

    <ee:transform doc:name="Build Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var result = payload[0]
---
{
    success: result.success,
    eventId: result.id,
    // Report the Salesforce error instead of claiming success when publishing failed
    message: if (result.success default false)
            "Platform event published successfully"
        else
            (result.errors[0].message default "Platform event could not be published")
}
]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
<!--
  Subscriber Flow
  Receives Order_Event__e platform events, logs them, maps them to a JSON
  summary, and dispatches on the event type.
-->
<flow name="subscribe-platform-events">
    <salesforce:subscribe-channel
        doc:name="Subscribe to Order Events"
        config-ref="Salesforce_Config"
        streamingChannel="/event/Order_Event__e"
        replayOption="ONLY_NEW"/>
    <logger doc:name="Log Event" level="INFO" message="Received Platform Event: #[payload]"/>

    <!-- Rename the custom event fields to the names used by the handler flows -->
    <ee:transform doc:name="Process Event">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var event = payload
---
{
    orderNumber: event.Order_Number__c,
    customerId: event.Customer_Id__c,
    total: event.Order_Total__c,
    status: event.Order_Status__c,
    eventType: event.Event_Type__c,
    timestamp: event.Event_Timestamp__c,
    replayId: event.ReplayId
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Process based on event type; other event types fall through without a handler -->
    <choice doc:name="Route Event">
        <when expression="#[payload.eventType == 'ORDER_CREATED']">
            <flow-ref doc:name="Process New Order" name="process-new-order"/>
        </when>
        <when expression="#[payload.eventType == 'ORDER_SHIPPED']">
            <flow-ref doc:name="Process Shipment" name="process-shipment"/>
        </when>
    </choice>
</flow>
Composite API for Related Records
Create related records in a single transaction:
<!--
  POST /accounts/composite: creates an Account and its Contacts in one composite
  REST call (allOrNone) and summarises the sub-responses as JSON.
  The REST API version comes from the api.salesforce.apiVersion property.
-->
<flow name="create-account-with-contacts">
    <!-- http:listener restricts verbs with allowedMethods; "method" is not a listener attribute -->
    <http:listener config-ref="HTTP_Listener_config" path="/accounts/composite" allowedMethods="POST"/>

    <ee:transform doc:name="Build Composite Request">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var accountRef = "refAccount"
var sobjectsPath = "/services/data/" ++ p('api.salesforce.apiVersion') ++ "/sobjects"
---
{
    allOrNone: true,
    compositeRequest: [
        // Create Account first
        {
            method: "POST",
            url: sobjectsPath ++ "/Account",
            referenceId: accountRef,
            body: {
                Name: payload.account.name,
                Industry: payload.account.industry,
                Website: payload.account.website,
                External_ID__c: payload.account.externalId
            }
        }
    ] ++ (
        // Create related Contacts; a request without contacts creates only the Account
        (payload.contacts default []) map ((contact, index) -> {
            method: "POST",
            url: sobjectsPath ++ "/Contact",
            referenceId: "refContact" ++ (index as String),
            body: {
                FirstName: contact.firstName,
                LastName: contact.lastName,
                Email: contact.email,
                Phone: contact.phone,
                Title: contact.title,
                // Resolved by Salesforce to the id of the Account created above
                AccountId: "@{" ++ accountRef ++ ".id}"
            }
        })
    )
}
]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Execute Composite Request -->
    <!-- NOTE(review): vars.accessToken is not set anywhere in this flow; it must be
         populated before this request or the header expression will fail. -->
    <http:request
        method="POST"
        config-ref="Salesforce_REST_Config"
        path="/services/data/${api.salesforce.apiVersion}/composite"
        doc:name="Composite Request">
        <http:headers><![CDATA[#[{
            "Authorization": "Bearer " ++ vars.accessToken,
            "Content-Type": "application/json"
        }]]]></http:headers>
    </http:request>

    <ee:transform doc:name="Process Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var responses = payload.compositeResponse default []
var accountResponse = (responses filter ($.referenceId == "refAccount"))[0]
---
{
    // Successful only when sub-responses exist and none reports an HTTP error status
    success: (sizeOf(responses) > 0) and isEmpty(responses filter ($.httpStatusCode >= 400)),
    account: {
        id: accountResponse.body.id,
        success: accountResponse.httpStatusCode == 201
    },
    contacts: responses
        filter ($.referenceId startsWith "refContact")
        map ((response) -> {
            referenceId: response.referenceId,
            id: response.body.id,
            success: response.httpStatusCode == 201
        })
}
]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
Error Handling
Robust error handling for Salesforce operations:
<!--
  POST /accounts/safe: creates Account records from the request payload and maps
  Salesforce errors to a JSON error body plus an HTTP status.
  Each handler sets the payload and vars.httpStatus; the listener's error-response
  returns both to the caller.
-->
<flow name="salesforce-with-error-handling">
    <!-- http:listener restricts verbs with allowedMethods; "method" is not a listener attribute -->
    <http:listener config-ref="HTTP_Listener_config" path="/accounts/safe" allowedMethods="POST">
        <http:response statusCode="#[vars.httpStatus default 200]"/>
        <!-- Without this, the handler payloads and vars.httpStatus would never reach the caller -->
        <http:error-response statusCode="#[vars.httpStatus default 500]">
            <http:body>#[payload]</http:body>
        </http:error-response>
    </http:listener>

    <try doc:name="Try Salesforce Operation">
        <salesforce:create config-ref="Salesforce_Config" type="Account" doc:name="Create Account">
            <salesforce:records>#[payload]</salesforce:records>
        </salesforce:create>
        <ee:transform doc:name="Success Response">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    success: true,
    salesforceId: payload[0].id
}
]]></ee:set-payload>
            </ee:message>
        </ee:transform>

        <error-handler>
            <on-error-propagate type="SALESFORCE:CONNECTIVITY">
                <ee:transform doc:name="Connectivity Error">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    success: false,
    errorType: "CONNECTIVITY",
    message: "Unable to connect to Salesforce. Please try again later.",
    retryable: true
}
]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
                <set-variable variableName="httpStatus" value="503"/>
            </on-error-propagate>

            <on-error-propagate type="SALESFORCE:INVALID_SESSION">
                <ee:transform doc:name="Session Error">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    success: false,
    errorType: "AUTH_ERROR",
    message: "Salesforce session expired. Re-authenticating...",
    retryable: true
}
]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
                <!-- Reported as retryable, so signal a temporary failure like the connectivity case -->
                <set-variable variableName="httpStatus" value="503"/>
            </on-error-propagate>

            <on-error-propagate type="SALESFORCE:INVALID_INPUT">
                <ee:transform doc:name="Validation Error">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    success: false,
    errorType: "VALIDATION_ERROR",
    message: error.description,
    fields: error.detailedMessage default []
}
]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
                <set-variable variableName="httpStatus" value="400"/>
            </on-error-propagate>

            <!-- Catch-all: any other error would otherwise return the request payload
                 as the error body through the listener's error-response. -->
            <on-error-propagate>
                <ee:transform doc:name="Unexpected Error">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    success: false,
    errorType: "UNEXPECTED_ERROR",
    message: error.description,
    retryable: false
}
]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
                <set-variable variableName="httpStatus" value="500"/>
            </on-error-propagate>
        </error-handler>
    </try>
</flow>
Conclusion
MuleSoft's Salesforce connector provides comprehensive integration capabilities from basic CRUD to real-time streaming. Use bulk operations for large datasets, CDC for real-time sync, and Platform Events for event-driven architectures. Proper error handling and authentication configuration ensure production-ready integrations.