Integrarea cu Salesforce este unul dintre cele mai frecvente cazuri de utilizare MuleSoft. Acest ghid acopera totul, de la operatii CRUD de baza pana la pattern-uri avansate precum Change Data Capture si Platform Events.
Configurarea autentificarii
Configureaza autentificarea conectorului Salesforce:
<!-- Salesforce connector configuration using the OAuth 2.0 JWT bearer connection.
     All credentials and the keystore location come from property placeholders. -->
<salesforce:sfdc-config doc:name="Salesforce Config" name="Salesforce_Config">
    <salesforce:oauth-jwt-connection
        audienceUrl="https://login.salesforce.com"
        tokenEndpoint="https://login.salesforce.com/services/oauth2/token"
        principal="${salesforce.username}"
        storePassword="${salesforce.keystore.password}"
        keyStorePath="${salesforce.keystore.path}"
        consumerKey="${salesforce.consumerKey}">
        <!-- Reconnection policy: count=3, frequency=3000. -->
        <!-- NOTE(review): confirm the connection element and attribute names against the
             connector version in use; they differ between Salesforce connector releases. -->
        <reconnection>
            <reconnect count="3" frequency="3000"/>
        </reconnection>
    </salesforce:oauth-jwt-connection>
</salesforce:sfdc-config>
<!-- Alternative for dev/test: username, password and security token login against the SOAP endpoint. -->
<salesforce:sfdc-config doc:name="Salesforce Config Basic" name="Salesforce_Config_Basic">
    <salesforce:basic-connection
        url="https://login.salesforce.com/services/Soap/u/59.0"
        securityToken="${salesforce.securityToken}"
        password="${salesforce.password}"
        username="${salesforce.username}"/>
</salesforce:sfdc-config>
Configurarea securizata a proprietatilor
# src/main/resources/config-dev.yaml
# Keys are nested so they resolve as salesforce.consumerKey, salesforce.keystore.path, etc.,
# which are the placeholder names the connector configuration reads.
# Values of the form ${secure::...} are looked up through the secure:: property prefix
# rather than stored here in clear text.
salesforce:
  consumerKey: "${secure::salesforce.consumerKey}"
  username: "integration@company.com"
  keystore:
    path: "salesforce-keystore.jks"
    password: "${secure::salesforce.keystore.password}"

# API configuration
api:
  salesforce:
    baseUrl: "https://yourinstance.salesforce.com"
    apiVersion: "v59.0"
Operatii CRUD
Crearea inregistrarilor
<flow name="create-salesforce-account">
    <http:listener method="POST" path="/accounts" config-ref="HTTP_Listener_config"/>

    <!-- Map the request body onto a one-element list of Account records
         (Industry falls back to "Technology" when the request omits it). -->
    <ee:transform doc:name="Validate and Transform">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
var request = payload
var billing = payload.address
---
[{
    Name: request.companyName,
    Industry: request.industry default "Technology",
    Website: request.website,
    Phone: request.phone,
    BillingStreet: billing.street,
    BillingCity: billing.city,
    BillingState: billing.state,
    BillingPostalCode: billing.postalCode,
    BillingCountry: billing.country,
    Description: request.description,
    // Custom fields
    Integration_Source__c: "MuleSoft API",
    External_ID__c: request.externalId
}]]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Send the record list to Salesforce as Account objects. -->
    <salesforce:create doc:name="Create Account" type="Account" config-ref="Salesforce_Config"/>

    <!-- Report the outcome of the first (only) record: its id on success,
         otherwise the first error message returned for it. -->
    <ee:transform doc:name="Build Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var outcome = payload[0]
---
{
    success: outcome.success,
    salesforceId: outcome.id,
    message:
        if (outcome.success) "Account created successfully"
        else outcome.errors[0].message
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
Interogarea inregistrarilor cu SOQL
<flow name="query-salesforce-contacts">
    <http:listener config-ref="HTTP_Listener_config" path="/contacts" method="GET"/>

    <!-- Build the SOQL text from the query parameters.
         All three parameters are caller-controlled, so none is concatenated raw:
           - accountId must look like a Salesforce record ID, otherwise the script fails;
           - search has SOQL string and LIKE metacharacters backslash-escaped;
           - limit is used only when it is a plain non-negative integer (else 100). -->
    <ee:transform doc:name="Build SOQL Query">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
import fail from dw::Runtime
output text/plain

// Prefix \ ' % and _ with a backslash so the value is matched as literal text
// inside a quoted SOQL LIKE pattern.
fun escapeSoqlLike(text: String): String =
    text replace /([\\'%_])/ with ("\\" ++ $[0])

var accountId = attributes.queryParams.accountId default ""
var searchTerm = escapeSoqlLike(attributes.queryParams.search default "")
var requestedLimit = (attributes.queryParams.limit default "100") as String
var rowLimit = if (requestedLimit matches /^[0-9]+$/) requestedLimit as Number else 100
---
if (!(accountId matches /^[a-zA-Z0-9]{15}([a-zA-Z0-9]{3})?$/))
    fail("Query parameter 'accountId' must be a 15- or 18-character Salesforce ID")
else
    "SELECT Id, FirstName, LastName, Email, Phone, Title, Account.Name, CreatedDate, LastModifiedDate"
    ++ " FROM Contact"
    ++ " WHERE AccountId = '" ++ accountId ++ "'"
    ++ (if (searchTerm != "")
            " AND (FirstName LIKE '%" ++ searchTerm ++ "%' OR LastName LIKE '%" ++ searchTerm ++ "%')"
        else "")
    ++ " ORDER BY LastModifiedDate DESC"
    ++ " LIMIT " ++ (rowLimit as String)]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Execute query -->
    <salesforce:query config-ref="Salesforce_Config" doc:name="Query Contacts">
        <salesforce:salesforce-query>#[payload]</salesforce:salesforce-query>
    </salesforce:query>

    <!-- Reshape each Contact row into the API's camelCase response. -->
    <ee:transform doc:name="Transform Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    totalRecords: sizeOf(payload),
    contacts: payload map {
        id: $.Id,
        firstName: $.FirstName,
        lastName: $.LastName,
        fullName: ($.FirstName default "") ++ " " ++ ($.LastName default ""),
        email: $.Email,
        phone: $.Phone,
        title: $.Title,
        accountName: $.Account.Name,
        createdDate: $.CreatedDate,
        lastModified: $.LastModifiedDate
    }
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
Upsert cu External ID
<flow name="upsert-salesforce-records">
    <http:listener config-ref="HTTP_Listener_config" path="/accounts/upsert" method="PUT"/>

    <!-- Map each inbound account onto the Account fields to upsert. -->
    <ee:transform doc:name="Prepare Upsert Data">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
payload.accounts map {
    External_ID__c: $.externalId,
    Name: $.name,
    Industry: $.industry,
    Website: $.website,
    Phone: $.phone,
    BillingStreet: $.address.street,
    BillingCity: $.address.city,
    BillingState: $.address.state,
    BillingCountry: $.address.country,
    NumberOfEmployees: $.employeeCount,
    AnnualRevenue: $.revenue
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Keep the submitted external IDs by position: the upsert result only carries the
         Salesforce Id, so the response below pairs result N with external ID N.
         NOTE(review): this assumes results come back in request order; confirm for the connector version in use. -->
    <set-variable variableName="externalIds" value="#[payload map $.External_ID__c]" doc:name="Store External IDs"/>

    <!-- Upsert using external ID field -->
    <salesforce:upsert
        config-ref="Salesforce_Config"
        objectType="Account"
        externalIdFieldName="External_ID__c"
        doc:name="Upsert Accounts"/>

    <!-- Summarise the outcome. created/updated count successful records only,
         so created + updated + failed equals totalProcessed. -->
    <ee:transform doc:name="Process Results">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    totalProcessed: sizeOf(payload),
    created: sizeOf(payload filter ($.success and ($.created default false))),
    updated: sizeOf(payload filter ($.success and !($.created default false))),
    failed: sizeOf(payload filter (!$.success)),
    results: payload map ((result, index) -> {
        externalId: vars.externalIds[index],
        salesforceId: result.id,
        success: result.success,
        created: result.created,
        (errors: result.errors map $.message) if (!result.success)
    })
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
Operatii bulk de date
Gestioneaza eficient volume mari de date:
<flow name="bulk-sync-accounts">
    <http:listener config-ref="HTTP_Listener_config" path="/bulk/accounts" method="POST"/>

    <!-- Map the inbound records onto the Account fields to upsert. -->
    <ee:transform doc:name="Prepare Bulk Records">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
payload.records map {
    External_ID__c: $.externalId,
    Name: $.name,
    Industry: $.industry,
    Website: $.website,
    BillingStreet: $.address.street,
    BillingCity: $.address.city,
    BillingState: $.address.state,
    BillingCountry: $.address.country
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Create the bulk job. The job info goes to vars.bulkJob (target) so the payload
         still holds the prepared records when the foreach below runs. -->
    <salesforce:create-job
        config-ref="Salesforce_Config"
        operation="UPSERT"
        type="Account"
        externalIdFieldName="External_ID__c"
        target="bulkJob"
        doc:name="Create Bulk Job">
        <salesforce:content-type>CSV</salesforce:content-type>
    </salesforce:create-job>
    <set-variable variableName="jobId" value="#[vars.bulkJob.id]" doc:name="Store Job ID"/>

    <!-- Submit the records in chunks of up to 10000: with batchSize set,
         each iteration's payload is a list of that many records. -->
    <foreach collection="#[payload]" batchSize="10000" doc:name="Process Batches">
        <salesforce:create-batch
            config-ref="Salesforce_Config"
            jobInfoId="#[vars.jobId]"
            doc:name="Create Batch">
            <salesforce:objects>#[payload]</salesforce:objects>
        </salesforce:create-batch>
        <logger level="INFO" message="Batch created: #[payload.id]" doc:name="Log Batch"/>
    </foreach>

    <!-- Close job -->
    <salesforce:close-job
        config-ref="Salesforce_Config"
        jobId="#[vars.jobId]"
        doc:name="Close Job"/>

    <!-- Poll until the job reaches a terminal state (up to 10 retries, 30 s apart).
         Only "still processing" raises inside the scope, so a Failed/Aborted job
         stops the polling at once instead of being retried.
         NOTE(review): confirm the state names the job-info operation actually returns
         for this Bulk API version; 'JobComplete' is kept from the original. -->
    <until-successful maxRetries="10" millisBetweenRetries="30000">
        <salesforce:job-info
            config-ref="Salesforce_Config"
            jobId="#[vars.jobId]"
            doc:name="Get Job Status"/>
        <choice doc:name="Check Status">
            <when expression="#[payload.state == 'JobComplete']">
                <logger level="INFO" message="Bulk job completed" doc:name="Log Complete"/>
            </when>
            <when expression="#[payload.state == 'Failed' or payload.state == 'Aborted']">
                <logger level="ERROR" message="Bulk job ended in state #[payload.state]" doc:name="Log Failure"/>
            </when>
            <otherwise>
                <!-- Custom type so it cannot be mistaken for the MULE:RETRY_EXHAUSTED
                     that until-successful itself throws when the retries run out. -->
                <raise-error type="APP:BULK_JOB_IN_PROGRESS" description="Job still processing"/>
            </otherwise>
        </choice>
    </until-successful>

    <!-- Fail the flow once, outside the retry scope, when the job did not succeed. -->
    <choice doc:name="Fail On Unsuccessful Job">
        <when expression="#[payload.state == 'Failed' or payload.state == 'Aborted']">
            <raise-error type="APP:BULK_JOB_FAILED" description="Bulk job failed"/>
        </when>
    </choice>

    <!-- Get job results -->
    <!-- NOTE(review): the transform below reads job-level counters and a batchInfo list
         from this operation's output; verify that shape against the connector reference. -->
    <salesforce:batch-result
        config-ref="Salesforce_Config"
        jobId="#[vars.jobId]"
        doc:name="Get Batch Results"/>

    <ee:transform doc:name="Format Results">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    jobId: vars.jobId,
    status: "completed",
    recordsProcessed: payload.numberRecordsProcessed,
    recordsFailed: payload.numberRecordsFailed,
    batches: payload.batchInfo map {
        batchId: $.id,
        state: $.state,
        recordsProcessed: $.numberRecordsProcessed,
        recordsFailed: $.numberRecordsFailed
    }
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
Change Data Capture (CDC)
Aboneaza-te la modificarile Salesforce in timp real:
<flow name="salesforce-cdc-listener">
    <!-- Subscribe to Account change events, starting from new events only.
         replay-channel-listener is the Mule 4 source that accepts replayOption. -->
    <salesforce:replay-channel-listener
        config-ref="Salesforce_Config"
        streamingChannel="/data/AccountChangeEvent"
        replayOption="ONLY_NEW"
        doc:name="Subscribe to Account CDC"/>

    <!-- Flatten the change event header into a simple routing document. -->
    <ee:transform doc:name="Parse Change Event">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
// NOTE(review): the event body may arrive wrapped as payload.data.payload or directly
// as the payload depending on connector version; both shapes are accepted here.
var change = payload.data.payload default payload
var header = change.ChangeEventHeader
---
{
    eventType: header.changeType,
    recordIds: header.recordIds,
    changeOrigin: header.changeOrigin,
    transactionKey: header.transactionKey,
    sequenceNumber: header.sequenceNumber,
    commitTimestamp: header.commitTimestamp,
    changedFields: header.changedFields,
    recordData: change - "ChangeEventHeader"
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <logger level="INFO" message="CDC Event: #[payload.eventType] for #[payload.recordIds]" doc:name="Log Event"/>

    <!-- Route based on change type -->
    <choice doc:name="Route by Change Type">
        <when expression="#[payload.eventType == 'CREATE']">
            <flow-ref name="handle-account-created" doc:name="Handle Create"/>
        </when>
        <when expression="#[payload.eventType == 'UPDATE']">
            <flow-ref name="handle-account-updated" doc:name="Handle Update"/>
        </when>
        <when expression="#[payload.eventType == 'DELETE']">
            <flow-ref name="handle-account-deleted" doc:name="Handle Delete"/>
        </when>
        <when expression="#[payload.eventType == 'UNDELETE']">
            <flow-ref name="handle-account-undeleted" doc:name="Handle Undelete"/>
        </when>
        <otherwise>
            <!-- Any other change type is logged rather than dropped silently. -->
            <logger level="WARN" message="Unhandled CDC change type: #[payload.eventType]" doc:name="Log Unhandled"/>
        </otherwise>
    </choice>
</flow>
<sub-flow name="handle-account-updated">
    <!-- Build the sync message: one entry per changed record id, each carrying only
         the fields listed in changedFields. -->
    <ee:transform doc:name="Prepare External Update">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
// An event without changedFields yields an empty changes object instead of an error.
var changedFields = payload.changedFields default []
---
{
    source: "salesforce",
    operation: "update",
    timestamp: payload.commitTimestamp,
    records: payload.recordIds map ((id) -> {
        salesforceId: id,
        changes: (payload.recordData default {}) filterObject ((value, key) ->
            changedFields contains (key as String)
        )
    })
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Send to external system -->
    <http:request
        method="PUT"
        config-ref="External_System_Config"
        path="/api/accounts/sync"
        doc:name="Sync to External System"/>
</sub-flow>
Platform Events
Publica si aboneaza-te la Platform Events personalizate:
<!-- Publisher flow: turns an HTTP order notification into an Order_Event__e platform event. -->
<flow name="publish-platform-event">
    <http:listener config-ref="HTTP_Listener_config" path="/events/order" method="POST"/>

    <!-- Build the single-event list to publish. -->
    <ee:transform doc:name="Create Platform Event">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
[{
    Order_Number__c: payload.orderNumber,
    Customer_Id__c: payload.customerId,
    Order_Total__c: payload.total,
    Order_Status__c: payload.status,
    Event_Type__c: "ORDER_CREATED",
    // The format ends in a literal 'Z', so the instant is shifted to UTC first;
    // otherwise a server in another time zone would emit local time labelled as UTC.
    Event_Timestamp__c: (now() >> "UTC") as String {format: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"}
}]]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Publish to Platform Event -->
    <salesforce:create
        config-ref="Salesforce_Config"
        type="Order_Event__e"
        doc:name="Publish Order Event"/>

    <!-- Report the publish result; the message reflects the actual outcome. -->
    <ee:transform doc:name="Build Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var outcome = payload[0]
---
{
    success: outcome.success,
    eventId: outcome.id,
    message:
        if (outcome.success default false) "Platform event published successfully"
        else (outcome.errors[0].message default "Platform event could not be published")
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
<!-- Subscriber flow: listens for Order_Event__e platform events and routes them by event type. -->
<flow name="subscribe-platform-events">
    <!-- replay-channel-listener is the Mule 4 source that accepts replayOption. -->
    <salesforce:replay-channel-listener
        config-ref="Salesforce_Config"
        streamingChannel="/event/Order_Event__e"
        replayOption="ONLY_NEW"
        doc:name="Subscribe to Order Events"/>

    <logger level="INFO" message="Received Platform Event: #[payload]" doc:name="Log Event"/>

    <!-- Normalise the event fields into the internal order document. -->
    <ee:transform doc:name="Process Event">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
// NOTE(review): the event fields may arrive wrapped as payload.data.payload or directly
// as the payload depending on connector version; both shapes are accepted here.
var event = payload.data.payload default payload
---
{
    orderNumber: event.Order_Number__c,
    customerId: event.Customer_Id__c,
    total: event.Order_Total__c,
    status: event.Order_Status__c,
    eventType: event.Event_Type__c,
    timestamp: event.Event_Timestamp__c,
    replayId: payload.data.event.replayId default payload.ReplayId
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Process based on event type -->
    <choice doc:name="Route Event">
        <when expression="#[payload.eventType == 'ORDER_CREATED']">
            <flow-ref name="process-new-order" doc:name="Process New Order"/>
        </when>
        <when expression="#[payload.eventType == 'ORDER_SHIPPED']">
            <flow-ref name="process-shipment" doc:name="Process Shipment"/>
        </when>
        <otherwise>
            <!-- Event types without a route are logged rather than dropped silently. -->
            <logger level="WARN" message="Unhandled platform event type: #[payload.eventType]" doc:name="Log Unhandled"/>
        </otherwise>
    </choice>
</flow>
Composite API pentru inregistrari relationate
Creeaza inregistrari relationate intr-o singura tranzactie:
<flow name="create-account-with-contacts">
    <http:listener config-ref="HTTP_Listener_config" path="/accounts/composite" method="POST"/>

    <!-- Build one composite request: the Account first, then one Contact per entry in
         payload.contacts, each linked to the new Account through the "@{refAccount.id}" reference. -->
    <ee:transform doc:name="Build Composite Request">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var accountRef = "refAccount"
// The API version comes from the api.salesforce.apiVersion property (e.g. "v59.0")
// so it is defined in one place.
var sobjectsUrl = "/services/data/" ++ p("api.salesforce.apiVersion") ++ "/sobjects/"
// A request without contacts creates the Account only.
var contacts = payload.contacts default []
---
{
    allOrNone: true,
    compositeRequest: [
        // Create Account first
        {
            method: "POST",
            url: sobjectsUrl ++ "Account",
            referenceId: accountRef,
            body: {
                Name: payload.account.name,
                Industry: payload.account.industry,
                Website: payload.account.website,
                External_ID__c: payload.account.externalId
            }
        }
    ] ++ (
        // Create related Contacts
        // NOTE(review): the Composite API caps the number of subrequests per call;
        // confirm the limit and reject oversized contact lists upstream.
        contacts map ((contact, index) -> {
            method: "POST",
            url: sobjectsUrl ++ "Contact",
            referenceId: "refContact" ++ (index as String),
            body: {
                FirstName: contact.firstName,
                LastName: contact.lastName,
                Email: contact.email,
                Phone: contact.phone,
                Title: contact.title,
                AccountId: "@{" ++ accountRef ++ ".id}"
            }
        })
    )
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <!-- Execute Composite Request -->
    <!-- NOTE(review): vars.accessToken is not set anywhere in this flow; it must be
         populated before this request or the Authorization header cannot be built. -->
    <http:request
        method="POST"
        config-ref="Salesforce_REST_Config"
        path="/services/data/${api.salesforce.apiVersion}/composite"
        doc:name="Composite Request">
        <http:headers><![CDATA[#[{
    "Authorization": "Bearer " ++ vars.accessToken,
    "Content-Type": "application/json"
}]]]></http:headers>
    </http:request>

    <!-- Summarise the composite response: overall success plus per-record ids. -->
    <ee:transform doc:name="Process Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
var accountResult = (payload.compositeResponse filter ($.referenceId == "refAccount"))[0]
---
{
    success: (payload.compositeResponse filter ($.httpStatusCode >= 400)) == [],
    account: {
        id: accountResult.body.id,
        success: accountResult.httpStatusCode == 201
    },
    contacts: payload.compositeResponse
        filter ($.referenceId startsWith "refContact")
        map {
            referenceId: $.referenceId,
            id: $.body.id,
            success: $.httpStatusCode == 201
        }
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
Gestionarea erorilor
Gestionare robusta a erorilor pentru operatiile Salesforce:
<flow name="salesforce-with-error-handling">
    <!-- The error-response section makes the listener return the JSON body and the
         status code prepared by the error handlers below (500 when none was set). -->
    <http:listener config-ref="HTTP_Listener_config" path="/accounts/safe" method="POST">
        <http:error-response statusCode="#[vars.httpStatus default 500]">
            <http:body>#[payload]</http:body>
        </http:error-response>
    </http:listener>

    <try doc:name="Try Salesforce Operation">
        <salesforce:create config-ref="Salesforce_Config" type="Account" doc:name="Create Account">
            <salesforce:records>#[payload]</salesforce:records>
        </salesforce:create>

        <ee:transform doc:name="Success Response">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    success: true,
    salesforceId: payload[0].id
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>

        <error-handler>
            <!-- Salesforce unreachable: retryable, reported as 503. -->
            <on-error-propagate type="SALESFORCE:CONNECTIVITY">
                <ee:transform doc:name="Connectivity Error">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    success: false,
    errorType: "CONNECTIVITY",
    message: "Unable to connect to Salesforce. Please try again later.",
    retryable: true
}]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
                <set-variable variableName="httpStatus" value="503"/>
            </on-error-propagate>

            <!-- Expired session: also retryable, so it is reported as 503 like the connectivity case. -->
            <on-error-propagate type="SALESFORCE:INVALID_SESSION">
                <ee:transform doc:name="Session Error">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    success: false,
    errorType: "AUTH_ERROR",
    message: "Salesforce session expired. Re-authenticating...",
    retryable: true
}]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
                <set-variable variableName="httpStatus" value="503"/>
            </on-error-propagate>

            <!-- Rejected input: caller error, reported as 400. -->
            <on-error-propagate type="SALESFORCE:INVALID_INPUT">
                <ee:transform doc:name="Validation Error">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    success: false,
    errorType: "VALIDATION_ERROR",
    message: error.description,
    // The Mule error object exposes detailedDescription; it is wrapped in a list
    // so "fields" stays an array, and is empty when no detail is present.
    fields: if (error.detailedDescription != null) [error.detailedDescription] else []
}]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
                <set-variable variableName="httpStatus" value="400"/>
            </on-error-propagate>
        </error-handler>
    </try>
</flow>
Concluzie
Conectorul Salesforce din MuleSoft ofera capabilitati complete de integrare, de la CRUD de baza pana la streaming in timp real. Foloseste operatii bulk pentru seturi mari de date, CDC pentru sincronizare in timp real si Platform Events pentru arhitecturi event-driven. Gestionarea corecta a erorilor si configurarea autentificarii asigura integratii pregatite pentru productie.
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →