MuleSoft

MuleSoft Anypoint MQ Complete Guide: Messaging Made Simple

DeviDevs Team
11 min read
#mulesoft#anypoint-mq#messaging#queues#enterprise-integration

Anypoint MQ is MuleSoft's cloud messaging service for reliable asynchronous communication. This guide covers queue configuration, messaging patterns, and production best practices.

Anypoint MQ Architecture

Core Components Overview

┌─────────────────────────────────────────────────────────────────────┐
│                     ANYPOINT MQ ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   PUBLISHERS                        ANYPOINT MQ                     │
│  ┌─────────────┐                  ┌─────────────────────────────┐  │
│  │   Mule App  │───publish───────►│                             │  │
│  └─────────────┘                  │   ┌─────────────────────┐   │  │
│  ┌─────────────┐                  │   │    Standard Queue   │   │  │
│  │  REST API   │───publish───────►│   │  (at-least-once)    │   │  │
│  └─────────────┘                  │   └─────────────────────┘   │  │
│  ┌─────────────┐                  │                             │  │
│  │   External  │───publish───────►│   ┌─────────────────────┐   │  │
│  │   System    │                  │   │     FIFO Queue      │   │  │
│  └─────────────┘                  │   │  (exactly-once)     │   │  │
│                                   │   └─────────────────────┘   │  │
│   SUBSCRIBERS                     │                             │  │
│  ┌─────────────┐                  │   ┌─────────────────────┐   │  │
│  │   Mule App  │◄──subscribe──────│   │   Message Exchange  │   │  │
│  └─────────────┘                  │   │   (pub/sub topics)  │   │  │
│  ┌─────────────┐                  │   └─────────────────────┘   │  │
│  │   Mule App  │◄──subscribe──────│                             │  │
│  └─────────────┘                  │   ┌─────────────────────┐   │  │
│                                   │   │  Dead Letter Queue  │   │  │
│                                   │   │   (failed messages) │   │  │
│                                   │   └─────────────────────┘   │  │
│                                   └─────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘

Queue Configuration

Standard Queue Setup

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:anypoint-mq="http://www.mulesoft.org/schema/mule/anypoint-mq"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core
        http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/anypoint-mq
        http://www.mulesoft.org/schema/mule/anypoint-mq/current/mule-anypoint-mq.xsd">
 
    <!-- Anypoint MQ Configuration -->
    <anypoint-mq:config name="Anypoint_MQ_Config" doc:name="Anypoint MQ Config">
        <anypoint-mq:connection
            url="${anypoint.mq.url}"
            clientId="${anypoint.mq.clientId}"
            clientSecret="${anypoint.mq.clientSecret}"/>
    </anypoint-mq:config>
 
    <!-- Alternative: Default Subscriber Configuration -->
    <anypoint-mq:default-subscriber-config name="MQ_Subscriber_Config">
        <anypoint-mq:connection
            url="${anypoint.mq.url}"
            clientId="${anypoint.mq.clientId}"
            clientSecret="${anypoint.mq.clientSecret}"/>
    </anypoint-mq:default-subscriber-config>
 
</mule>

Queue Properties Configuration

# mule-artifact.json - Queue properties
anypoint:
  mq:
    # Region-specific URLs
    url: "https://mq-us-east-1.anypoint.mulesoft.com/api/v1"
    # url: "https://mq-eu-west-1.anypoint.mulesoft.com/api/v1"
    # url: "https://mq-ap-southeast-1.anypoint.mulesoft.com/api/v1"
 
    clientId: "${secure::anypoint.mq.clientId}"
    clientSecret: "${secure::anypoint.mq.clientSecret}"
 
    # Queue settings
    queues:
      orders:
        name: "orders-queue"
        defaultTtl: 604800000        # 7 days in milliseconds
        defaultLockTtl: 120000       # 2 minutes lock
        maxDeliveries: 5             # Before DLQ
 
      notifications:
        name: "notifications-queue"
        defaultTtl: 86400000         # 1 day
        defaultLockTtl: 60000        # 1 minute lock
        maxDeliveries: 3
 
      high-priority:
        name: "high-priority-fifo"
        fifo: true
        defaultTtl: 86400000
        defaultLockTtl: 300000       # 5 minutes for FIFO

Publishing Messages

Basic Message Publishing

<!-- Flow: Publish Order to Queue -->
<flow name="publish-order-flow">
    <http:listener config-ref="HTTP_Listener" path="/orders" method="POST"/>
 
    <!-- Transform to queue message format -->
    <ee:transform doc:name="Prepare Message">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    orderId: payload.id,
    customerId: payload.customerId,
    items: payload.items,
    totalAmount: payload.totalAmount,
    timestamp: now() as String {format: "yyyy-MM-dd'T'HH:mm:ss'Z'"}
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
    <!-- Publish to Standard Queue -->
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="orders-queue"
        doc:name="Publish Order">
        <anypoint-mq:properties>
            <anypoint-mq:property key="orderType" value="#[payload.orderType]"/>
            <anypoint-mq:property key="priority" value="#[payload.priority default 'normal']"/>
            <anypoint-mq:property key="correlationId" value="#[correlationId]"/>
        </anypoint-mq:properties>
    </anypoint-mq:publish>
 
    <ee:transform doc:name="Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "queued",
    messageId: attributes.messageId,
    destination: "orders-queue"
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
</flow>

Publishing with Custom Message ID

<!-- Flow: Publish with Deduplication -->
<flow name="publish-idempotent-flow">
    <http:listener config-ref="HTTP_Listener" path="/transactions" method="POST"/>
 
    <!-- Generate deterministic message ID for deduplication -->
    <ee:transform doc:name="Generate Message ID">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
import dw::Crypto
---
{
    data: payload,
    messageId: Crypto::hashWith(
        (payload.transactionId ++ payload.timestamp) as Binary,
        "SHA-256"
    )
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
    <!-- Publish with custom message ID -->
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="transactions-fifo"
        messageId="#[payload.messageId]"
        doc:name="Publish Transaction">
        <anypoint-mq:body><![CDATA[#[payload.data]]]></anypoint-mq:body>
        <anypoint-mq:properties>
            <anypoint-mq:property key="transactionType" value="#[payload.data.type]"/>
        </anypoint-mq:properties>
    </anypoint-mq:publish>
 
</flow>

Batch Publishing

<!-- Flow: Batch Publish Multiple Messages -->
<flow name="batch-publish-flow">
    <http:listener config-ref="HTTP_Listener" path="/batch/orders" method="POST"/>
 
    <!-- Split array into individual messages -->
    <foreach collection="#[payload.orders]">
        <anypoint-mq:publish
            config-ref="Anypoint_MQ_Config"
            destination="orders-queue"
            doc:name="Publish Each Order">
            <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
            <anypoint-mq:properties>
                <anypoint-mq:property key="batchId" value="#[vars.batchId]"/>
                <anypoint-mq:property key="itemIndex" value="#[vars.counter]"/>
            </anypoint-mq:properties>
        </anypoint-mq:publish>
    </foreach>
 
    <ee:transform doc:name="Batch Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "batch_queued",
    count: sizeOf(payload.orders),
    batchId: vars.batchId
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
</flow>

Consuming Messages

Subscriber Flow (Push Model)

<!-- Flow: Subscribe to Queue -->
<flow name="order-processor-flow">
    <!-- Anypoint MQ Subscriber (Push) -->
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="orders-queue"
        doc:name="Subscribe Orders"
        acknowledgementMode="MANUAL"
        acknowledgementTimeout="120000"
        pollingTime="1000"
        maxRedelivery="5"/>
 
    <logger level="INFO" message="Processing order: #[payload.orderId]"/>
 
    <!-- Process the order -->
    <try doc:name="Process with Error Handling">
        <flow-ref name="process-order-subflow"/>
 
        <!-- Acknowledge on success -->
        <anypoint-mq:ack
            config-ref="Anypoint_MQ_Config"
            doc:name="Acknowledge Message"/>
 
        <error-handler>
            <on-error-propagate type="ANY">
                <logger level="ERROR" message="Order processing failed: #[error.description]"/>
                <!-- Message will be redelivered or sent to DLQ -->
            </on-error-propagate>
        </error-handler>
    </try>
 
</flow>
 
<sub-flow name="process-order-subflow">
    <!-- Validate order -->
    <ee:transform doc:name="Validate">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload update {
    case .validated -> true
    case .processedAt -> now()
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
    <!-- Store in database -->
    <db:insert config-ref="Database_Config" doc:name="Insert Order">
        <db:sql>
            INSERT INTO orders (order_id, customer_id, total, status, created_at)
            VALUES (:orderId, :customerId, :total, 'PROCESSED', :createdAt)
        </db:sql>
        <db:input-parameters><![CDATA[#[{
            orderId: payload.orderId,
            customerId: payload.customerId,
            total: payload.totalAmount,
            createdAt: now()
        }]]]></db:input-parameters>
    </db:insert>
</sub-flow>

Consume Operation (Pull Model)

<!-- Flow: On-Demand Message Consumption -->
<flow name="consume-on-demand-flow">
    <http:listener config-ref="HTTP_Listener" path="/process/next" method="POST"/>
 
    <!-- Consume single message -->
    <anypoint-mq:consume
        config-ref="Anypoint_MQ_Config"
        destination="orders-queue"
        acknowledgementMode="MANUAL"
        acknowledgementTimeout="60000"
        pollingTime="10000"
        doc:name="Consume Next Message"/>
 
    <!-- Check if message received -->
    <choice doc:name="Message Received?">
        <when expression="#[payload != null]">
            <try doc:name="Process Message">
                <flow-ref name="process-order-subflow"/>
 
                <anypoint-mq:ack
                    config-ref="Anypoint_MQ_Config"
                    doc:name="Acknowledge"/>
 
                <ee:transform doc:name="Success Response">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "processed",
    messageId: attributes.messageId
}]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
 
                <error-handler>
                    <on-error-propagate type="ANY">
                        <anypoint-mq:nack
                            config-ref="Anypoint_MQ_Config"
                            doc:name="Negative Acknowledge"/>
                    </on-error-propagate>
                </error-handler>
            </try>
        </when>
        <otherwise>
            <ee:transform doc:name="No Messages">
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "empty",
    message: "No messages available"
}]]></ee:set-payload>
                </ee:message>
            </ee:transform>
        </otherwise>
    </choice>
 
</flow>

Message Exchanges (Pub/Sub)

Exchange Configuration

<!-- Exchange Publisher -->
<flow name="notification-publisher-flow">
    <http:listener config-ref="HTTP_Listener" path="/events" method="POST"/>
 
    <!-- Publish to Exchange (broadcasts to all bound queues) -->
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="notifications-exchange"
        doc:name="Publish to Exchange">
        <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
        <anypoint-mq:properties>
            <anypoint-mq:property key="eventType" value="#[payload.type]"/>
            <anypoint-mq:property key="source" value="#[payload.source]"/>
        </anypoint-mq:properties>
    </anypoint-mq:publish>
 
</flow>
 
<!-- Subscriber 1: Email Notifications -->
<flow name="email-notification-subscriber">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="email-notifications-queue"
        doc:name="Email Queue Subscriber"/>
 
    <flow-ref name="send-email-notification"/>
 
    <anypoint-mq:ack config-ref="Anypoint_MQ_Config"/>
</flow>
 
<!-- Subscriber 2: SMS Notifications -->
<flow name="sms-notification-subscriber">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="sms-notifications-queue"
        doc:name="SMS Queue Subscriber"/>
 
    <flow-ref name="send-sms-notification"/>
 
    <anypoint-mq:ack config-ref="Anypoint_MQ_Config"/>
</flow>
 
<!-- Subscriber 3: Push Notifications -->
<flow name="push-notification-subscriber">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="push-notifications-queue"
        doc:name="Push Queue Subscriber"/>
 
    <flow-ref name="send-push-notification"/>
 
    <anypoint-mq:ack config-ref="Anypoint_MQ_Config"/>
</flow>

FIFO Queues

FIFO Queue Configuration and Usage

<!-- FIFO Queue Publisher with Message Groups -->
<flow name="fifo-order-publisher">
    <http:listener config-ref="HTTP_Listener" path="/fifo/orders" method="POST"/>
 
    <!-- FIFO requires message group ID for ordering -->
    <ee:transform doc:name="Prepare FIFO Message">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    order: payload,
    // Group by customer for ordered processing per customer
    messageGroupId: payload.customerId
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="orders-fifo"
        messageGroupId="#[payload.messageGroupId]"
        doc:name="Publish FIFO Order">
        <anypoint-mq:body><![CDATA[#[payload.order]]]></anypoint-mq:body>
    </anypoint-mq:publish>
 
</flow>
 
<!-- FIFO Queue Consumer -->
<flow name="fifo-order-consumer">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="orders-fifo"
        acknowledgementMode="MANUAL"
        acknowledgementTimeout="300000"
        doc:name="FIFO Subscriber"/>
 
    <logger level="INFO"
        message="Processing FIFO order: #[payload.orderId] from group: #[attributes.messageGroupId]"/>
 
    <try doc:name="FIFO Processing">
        <!-- FIFO processing - order matters! -->
        <flow-ref name="process-fifo-order"/>
 
        <anypoint-mq:ack
            config-ref="Anypoint_MQ_Config"
            doc:name="Acknowledge FIFO"/>
 
        <error-handler>
            <on-error-propagate type="ANY">
                <logger level="ERROR"
                    message="FIFO processing failed - blocking group: #[attributes.messageGroupId]"/>
                <!-- In FIFO, failure blocks the message group -->
            </on-error-propagate>
        </error-handler>
    </try>
 
</flow>

Dead Letter Queue Handling

DLQ Processing Flow

<!-- Dead Letter Queue Processor -->
<flow name="dlq-processor-flow">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="orders-queue-dlq"
        acknowledgementMode="MANUAL"
        doc:name="DLQ Subscriber"/>
 
    <logger level="WARN"
        message="Processing DLQ message: #[attributes.messageId], Delivery count: #[attributes.deliveryCount]"/>
 
    <!-- Analyze failure reason -->
    <ee:transform doc:name="Analyze Failure">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    originalMessage: payload,
    messageId: attributes.messageId,
    deliveryCount: attributes.deliveryCount,
    firstDeliveryTime: attributes.timestamp,
    properties: attributes.properties,
    failureAnalysis: {
        possibleCauses: [
            "Validation error",
            "External service unavailable",
            "Data corruption",
            "Processing timeout"
        ]
    }
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
    <!-- Store in error database for analysis -->
    <db:insert config-ref="Database_Config" doc:name="Log DLQ Message">
        <db:sql>
            INSERT INTO dlq_messages (
                message_id, queue_name, payload, delivery_count,
                first_delivery, logged_at, status
            ) VALUES (
                :messageId, 'orders-queue', :payload, :deliveryCount,
                :firstDelivery, CURRENT_TIMESTAMP, 'PENDING_REVIEW'
            )
        </db:sql>
        <db:input-parameters><![CDATA[#[{
            messageId: payload.messageId,
            payload: write(payload.originalMessage, "application/json"),
            deliveryCount: payload.deliveryCount,
            firstDelivery: payload.firstDeliveryTime
        }]]]></db:input-parameters>
    </db:insert>
 
    <!-- Send alert -->
    <flow-ref name="send-dlq-alert"/>
 
    <!-- Acknowledge DLQ message -->
    <anypoint-mq:ack
        config-ref="Anypoint_MQ_Config"
        doc:name="Acknowledge DLQ Message"/>
 
</flow>
 
<!-- DLQ Reprocess Flow (Manual Trigger) -->
<flow name="dlq-reprocess-flow">
    <http:listener config-ref="HTTP_Listener" path="/dlq/reprocess/{messageId}" method="POST"/>
 
    <!-- Fetch from error database -->
    <db:select config-ref="Database_Config" doc:name="Get DLQ Message">
        <db:sql>
            SELECT * FROM dlq_messages WHERE message_id = :messageId AND status = 'PENDING_REVIEW'
        </db:sql>
        <db:input-parameters><![CDATA[#[{messageId: attributes.uriParams.messageId}]]]></db:input-parameters>
    </db:select>
 
    <choice doc:name="Message Found?">
        <when expression="#[sizeOf(payload) > 0]">
            <!-- Republish to original queue -->
            <anypoint-mq:publish
                config-ref="Anypoint_MQ_Config"
                destination="orders-queue"
                doc:name="Republish Message">
                <anypoint-mq:body><![CDATA[#[read(payload[0].payload, "application/json")]]]></anypoint-mq:body>
                <anypoint-mq:properties>
                    <anypoint-mq:property key="reprocessed" value="true"/>
                    <anypoint-mq:property key="originalMessageId" value="#[payload[0].message_id]"/>
                </anypoint-mq:properties>
            </anypoint-mq:publish>
 
            <!-- Update database status -->
            <db:update config-ref="Database_Config" doc:name="Update Status">
                <db:sql>UPDATE dlq_messages SET status = 'REPROCESSED' WHERE message_id = :messageId</db:sql>
                <db:input-parameters><![CDATA[#[{messageId: attributes.uriParams.messageId}]]]></db:input-parameters>
            </db:update>
        </when>
        <otherwise>
            <raise-error type="APP:NOT_FOUND" description="DLQ message not found"/>
        </otherwise>
    </choice>
 
</flow>

Advanced Patterns

Circuit Breaker with Queue

<!-- Circuit Breaker Pattern Using Anypoint MQ -->
<flow name="circuit-breaker-queue-flow">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="external-api-requests"
        acknowledgementMode="MANUAL"
        doc:name="API Request Subscriber"/>
 
    <!-- Check circuit breaker state -->
    <os:retrieve
        key="circuit:external-api:state"
        objectStore="Circuit_Breaker_Store"
        doc:name="Get Circuit State">
        <os:default-value><![CDATA[#["CLOSED"]]]></os:default-value>
    </os:retrieve>
 
    <choice doc:name="Circuit State">
        <when expression="#[payload == 'OPEN']">
            <!-- Circuit is open - reject immediately -->
            <logger level="WARN" message="Circuit OPEN - delaying message"/>
 
            <!-- Negative ack to retry later -->
            <anypoint-mq:nack
                config-ref="Anypoint_MQ_Config"
                doc:name="NACK - Circuit Open"/>
        </when>
        <otherwise>
            <!-- Circuit closed or half-open - try request -->
            <try doc:name="Try External API">
                <http:request
                    config-ref="External_API_Config"
                    method="POST"
                    path="/api/process"
                    doc:name="Call External API"/>
 
                <!-- Success - reset failure count -->
                <os:store
                    key="circuit:external-api:failures"
                    objectStore="Circuit_Breaker_Store"
                    doc:name="Reset Failures">
                    <os:value><![CDATA[#[0]]]></os:value>
                </os:store>
 
                <anypoint-mq:ack
                    config-ref="Anypoint_MQ_Config"
                    doc:name="Acknowledge Success"/>
 
                <error-handler>
                    <on-error-continue type="HTTP:CONNECTIVITY, HTTP:TIMEOUT">
                        <!-- Increment failure count -->
                        <os:retrieve
                            key="circuit:external-api:failures"
                            objectStore="Circuit_Breaker_Store"
                            target="failureCount">
                            <os:default-value><![CDATA[#[0]]]></os:default-value>
                        </os:retrieve>
 
                        <os:store
                            key="circuit:external-api:failures"
                            objectStore="Circuit_Breaker_Store">
                            <os:value><![CDATA[#[vars.failureCount + 1]]]></os:value>
                        </os:store>
 
                        <!-- Open circuit if threshold exceeded -->
                        <choice>
                            <when expression="#[vars.failureCount >= 5]">
                                <os:store
                                    key="circuit:external-api:state"
                                    objectStore="Circuit_Breaker_Store">
                                    <os:value><![CDATA[#["OPEN"]]]></os:value>
                                </os:store>
                                <logger level="ERROR" message="Circuit OPENED after 5 failures"/>
                            </when>
                        </choice>
 
                        <!-- NACK for retry -->
                        <anypoint-mq:nack config-ref="Anypoint_MQ_Config"/>
                    </on-error-continue>
                </error-handler>
            </try>
        </otherwise>
    </choice>
 
</flow>
 
<!-- Circuit Breaker Reset Scheduler -->
<flow name="circuit-breaker-reset-flow">
    <scheduler doc:name="Every 30 seconds">
        <scheduling-strategy>
            <fixed-frequency frequency="30" timeUnit="SECONDS"/>
        </scheduling-strategy>
    </scheduler>
 
    <os:retrieve
        key="circuit:external-api:state"
        objectStore="Circuit_Breaker_Store"
        target="currentState">
        <os:default-value><![CDATA[#["CLOSED"]]]></os:default-value>
    </os:retrieve>
 
    <choice>
        <when expression="#[vars.currentState == 'OPEN']">
            <!-- Move to half-open for testing -->
            <os:store
                key="circuit:external-api:state"
                objectStore="Circuit_Breaker_Store">
                <os:value><![CDATA[#["HALF_OPEN"]]]></os:value>
            </os:store>
            <logger level="INFO" message="Circuit moved to HALF_OPEN"/>
        </when>
    </choice>
 
</flow>

Request-Reply Pattern

<!-- Request-Reply Pattern -->
<flow name="request-reply-sender">
    <http:listener config-ref="HTTP_Listener" path="/sync/process" method="POST"/>
 
    <!-- Generate correlation ID -->
    <set-variable variableName="correlationId" value="#[uuid()]"/>
 
    <!-- Create temporary reply queue name -->
    <set-variable variableName="replyQueue" value="#['reply-' ++ vars.correlationId]"/>
 
    <!-- Publish request with reply-to -->
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="processing-requests"
        doc:name="Publish Request">
        <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
        <anypoint-mq:properties>
            <anypoint-mq:property key="correlationId" value="#[vars.correlationId]"/>
            <anypoint-mq:property key="replyTo" value="#[vars.replyQueue]"/>
        </anypoint-mq:properties>
    </anypoint-mq:publish>
 
    <!-- Wait for reply (with timeout) -->
    <anypoint-mq:consume
        config-ref="Anypoint_MQ_Config"
        destination="#[vars.replyQueue]"
        acknowledgementMode="IMMEDIATE"
        pollingTime="30000"
        doc:name="Wait for Reply"/>
 
    <choice doc:name="Reply Received?">
        <when expression="#[payload != null]">
            <logger level="INFO" message="Reply received for: #[vars.correlationId]"/>
        </when>
        <otherwise>
            <raise-error type="APP:TIMEOUT" description="Request timed out waiting for reply"/>
        </otherwise>
    </choice>
 
</flow>
 
<!-- Request Processor (Replier) -->
<flow name="request-reply-processor">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="processing-requests"
        acknowledgementMode="MANUAL"
        doc:name="Request Subscriber"/>
 
    <!-- Extract reply-to queue -->
    <set-variable variableName="replyQueue" value="#[attributes.properties.'replyTo']"/>
    <set-variable variableName="correlationId" value="#[attributes.properties.'correlationId']"/>
 
    <try doc:name="Process and Reply">
        <!-- Process the request -->
        <ee:transform doc:name="Process">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    result: "Processed successfully",
    originalData: payload,
    processedAt: now()
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
 
        <!-- Send reply -->
        <anypoint-mq:publish
            config-ref="Anypoint_MQ_Config"
            destination="#[vars.replyQueue]"
            doc:name="Send Reply">
            <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
            <anypoint-mq:properties>
                <anypoint-mq:property key="correlationId" value="#[vars.correlationId]"/>
            </anypoint-mq:properties>
        </anypoint-mq:publish>
 
        <anypoint-mq:ack config-ref="Anypoint_MQ_Config"/>
 
        <error-handler>
            <on-error-propagate type="ANY">
                <!-- Send error reply -->
                <anypoint-mq:publish
                    config-ref="Anypoint_MQ_Config"
                    destination="#[vars.replyQueue]">
                    <anypoint-mq:body><![CDATA[#[{error: error.description}]]]></anypoint-mq:body>
                </anypoint-mq:publish>
            </on-error-propagate>
        </error-handler>
    </try>
 
</flow>

Monitoring and Operations

Queue Statistics API

<!-- Queue Statistics Endpoint -->
<flow name="queue-stats-flow">
    <http:listener config-ref="HTTP_Listener" path="/admin/queues/stats" method="GET"/>
 
    <ee:transform doc:name="Prepare Stats Request">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    queues: [
        "orders-queue",
        "notifications-queue",
        "orders-queue-dlq"
    ]
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
    <!-- Get stats for each queue using Anypoint MQ Admin API -->
    <foreach collection="#[payload.queues]">
        <http:request
            method="GET"
            url="${anypoint.mq.admin.url}/queues/#[payload]"
            config-ref="Anypoint_MQ_Admin_Config"
            doc:name="Get Queue Stats">
            <http:headers>
                <http:header key="Authorization" value="#['Bearer ' ++ vars.accessToken]"/>
            </http:headers>
        </http:request>
 
        <ee:transform doc:name="Map Stats">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    queueName: payload.queueId,
    messagesInFlight: payload.messagesInFlight,
    messagesVisible: payload.messagesVisible,
    messagesSent: payload.messagesSent,
    messagesReceived: payload.messagesReceived,
    messagesAcked: payload.messagesAcked
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
    </foreach>
 
</flow>

Health Check Flow

<!-- Anypoint MQ Health Check -->
<flow name="mq-health-check-flow">
    <http:listener config-ref="HTTP_Listener" path="/health/mq" method="GET"/>
 
    <try doc:name="MQ Health Check">
        <!-- Try to publish and consume a test message -->
        <anypoint-mq:publish
            config-ref="Anypoint_MQ_Config"
            destination="health-check-queue"
            doc:name="Publish Test">
            <anypoint-mq:body><![CDATA[#[{test: true, timestamp: now()}]]]></anypoint-mq:body>
        </anypoint-mq:publish>
 
        <anypoint-mq:consume
            config-ref="Anypoint_MQ_Config"
            destination="health-check-queue"
            acknowledgementMode="IMMEDIATE"
            pollingTime="5000"
            doc:name="Consume Test"/>
 
        <ee:transform doc:name="Healthy Response">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "healthy",
    service: "Anypoint MQ",
    timestamp: now(),
    latencyMs: (now() - vars.startTime) as Number {unit: "milliseconds"}
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
 
        <error-handler>
            <on-error-continue type="ANY">
                <ee:transform doc:name="Unhealthy Response">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "unhealthy",
    service: "Anypoint MQ",
    error: error.description,
    timestamp: now()
}]]></ee:set-payload>
                    </ee:message>
                    <ee:set-attributes>
                        <ee:set-attribute attributeName="httpStatus">503</ee:set-attribute>
                    </ee:set-attributes>
                </ee:transform>
            </on-error-continue>
        </error-handler>
    </try>
 
</flow>

Best Practices Summary

anypoint_mq_best_practices:
  queue_design:
    - "Use descriptive queue names with environment prefix"
    - "Configure appropriate TTL based on business requirements"
    - "Always set up DLQ for production queues"
    - "Use FIFO queues only when ordering is critical (lower throughput)"
    - "Use exchanges for broadcast/multicast scenarios"
 
  message_handling:
    - "Use MANUAL acknowledgment for reliable processing"
    - "Set appropriate acknowledgment timeout (2x processing time)"
    - "Include correlation ID for traceability"
    - "Add metadata as message properties, not in body"
    - "Implement idempotent consumers for at-least-once delivery"
 
  error_handling:
    - "Set maxDeliveries before DLQ (typically 3-5)"
    - "Monitor DLQ regularly with alerts"
    - "Build DLQ reprocessing capability"
    - "Log failed messages with full context"
 
  performance:
    - "Use batch publishing for high-volume scenarios"
    - "Configure connection pooling appropriately"
    - "Monitor queue depth and processing latency"
    - "Scale consumers based on queue backlog"
 
  security:
    - "Use environment-specific client credentials"
    - "Store credentials in secure properties"
    - "Use VPC/private network for sensitive data"
    - "Encrypt message payloads containing PII"

Conclusion

Anypoint MQ provides reliable cloud messaging for MuleSoft integrations. Key patterns include pub/sub with exchanges, FIFO for ordering, DLQ for failures, and request-reply for synchronous needs. Proper acknowledgment handling and monitoring ensure production reliability.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.