Anypoint MQ is MuleSoft's cloud messaging service for reliable asynchronous communication. This guide covers queue configuration, messaging patterns, and production best practices.
Anypoint MQ Architecture
Core Components Overview
┌─────────────────────────────────────────────────────────────────────┐
│ ANYPOINT MQ ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ PUBLISHERS ANYPOINT MQ │
│ ┌─────────────┐ ┌─────────────────────────────┐ │
│ │ Mule App │───publish───────►│ │ │
│ └─────────────┘ │ ┌─────────────────────┐ │ │
│ ┌─────────────┐ │ │ Standard Queue │ │ │
│ │ REST API │───publish───────►│ │ (at-least-once) │ │ │
│ └─────────────┘ │ └─────────────────────┘ │ │
│ ┌─────────────┐ │ │ │
│ │ External │───publish───────►│ ┌─────────────────────┐ │ │
│ │ System │ │ │ FIFO Queue │ │ │
│ └─────────────┘ │ │ (exactly-once) │ │ │
│ │ └─────────────────────┘ │ │
│ SUBSCRIBERS │ │ │
│ ┌─────────────┐ │ ┌─────────────────────┐ │ │
│ │ Mule App │◄──subscribe──────│ │ Message Exchange │ │ │
│ └─────────────┘ │ │ (pub/sub topics) │ │ │
│ ┌─────────────┐ │ └─────────────────────┘ │ │
│ │ Mule App │◄──subscribe──────│ │ │
│ └─────────────┘ │ ┌─────────────────────┐ │ │
│ │ │ Dead Letter Queue │ │ │
│ │ │ (failed messages) │ │ │
│ │ └─────────────────────┘ │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Queue Configuration
Standard Queue Setup
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:anypoint-mq="http://www.mulesoft.org/schema/mule/anypoint-mq"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core
        http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/anypoint-mq
        http://www.mulesoft.org/schema/mule/anypoint-mq/current/mule-anypoint-mq.xsd">

  <!-- Connector configuration named Anypoint_MQ_Config.
       The broker URL and client credentials are resolved from property placeholders. -->
  <anypoint-mq:config doc:name="Anypoint MQ Config" name="Anypoint_MQ_Config">
    <anypoint-mq:connection
        clientId="${anypoint.mq.clientId}"
        clientSecret="${anypoint.mq.clientSecret}"
        url="${anypoint.mq.url}"/>
  </anypoint-mq:config>

  <!-- Alternative: default subscriber configuration with the same connection settings.
       NOTE(review): confirm this element exists in the connector version in use. -->
  <anypoint-mq:default-subscriber-config name="MQ_Subscriber_Config">
    <anypoint-mq:connection
        clientId="${anypoint.mq.clientId}"
        clientSecret="${anypoint.mq.clientSecret}"
        url="${anypoint.mq.url}"/>
  </anypoint-mq:default-subscriber-config>
</mule>
Queue Properties Configuration
# config.yaml (application properties file) - Anypoint MQ connection and queue properties
# These keys back the ${anypoint.mq.*} placeholders used by the connector configuration.
# (mule-artifact.json is the JSON application descriptor and cannot hold YAML properties.)
anypoint:
  mq:
    # Region-specific URLs - keep exactly one active
    url: "https://mq-us-east-1.anypoint.mulesoft.com/api/v1"
    # url: "https://mq-eu-west-1.anypoint.mulesoft.com/api/v1"
    # url: "https://mq-ap-southeast-1.anypoint.mulesoft.com/api/v1"
    # Credentials are resolved from secure properties
    clientId: "${secure::anypoint.mq.clientId}"
    clientSecret: "${secure::anypoint.mq.clientSecret}"
    # Queue settings
    # NOTE(review): no flow shown in this guide reads these values; confirm whether they
    # are consumed by the application or only document the queue definitions.
    queues:
      orders:
        name: "orders-queue"
        defaultTtl: 604800000     # 7 days in milliseconds
        defaultLockTtl: 120000    # 2 minutes lock
        maxDeliveries: 5          # Before DLQ
      notifications:
        name: "notifications-queue"
        defaultTtl: 86400000      # 1 day
        defaultLockTtl: 60000     # 1 minute lock
        maxDeliveries: 3
      high-priority:
        name: "high-priority-fifo"
        fifo: true
        defaultTtl: 86400000
        defaultLockTtl: 300000    # 5 minutes for FIFO
Publishing Messages
Basic Message Publishing
<!-- Flow: Publish Order to Queue
     Accepts an order over HTTP POST /orders, reshapes it into the queue message format,
     publishes it to orders-queue and returns the message id reported by the publish step. -->
<flow name="publish-order-flow">
  <http:listener config-ref="HTTP_Listener" path="/orders" method="POST"/>
  <!-- Transform to queue message format.
       orderType and priority are captured as variables in the same transform because the
       message body built here does not carry them; reading them from the payload after
       this step would always yield null. -->
  <ee:transform doc:name="Prepare Message">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  orderId: payload.id,
  customerId: payload.customerId,
  items: payload.items,
  totalAmount: payload.totalAmount,
  // Shift to UTC before formatting: the pattern appends a literal 'Z'
  timestamp: (now() >> "UTC") as String {format: "yyyy-MM-dd'T'HH:mm:ss'Z'"}
}]]></ee:set-payload>
    </ee:message>
    <ee:variables>
      <ee:set-variable variableName="orderType"><![CDATA[%dw 2.0
output application/java
---
payload.orderType default "unknown"]]></ee:set-variable>
      <ee:set-variable variableName="priority"><![CDATA[%dw 2.0
output application/java
---
payload.priority default "normal"]]></ee:set-variable>
    </ee:variables>
  </ee:transform>
  <!-- Publish to Standard Queue; routing metadata travels as message properties -->
  <anypoint-mq:publish
      config-ref="Anypoint_MQ_Config"
      destination="orders-queue"
      doc:name="Publish Order">
    <anypoint-mq:properties>
      <anypoint-mq:property key="orderType" value="#[vars.orderType]"/>
      <anypoint-mq:property key="priority" value="#[vars.priority]"/>
      <anypoint-mq:property key="correlationId" value="#[correlationId]"/>
    </anypoint-mq:properties>
  </anypoint-mq:publish>
  <!-- Build the HTTP response from the attributes available after the publish step -->
  <ee:transform doc:name="Response">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  status: "queued",
  messageId: attributes.messageId,
  destination: "orders-queue"
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
</flow>
Publishing with Custom Message ID
<!-- Flow: Publish with Deduplication
     Accepts a transaction over HTTP POST /transactions and publishes it to transactions-fifo
     with a message id derived from the transaction's own fields, so that a retried request
     produces the same id. -->
<flow name="publish-idempotent-flow">
  <http:listener config-ref="HTTP_Listener" path="/transactions" method="POST"/>
  <!-- Generate deterministic message ID for deduplication.
       - Fields are coerced to String so numeric ids/timestamps do not break concatenation.
       - A delimiter keeps ("ab","c") and ("a","bc") from hashing to the same value.
       - The digest is hex-encoded so the id is a plain alphanumeric string instead of
         whatever the JSON writer would emit for a raw Binary value. -->
  <ee:transform doc:name="Generate Message ID">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0
import dw::Crypto
import toHex from dw::core::Binaries
output application/json
---
{
  data: payload,
  messageId: toHex(
    Crypto::hashWith(
      ((payload.transactionId as String) ++ "|" ++ (payload.timestamp as String)) as Binary {encoding: "UTF-8"},
      "SHA-256"
    )
  )
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
  <!-- Publish with custom message ID; the body is the original request, not the wrapper -->
  <anypoint-mq:publish
      config-ref="Anypoint_MQ_Config"
      destination="transactions-fifo"
      messageId="#[payload.messageId]"
      doc:name="Publish Transaction">
    <anypoint-mq:body><![CDATA[#[payload.data]]]></anypoint-mq:body>
    <anypoint-mq:properties>
      <anypoint-mq:property key="transactionType" value="#[payload.data.type]"/>
    </anypoint-mq:properties>
  </anypoint-mq:publish>
</flow>
Batch Publishing
<!-- Flow: Batch Publish Multiple Messages
     Accepts {orders: [...]} over HTTP POST /batch/orders and publishes each element to
     orders-queue, tagging every message with a shared batch id and its position. -->
<flow name="batch-publish-flow">
  <http:listener config-ref="HTTP_Listener" path="/batch/orders" method="POST"/>
  <!-- The batch id is referenced by both the publish step and the response, so it has to
       be assigned before the loop; without this the batchId property would be null. -->
  <set-variable variableName="batchId" value="#[uuid()]" doc:name="Set Batch ID"/>
  <!-- Split array into individual messages; a missing orders array is treated as empty -->
  <foreach collection="#[payload.orders default []]">
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="orders-queue"
        doc:name="Publish Each Order">
      <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
      <anypoint-mq:properties>
        <anypoint-mq:property key="batchId" value="#[vars.batchId]"/>
        <!-- vars.counter is the iteration counter maintained by foreach -->
        <anypoint-mq:property key="itemIndex" value="#[vars.counter]"/>
      </anypoint-mq:properties>
    </anypoint-mq:publish>
  </foreach>
  <!-- After foreach the payload is the original request again -->
  <ee:transform doc:name="Batch Response">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  status: "batch_queued",
  count: sizeOf(payload.orders default []),
  batchId: vars.batchId
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
</flow>
Consuming Messages
Subscriber Flow (Push Model)
<!-- Flow: Subscribe to Queue
     Receives messages from orders-queue in MANUAL acknowledgement mode, runs
     process-order-subflow, and acknowledges only when processing succeeds. -->
<flow name="order-processor-flow">
  <!-- Anypoint MQ Subscriber (Push)
       NOTE(review): verify pollingTime and maxRedelivery against the connector version in use. -->
  <anypoint-mq:subscriber
      config-ref="Anypoint_MQ_Config"
      destination="orders-queue"
      doc:name="Subscribe Orders"
      acknowledgementMode="MANUAL"
      acknowledgementTimeout="120000"
      pollingTime="1000"
      maxRedelivery="5"/>
  <!-- Keep the ack token in a variable: the steps below may replace the message
       attributes, and ack/nack need the token of the message that was received. -->
  <set-variable variableName="ackToken" value="#[attributes.ackToken]" doc:name="Save Ack Token"/>
  <logger level="INFO" message="Processing order: #[payload.orderId]"/>
  <!-- Process the order -->
  <try doc:name="Process with Error Handling">
    <flow-ref name="process-order-subflow"/>
    <!-- Acknowledge on success -->
    <anypoint-mq:ack
        config-ref="Anypoint_MQ_Config"
        ackToken="#[vars.ackToken]"
        doc:name="Acknowledge Message"/>
    <error-handler>
      <on-error-propagate type="ANY">
        <logger level="ERROR" message="Order processing failed: #[error.description]"/>
        <!-- Release the message right away so it is redelivered or sent to the DLQ
             without waiting for the acknowledgement timeout to expire. -->
        <anypoint-mq:nack
            config-ref="Anypoint_MQ_Config"
            ackToken="#[vars.ackToken]"
            doc:name="Release Message"/>
      </on-error-propagate>
    </error-handler>
  </try>
</flow>
<!-- Sub-flow: stamps the order as validated/processed and inserts it into the orders table.
     The payload on exit is the result of the database insert, not the order. -->
<sub-flow name="process-order-subflow">
  <!-- Mark the order as validated.
       The trailing '!' makes each case an upsert: without it, update only changes fields
       that already exist, so validated/processedAt would never be added to a fresh order. -->
  <ee:transform doc:name="Validate">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload update {
  case .validated! -> true
  case .processedAt! -> now()
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
  <!-- Store in database; values are bound as named parameters, never concatenated -->
  <db:insert config-ref="Database_Config" doc:name="Insert Order">
    <db:sql>
      INSERT INTO orders (order_id, customer_id, total, status, created_at)
      VALUES (:orderId, :customerId, :total, 'PROCESSED', :createdAt)
    </db:sql>
    <db:input-parameters><![CDATA[#[{
      orderId: payload.orderId,
      customerId: payload.customerId,
      total: payload.totalAmount,
      createdAt: now()
    }]]]></db:input-parameters>
  </db:insert>
</sub-flow>
Consume Operation (Pull Model)
<!-- Flow: On-Demand Message Consumption
     HTTP POST /process/next pulls at most one message from orders-queue, processes it via
     process-order-subflow and reports either "processed" or "empty". -->
<flow name="consume-on-demand-flow">
  <http:listener config-ref="HTTP_Listener" path="/process/next" method="POST"/>
  <!-- Consume single message.
       An empty queue is reported by the connector as a timeout error once pollingTime
       elapses; it is converted to a null payload here so the choice below can answer
       "empty" instead of failing the request. -->
  <try doc:name="Consume or Report Empty">
    <anypoint-mq:consume
        config-ref="Anypoint_MQ_Config"
        destination="orders-queue"
        acknowledgementMode="MANUAL"
        acknowledgementTimeout="60000"
        pollingTime="10000"
        doc:name="Consume Next Message"/>
    <error-handler>
      <on-error-continue type="ANYPOINT-MQ:TIMEOUT">
        <set-payload value="#[null]" doc:name="No Message"/>
      </on-error-continue>
    </error-handler>
  </try>
  <!-- Check if message received -->
  <choice doc:name="Message Received?">
    <when expression="#[payload != null]">
      <!-- Capture message metadata before the sub-flow replaces payload and attributes -->
      <set-variable variableName="ackToken" value="#[attributes.ackToken]" doc:name="Save Ack Token"/>
      <set-variable variableName="messageId" value="#[attributes.messageId]" doc:name="Save Message ID"/>
      <try doc:name="Process Message">
        <flow-ref name="process-order-subflow"/>
        <anypoint-mq:ack
            config-ref="Anypoint_MQ_Config"
            ackToken="#[vars.ackToken]"
            doc:name="Acknowledge"/>
        <ee:transform doc:name="Success Response">
          <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  status: "processed",
  messageId: vars.messageId
}]]></ee:set-payload>
          </ee:message>
        </ee:transform>
        <error-handler>
          <on-error-propagate type="ANY">
            <!-- Return the message to the queue, then let the error reach the caller -->
            <anypoint-mq:nack
                config-ref="Anypoint_MQ_Config"
                ackToken="#[vars.ackToken]"
                doc:name="Negative Acknowledge"/>
          </on-error-propagate>
        </error-handler>
      </try>
    </when>
    <otherwise>
      <ee:transform doc:name="No Messages">
        <ee:message>
          <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  status: "empty",
  message: "No messages available"
}]]></ee:set-payload>
        </ee:message>
      </ee:transform>
    </otherwise>
  </choice>
</flow>
Message Exchanges (Pub/Sub)
Exchange Configuration
<!-- Exchange Publisher: HTTP POST /events forwards the request body to notifications-exchange -->
<flow name="notification-publisher-flow">
  <http:listener method="POST" path="/events" config-ref="HTTP_Listener"/>
  <!-- Publish to Exchange (broadcasts to all bound queues).
       The event type and source are copied from the body into message properties. -->
  <anypoint-mq:publish doc:name="Publish to Exchange"
                       destination="notifications-exchange"
                       config-ref="Anypoint_MQ_Config">
    <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
    <anypoint-mq:properties>
      <anypoint-mq:property value="#[payload.type]" key="eventType"/>
      <anypoint-mq:property value="#[payload.source]" key="source"/>
    </anypoint-mq:properties>
  </anypoint-mq:publish>
</flow>
<!-- Subscriber 1: Email Notifications
     Reads email-notifications-queue and acknowledges each message only after
     send-email-notification has completed. -->
<flow name="email-notification-subscriber">
  <!-- MANUAL mode is declared explicitly because the flow acknowledges the message itself -->
  <anypoint-mq:subscriber
      config-ref="Anypoint_MQ_Config"
      destination="email-notifications-queue"
      acknowledgementMode="MANUAL"
      doc:name="Email Queue Subscriber"/>
  <!-- Save the token first: the referenced flow may replace the message attributes -->
  <set-variable variableName="ackToken" value="#[attributes.ackToken]" doc:name="Save Ack Token"/>
  <flow-ref name="send-email-notification"/>
  <anypoint-mq:ack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
</flow>
<!-- Subscriber 2: SMS Notifications
     Reads sms-notifications-queue and acknowledges each message only after
     send-sms-notification has completed. -->
<flow name="sms-notification-subscriber">
  <!-- MANUAL mode is declared explicitly because the flow acknowledges the message itself -->
  <anypoint-mq:subscriber
      config-ref="Anypoint_MQ_Config"
      destination="sms-notifications-queue"
      acknowledgementMode="MANUAL"
      doc:name="SMS Queue Subscriber"/>
  <!-- Save the token first: the referenced flow may replace the message attributes -->
  <set-variable variableName="ackToken" value="#[attributes.ackToken]" doc:name="Save Ack Token"/>
  <flow-ref name="send-sms-notification"/>
  <anypoint-mq:ack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
</flow>
<!-- Subscriber 3: Push Notifications
     Reads push-notifications-queue and acknowledges each message only after
     send-push-notification has completed. -->
<flow name="push-notification-subscriber">
  <!-- MANUAL mode is declared explicitly because the flow acknowledges the message itself -->
  <anypoint-mq:subscriber
      config-ref="Anypoint_MQ_Config"
      destination="push-notifications-queue"
      acknowledgementMode="MANUAL"
      doc:name="Push Queue Subscriber"/>
  <!-- Save the token first: the referenced flow may replace the message attributes -->
  <set-variable variableName="ackToken" value="#[attributes.ackToken]" doc:name="Save Ack Token"/>
  <flow-ref name="send-push-notification"/>
  <anypoint-mq:ack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
</flow>
FIFO Queues
FIFO Queue Configuration and Usage
<!-- FIFO Queue Publisher with Message Groups: HTTP POST /fifo/orders publishes to orders-fifo -->
<flow name="fifo-order-publisher">
  <http:listener method="POST" path="/fifo/orders" config-ref="HTTP_Listener"/>
  <!-- FIFO requires message group ID for ordering.
       The request is wrapped together with its group id; only the order part is published. -->
  <ee:transform doc:name="Prepare FIFO Message">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  order: payload,
  // customerId doubles as the group id, so each customer's orders stay in sequence
  messageGroupId: payload.customerId
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
  <anypoint-mq:publish doc:name="Publish FIFO Order"
                       destination="orders-fifo"
                       messageGroupId="#[payload.messageGroupId]"
                       config-ref="Anypoint_MQ_Config">
    <anypoint-mq:body><![CDATA[#[payload.order]]]></anypoint-mq:body>
  </anypoint-mq:publish>
</flow>
<!-- FIFO Queue Consumer
     Reads orders-fifo in MANUAL acknowledgement mode, runs process-fifo-order and
     acknowledges on success; on failure the error is logged and propagated. -->
<flow name="fifo-order-consumer">
  <anypoint-mq:subscriber
      config-ref="Anypoint_MQ_Config"
      destination="orders-fifo"
      acknowledgementMode="MANUAL"
      acknowledgementTimeout="300000"
      doc:name="FIFO Subscriber"/>
  <!-- Capture message metadata up front: the referenced flow may replace the attributes,
       and both the ack and the error log need values from the received message. -->
  <set-variable variableName="ackToken" value="#[attributes.ackToken]" doc:name="Save Ack Token"/>
  <set-variable variableName="messageGroupId" value="#[attributes.messageGroupId]" doc:name="Save Group ID"/>
  <logger level="INFO"
          message="Processing FIFO order: #[payload.orderId] from group: #[vars.messageGroupId]"/>
  <try doc:name="FIFO Processing">
    <!-- FIFO processing - order matters! -->
    <flow-ref name="process-fifo-order"/>
    <anypoint-mq:ack
        config-ref="Anypoint_MQ_Config"
        ackToken="#[vars.ackToken]"
        doc:name="Acknowledge FIFO"/>
    <error-handler>
      <on-error-propagate type="ANY">
        <logger level="ERROR"
                message="FIFO processing failed - blocking group: #[vars.messageGroupId]"/>
        <!-- In FIFO, failure blocks the message group. The message is deliberately not
             released here, so it stays locked until the acknowledgement timeout. -->
      </on-error-propagate>
    </error-handler>
  </try>
</flow>
Dead Letter Queue Handling
DLQ Processing Flow
<!-- Dead Letter Queue Processor
     Reads orders-queue-dlq, records each dead-lettered message in the dlq_messages table,
     triggers send-dlq-alert and only then acknowledges the message. -->
<flow name="dlq-processor-flow">
  <anypoint-mq:subscriber
      config-ref="Anypoint_MQ_Config"
      destination="orders-queue-dlq"
      acknowledgementMode="MANUAL"
      doc:name="DLQ Subscriber"/>
  <!-- The database insert and the alert flow run before the ack and may replace the
       message attributes, so the token is saved while it is still available. -->
  <set-variable variableName="ackToken" value="#[attributes.ackToken]" doc:name="Save Ack Token"/>
  <logger level="WARN"
          message="Processing DLQ message: #[attributes.messageId], Delivery count: #[attributes.deliveryCount]"/>
  <!-- Analyze failure reason: wrap the message with its delivery metadata -->
  <ee:transform doc:name="Analyze Failure">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  originalMessage: payload,
  messageId: attributes.messageId,
  deliveryCount: attributes.deliveryCount,
  firstDeliveryTime: attributes.timestamp,
  properties: attributes.properties,
  failureAnalysis: {
    possibleCauses: [
      "Validation error",
      "External service unavailable",
      "Data corruption",
      "Processing timeout"
    ]
  }
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
  <!-- Store in error database for analysis; the original body is kept as a JSON string -->
  <db:insert config-ref="Database_Config" doc:name="Log DLQ Message">
    <db:sql>
      INSERT INTO dlq_messages (
        message_id, queue_name, payload, delivery_count,
        first_delivery, logged_at, status
      ) VALUES (
        :messageId, 'orders-queue', :payload, :deliveryCount,
        :firstDelivery, CURRENT_TIMESTAMP, 'PENDING_REVIEW'
      )
    </db:sql>
    <db:input-parameters><![CDATA[#[{
      messageId: payload.messageId,
      payload: write(payload.originalMessage, "application/json"),
      deliveryCount: payload.deliveryCount,
      firstDelivery: payload.firstDeliveryTime
    }]]]></db:input-parameters>
  </db:insert>
  <!-- Send alert -->
  <flow-ref name="send-dlq-alert"/>
  <!-- Acknowledge DLQ message once it has been recorded and alerted on -->
  <anypoint-mq:ack
      config-ref="Anypoint_MQ_Config"
      ackToken="#[vars.ackToken]"
      doc:name="Acknowledge DLQ Message"/>
</flow>
<!-- DLQ Reprocess Flow (Manual Trigger)
     HTTP POST /dlq/reprocess/{messageId} looks up a PENDING_REVIEW row in dlq_messages,
     republishes its stored body to orders-queue and marks the row REPROCESSED.
     Raises APP:NOT_FOUND when no matching row exists. -->
<flow name="dlq-reprocess-flow">
  <http:listener config-ref="HTTP_Listener" path="/dlq/reprocess/{messageId}" method="POST"/>
  <!-- Keep the path parameter in a variable: the publish step below replaces the HTTP
       attributes, so reading attributes.uriParams afterwards would bind null. -->
  <set-variable variableName="messageId" value="#[attributes.uriParams.messageId]" doc:name="Save Message ID"/>
  <!-- Fetch from error database -->
  <db:select config-ref="Database_Config" doc:name="Get DLQ Message">
    <db:sql>
      SELECT * FROM dlq_messages WHERE message_id = :messageId AND status = 'PENDING_REVIEW'
    </db:sql>
    <db:input-parameters><![CDATA[#[{messageId: vars.messageId}]]]></db:input-parameters>
  </db:select>
  <choice doc:name="Message Found?">
    <when expression="#[sizeOf(payload) > 0]">
      <!-- Republish to original queue; the stored JSON string is parsed back into a body -->
      <anypoint-mq:publish
          config-ref="Anypoint_MQ_Config"
          destination="orders-queue"
          doc:name="Republish Message">
        <anypoint-mq:body><![CDATA[#[read(payload[0].payload, "application/json")]]]></anypoint-mq:body>
        <anypoint-mq:properties>
          <anypoint-mq:property key="reprocessed" value="true"/>
          <anypoint-mq:property key="originalMessageId" value="#[payload[0].message_id]"/>
        </anypoint-mq:properties>
      </anypoint-mq:publish>
      <!-- Update database status; restricted to the row state that was selected above -->
      <db:update config-ref="Database_Config" doc:name="Update Status">
        <db:sql>UPDATE dlq_messages SET status = 'REPROCESSED' WHERE message_id = :messageId AND status = 'PENDING_REVIEW'</db:sql>
        <db:input-parameters><![CDATA[#[{messageId: vars.messageId}]]]></db:input-parameters>
      </db:update>
    </when>
    <otherwise>
      <raise-error type="APP:NOT_FOUND" description="DLQ message not found"/>
    </otherwise>
  </choice>
</flow>
Advanced Patterns
Circuit Breaker with Queue
<!-- Circuit Breaker Pattern Using Anypoint MQ
     Reads external-api-requests and forwards each message to the external API unless the
     circuit state kept in Circuit_Breaker_Store is OPEN. Connectivity and timeout failures
     are counted; the fifth consecutive failure opens the circuit, a success closes it. -->
<flow name="circuit-breaker-queue-flow">
  <anypoint-mq:subscriber
      config-ref="Anypoint_MQ_Config"
      destination="external-api-requests"
      acknowledgementMode="MANUAL"
      doc:name="API Request Subscriber"/>
  <!-- Save the token while the subscriber's attributes are still on the message -->
  <set-variable variableName="ackToken" value="#[attributes.ackToken]" doc:name="Save Ack Token"/>
  <!-- Check circuit breaker state.
       The state goes into a variable so the queue message stays in the payload; otherwise
       the HTTP request below would send the state string instead of the message. -->
  <os:retrieve
      key="circuit:external-api:state"
      objectStore="Circuit_Breaker_Store"
      target="circuitState"
      doc:name="Get Circuit State">
    <os:default-value><![CDATA[#["CLOSED"]]]></os:default-value>
  </os:retrieve>
  <choice doc:name="Circuit State">
    <when expression="#[vars.circuitState == 'OPEN']">
      <!-- Circuit is open - reject immediately -->
      <logger level="WARN" message="Circuit OPEN - delaying message"/>
      <!-- Negative ack to retry later.
           NOTE(review): each nack presumably counts as a delivery attempt, so a long
           outage may push messages to the DLQ; confirm this against the queue settings. -->
      <anypoint-mq:nack
          config-ref="Anypoint_MQ_Config"
          ackToken="#[vars.ackToken]"
          doc:name="NACK - Circuit Open"/>
    </when>
    <otherwise>
      <!-- Circuit closed or half-open - try request -->
      <try doc:name="Try External API">
        <http:request
            config-ref="External_API_Config"
            method="POST"
            path="/api/process"
            doc:name="Call External API"/>
        <!-- Success - reset failure count -->
        <os:store
            key="circuit:external-api:failures"
            objectStore="Circuit_Breaker_Store"
            doc:name="Reset Failures">
          <os:value><![CDATA[#[0]]]></os:value>
        </os:store>
        <!-- Success - close the circuit, so a HALF_OPEN probe that works ends the outage -->
        <os:store
            key="circuit:external-api:state"
            objectStore="Circuit_Breaker_Store"
            doc:name="Close Circuit">
          <os:value><![CDATA[#["CLOSED"]]]></os:value>
        </os:store>
        <anypoint-mq:ack
            config-ref="Anypoint_MQ_Config"
            ackToken="#[vars.ackToken]"
            doc:name="Acknowledge Success"/>
        <error-handler>
          <on-error-continue type="HTTP:CONNECTIVITY, HTTP:TIMEOUT">
            <!-- Increment failure count -->
            <os:retrieve
                key="circuit:external-api:failures"
                objectStore="Circuit_Breaker_Store"
                target="failureCount">
              <os:default-value><![CDATA[#[0]]]></os:default-value>
            </os:retrieve>
            <!-- Work with the incremented value from here on, so the threshold below is
                 compared against the count that includes the current failure. -->
            <set-variable variableName="failureCount" value="#[vars.failureCount + 1]"/>
            <os:store
                key="circuit:external-api:failures"
                objectStore="Circuit_Breaker_Store">
              <os:value><![CDATA[#[vars.failureCount]]]></os:value>
            </os:store>
            <!-- Open circuit if threshold reached -->
            <choice>
              <when expression="#[vars.failureCount >= 5]">
                <os:store
                    key="circuit:external-api:state"
                    objectStore="Circuit_Breaker_Store">
                  <os:value><![CDATA[#["OPEN"]]]></os:value>
                </os:store>
                <logger level="ERROR" message="Circuit OPENED after 5 failures"/>
              </when>
            </choice>
            <!-- NACK for retry -->
            <anypoint-mq:nack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
          </on-error-continue>
        </error-handler>
      </try>
    </otherwise>
  </choice>
</flow>
<!-- Circuit Breaker Reset Scheduler: periodically moves an OPEN circuit to HALF_OPEN -->
<flow name="circuit-breaker-reset-flow">
  <scheduler doc:name="Every 30 seconds">
    <scheduling-strategy>
      <fixed-frequency timeUnit="SECONDS" frequency="30"/>
    </scheduling-strategy>
  </scheduler>
  <!-- Read the stored state into a variable; a missing key is treated as CLOSED -->
  <os:retrieve target="circuitState"
               objectStore="Circuit_Breaker_Store"
               key="circuit:external-api:state">
    <os:default-value><![CDATA[#["CLOSED"]]]></os:default-value>
  </os:retrieve>
  <choice>
    <when expression="#[vars.circuitState == 'OPEN']">
      <!-- Move to half-open for testing -->
      <os:store objectStore="Circuit_Breaker_Store"
                key="circuit:external-api:state">
        <os:value><![CDATA[#["HALF_OPEN"]]]></os:value>
      </os:store>
      <logger message="Circuit moved to HALF_OPEN" level="INFO"/>
    </when>
  </choice>
</flow>
Request-Reply Pattern
<!-- Request-Reply Pattern (sender side)
     HTTP POST /sync/process publishes the request to processing-requests with a
     correlation id and a reply-to queue name, then waits up to 30 seconds for the reply.
     Raises APP:TIMEOUT when no reply arrives. -->
<flow name="request-reply-sender">
  <http:listener config-ref="HTTP_Listener" path="/sync/process" method="POST"/>
  <!-- Generate correlation ID -->
  <set-variable variableName="correlationId" value="#[uuid()]"/>
  <!-- Create temporary reply queue name.
       NOTE(review): nothing in this flow creates the queue itself; confirm how the
       per-request reply destination is provisioned before relying on this pattern. -->
  <set-variable variableName="replyQueue" value="#['reply-' ++ vars.correlationId]"/>
  <!-- Publish request with reply-to -->
  <anypoint-mq:publish
      config-ref="Anypoint_MQ_Config"
      destination="processing-requests"
      doc:name="Publish Request">
    <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
    <anypoint-mq:properties>
      <anypoint-mq:property key="correlationId" value="#[vars.correlationId]"/>
      <anypoint-mq:property key="replyTo" value="#[vars.replyQueue]"/>
    </anypoint-mq:properties>
  </anypoint-mq:publish>
  <!-- Wait for reply (with timeout).
       The connector reports an elapsed pollingTime as a timeout error; it is turned into
       a null payload so the choice below raises the application-level APP:TIMEOUT. -->
  <try doc:name="Wait for Reply or Time Out">
    <anypoint-mq:consume
        config-ref="Anypoint_MQ_Config"
        destination="#[vars.replyQueue]"
        acknowledgementMode="IMMEDIATE"
        pollingTime="30000"
        doc:name="Wait for Reply"/>
    <error-handler>
      <on-error-continue type="ANYPOINT-MQ:TIMEOUT">
        <set-payload value="#[null]" doc:name="No Reply"/>
      </on-error-continue>
    </error-handler>
  </try>
  <choice doc:name="Reply Received?">
    <when expression="#[payload != null]">
      <logger level="INFO" message="Reply received for: #[vars.correlationId]"/>
    </when>
    <otherwise>
      <raise-error type="APP:TIMEOUT" description="Request timed out waiting for reply"/>
    </otherwise>
  </choice>
</flow>
<!-- Request Processor (Replier)
     Reads processing-requests, builds a result and publishes it to the queue named in the
     request's replyTo property, echoing the correlation id. On failure an error reply is
     sent and the request message is released. -->
<flow name="request-reply-processor">
  <anypoint-mq:subscriber
      config-ref="Anypoint_MQ_Config"
      destination="processing-requests"
      acknowledgementMode="MANUAL"
      doc:name="Request Subscriber"/>
  <!-- Extract everything needed from the request attributes now: the publish step
       below replaces the attributes, after which the token and properties are gone. -->
  <set-variable variableName="ackToken" value="#[attributes.ackToken]" doc:name="Save Ack Token"/>
  <set-variable variableName="replyQueue" value="#[attributes.properties.'replyTo']"/>
  <set-variable variableName="correlationId" value="#[attributes.properties.'correlationId']"/>
  <try doc:name="Process and Reply">
    <!-- Process the request -->
    <ee:transform doc:name="Process">
      <ee:message>
        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  result: "Processed successfully",
  originalData: payload,
  processedAt: now()
}]]></ee:set-payload>
      </ee:message>
    </ee:transform>
    <!-- Send reply -->
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="#[vars.replyQueue]"
        doc:name="Send Reply">
      <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
      <anypoint-mq:properties>
        <anypoint-mq:property key="correlationId" value="#[vars.correlationId]"/>
      </anypoint-mq:properties>
    </anypoint-mq:publish>
    <anypoint-mq:ack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
    <error-handler>
      <on-error-propagate type="ANY">
        <!-- Send error reply, tagged with the correlation id like a normal reply -->
        <anypoint-mq:publish
            config-ref="Anypoint_MQ_Config"
            destination="#[vars.replyQueue]">
          <anypoint-mq:body><![CDATA[#[{error: error.description}]]]></anypoint-mq:body>
          <anypoint-mq:properties>
            <anypoint-mq:property key="correlationId" value="#[vars.correlationId]"/>
          </anypoint-mq:properties>
        </anypoint-mq:publish>
        <!-- Release the request instead of leaving it locked until the ack timeout -->
        <anypoint-mq:nack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
      </on-error-propagate>
    </error-handler>
  </try>
</flow>
Monitoring and Operations
Queue Statistics API
<!-- Queue Statistics Endpoint
     HTTP GET /admin/queues/stats requests the statistics of a fixed list of queues from
     the admin API and returns them as {queues: [ ...one entry per queue... ]}. -->
<flow name="queue-stats-flow">
  <http:listener config-ref="HTTP_Listener" path="/admin/queues/stats" method="GET"/>
  <ee:transform doc:name="Prepare Stats Request">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  queues: [
    "orders-queue",
    "notifications-queue",
    "orders-queue-dlq"
  ]
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
  <!-- foreach restores the original payload when it finishes, so per-queue results are
       collected in a variable; otherwise the response would be just the queue names. -->
  <set-variable variableName="queueStats" value="#[[]]" doc:name="Init Stats"/>
  <!-- Get stats for each queue using Anypoint MQ Admin API.
       NOTE(review): vars.accessToken is not assigned anywhere in this flow; confirm where
       the bearer token is obtained before this endpoint is used. -->
  <foreach collection="#[payload.queues]">
    <http:request
        method="GET"
        url="${anypoint.mq.admin.url}/queues/#[payload]"
        config-ref="Anypoint_MQ_Admin_Config"
        doc:name="Get Queue Stats">
      <http:headers>
        <http:header key="Authorization" value="#['Bearer ' ++ vars.accessToken]"/>
      </http:headers>
    </http:request>
    <!-- Map the API response and append it to the collected results -->
    <ee:transform doc:name="Map Stats">
      <ee:variables>
        <ee:set-variable variableName="queueStats"><![CDATA[%dw 2.0
output application/java
---
vars.queueStats ++ [{
  queueName: payload.queueId,
  messagesInFlight: payload.messagesInFlight,
  messagesVisible: payload.messagesVisible,
  messagesSent: payload.messagesSent,
  messagesReceived: payload.messagesReceived,
  messagesAcked: payload.messagesAcked
}]
]]></ee:set-variable>
      </ee:variables>
    </ee:transform>
  </foreach>
  <!-- Return the collected statistics -->
  <ee:transform doc:name="Stats Response">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  queues: vars.queueStats
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
</flow>
Health Check Flow
<!-- Anypoint MQ Health Check
     HTTP GET /health/mq publishes a test message to health-check-queue and consumes one
     back. Responds 200 with status "healthy" and the round-trip latency, or 503 with
     status "unhealthy" and the error description when any step fails. -->
<flow name="mq-health-check-flow">
  <http:listener config-ref="HTTP_Listener" path="/health/mq" method="GET">
    <!-- The error handler ends the flow normally, so the status code has to be driven
         by a variable; it stays 200 unless the unhealthy branch sets it. -->
    <http:response statusCode="#[vars.httpStatus default 200]"/>
  </http:listener>
  <!-- Start of the latency measurement; without it the latency expression subtracts
       null and the check would always end in the unhealthy branch. -->
  <set-variable variableName="startTime" value="#[now()]" doc:name="Record Start Time"/>
  <try doc:name="MQ Health Check">
    <!-- Try to publish and consume a test message -->
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="health-check-queue"
        doc:name="Publish Test">
      <anypoint-mq:body><![CDATA[#[{test: true, timestamp: now()}]]]></anypoint-mq:body>
    </anypoint-mq:publish>
    <anypoint-mq:consume
        config-ref="Anypoint_MQ_Config"
        destination="health-check-queue"
        acknowledgementMode="IMMEDIATE"
        pollingTime="5000"
        doc:name="Consume Test"/>
    <ee:transform doc:name="Healthy Response">
      <ee:message>
        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  status: "healthy",
  service: "Anypoint MQ",
  timestamp: now(),
  latencyMs: (now() - vars.startTime) as Number {unit: "milliseconds"}
}]]></ee:set-payload>
      </ee:message>
    </ee:transform>
    <error-handler>
      <on-error-continue type="ANY">
        <ee:transform doc:name="Unhealthy Response">
          <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
  status: "unhealthy",
  service: "Anypoint MQ",
  error: error.description,
  timestamp: now()
}]]></ee:set-payload>
          </ee:message>
          <!-- Picked up by the listener's response statusCode expression -->
          <ee:variables>
            <ee:set-variable variableName="httpStatus">503</ee:set-variable>
          </ee:variables>
        </ee:transform>
      </on-error-continue>
    </error-handler>
  </try>
</flow>
Best Practices Summary
# Checklist of Anypoint MQ recommendations, grouped by concern
anypoint_mq_best_practices:

  # Choosing and configuring queues and exchanges
  queue_design:
    - "Use descriptive queue names with environment prefix"
    - "Configure appropriate TTL based on business requirements"
    - "Always set up DLQ for production queues"
    - "Use FIFO queues only when ordering is critical (lower throughput)"
    - "Use exchanges for broadcast/multicast scenarios"

  # Publishing and consuming messages
  message_handling:
    - "Use MANUAL acknowledgment for reliable processing"
    - "Set appropriate acknowledgment timeout (2x processing time)"
    - "Include correlation ID for traceability"
    - "Add metadata as message properties, not in body"
    - "Implement idempotent consumers for at-least-once delivery"

  # Failed deliveries and dead letter queues
  error_handling:
    - "Set maxDeliveries before DLQ (typically 3-5)"
    - "Monitor DLQ regularly with alerts"
    - "Build DLQ reprocessing capability"
    - "Log failed messages with full context"

  # Throughput and scaling
  performance:
    - "Use batch publishing for high-volume scenarios"
    - "Configure connection pooling appropriately"
    - "Monitor queue depth and processing latency"
    - "Scale consumers based on queue backlog"

  # Credentials and data protection
  security:
    - "Use environment-specific client credentials"
    - "Store credentials in secure properties"
    - "Use VPC/private network for sensitive data"
    - "Encrypt message payloads containing PII"
Conclusion
Anypoint MQ provides reliable cloud messaging for MuleSoft integrations. Key patterns include pub/sub with exchanges, FIFO for ordering, DLQ for failures, and request-reply for synchronous needs. Proper acknowledgment handling and monitoring ensure production reliability.