
Complete Guide to MuleSoft Anypoint MQ: Messaging Made Simple

Petru Constantin

Anypoint MQ is MuleSoft's cloud messaging service for reliable asynchronous communication. This guide covers queue configuration, messaging patterns, and best practices for production.

Anypoint MQ Architecture

Overview of the main components

┌─────────────────────────────────────────────────────────────────────┐
│                     ANYPOINT MQ ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   PUBLISHERS                        ANYPOINT MQ                     │
│  ┌─────────────┐                  ┌─────────────────────────────┐  │
│  │   Mule App  │───publish───────►│                             │  │
│  └─────────────┘                  │   ┌─────────────────────┐   │  │
│  ┌─────────────┐                  │   │    Standard Queue   │   │  │
│  │  REST API   │───publish───────►│   │  (at-least-once)    │   │  │
│  └─────────────┘                  │   └─────────────────────┘   │  │
│  ┌─────────────┐                  │                             │  │
│  │   External  │───publish───────►│   ┌─────────────────────┐   │  │
│  │   System    │                  │   │     FIFO Queue      │   │  │
│  └─────────────┘                  │   │  (exactly-once)     │   │  │
│                                   │   └─────────────────────┘   │  │
│   SUBSCRIBERS                     │                             │  │
│  ┌─────────────┐                  │   ┌─────────────────────┐   │  │
│  │   Mule App  │◄──subscribe──────│   │   Message Exchange  │   │  │
│  └─────────────┘                  │   │   (pub/sub topics)  │   │  │
│  ┌─────────────┐                  │   └─────────────────────┘   │  │
│  │   Mule App  │◄──subscribe──────│                             │  │
│  └─────────────┘                  │   ┌─────────────────────┐   │  │
│                                   │   │  Dead Letter Queue  │   │  │
│                                   │   │   (failed messages) │   │  │
│                                   │   └─────────────────────┘   │  │
│                                   └─────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘

Configuring Queues

Standard queue setup

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:anypoint-mq="http://www.mulesoft.org/schema/mule/anypoint-mq"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core
        http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/anypoint-mq
        http://www.mulesoft.org/schema/mule/anypoint-mq/current/mule-anypoint-mq.xsd">
 
    <!-- Anypoint MQ Configuration -->
    <anypoint-mq:config name="Anypoint_MQ_Config" doc:name="Anypoint MQ Config">
        <anypoint-mq:connection
            url="${anypoint.mq.url}"
            clientId="${anypoint.mq.clientId}"
            clientSecret="${anypoint.mq.clientSecret}"/>
    </anypoint-mq:config>
 
    <!-- Alternative: Default Subscriber Configuration -->
    <anypoint-mq:default-subscriber-config name="MQ_Subscriber_Config">
        <anypoint-mq:connection
            url="${anypoint.mq.url}"
            clientId="${anypoint.mq.clientId}"
            clientSecret="${anypoint.mq.clientSecret}"/>
    </anypoint-mq:default-subscriber-config>
 
</mule>

Configuring queue properties

# Application properties in YAML (for example src/main/resources/config.yaml; the file name is an assumption)
anypoint:
  mq:
    # Region-specific URLs
    url: "https://mq-us-east-1.anypoint.mulesoft.com/api/v1"
    # url: "https://mq-eu-west-1.anypoint.mulesoft.com/api/v1"
    # url: "https://mq-ap-southeast-1.anypoint.mulesoft.com/api/v1"
 
    clientId: "${secure::anypoint.mq.clientId}"
    clientSecret: "${secure::anypoint.mq.clientSecret}"
 
    # Queue settings (reference values; the queues themselves are defined in Anypoint Platform)
    queues:
      orders:
        name: "orders-queue"
        defaultTtl: 604800000        # 7 days in milliseconds
        defaultLockTtl: 120000       # 2-minute lock
        maxDeliveries: 5             # Before the DLQ
 
      notifications:
        name: "notifications-queue"
        defaultTtl: 86400000         # 1 day
        defaultLockTtl: 60000        # 1-minute lock
        maxDeliveries: 3
 
      high-priority:
        name: "high-priority-fifo"
        fifo: true
        defaultTtl: 86400000
        defaultLockTtl: 300000       # 5 minutes for FIFO
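
The TTL and lock values above are raw millisecond counts, which are easy to mistype. As a quick sanity check, the following Python sketch recomputes them from readable durations. It is not part of the Mule application, and the helper name `to_millis` is made up for illustration.

```python
from datetime import timedelta


def to_millis(**duration) -> int:
    """Convert a duration given as timedelta keyword arguments to milliseconds."""
    return int(timedelta(**duration).total_seconds() * 1000)


# Recompute the values used in the properties file
orders_ttl = to_millis(days=7)            # orders defaultTtl
orders_lock = to_millis(minutes=2)        # orders defaultLockTtl
notifications_ttl = to_millis(days=1)     # notifications defaultTtl
fifo_lock = to_millis(minutes=5)          # FIFO defaultLockTtl

print(orders_ttl, orders_lock, notifications_ttl, fifo_lock)
```

Keeping a check like this next to the configuration makes it harder for a stray zero to turn a 7-day TTL into 70 days.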

Publishing Messages

Simple message publishing

<!-- Flow: Publish an order to the queue -->
<flow name="publish-order-flow">
    <http:listener config-ref="HTTP_Listener" path="/orders" method="POST"/>
 
    <!-- Transform into the queue message format -->
    <ee:transform doc:name="Prepare Message">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
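{
    orderId: payload.id,
    customerId: payload.customerId,
    // Carried through so the publish step can copy them into message properties
    orderType: payload.orderType,
    priority: payload.priority,
    items: payload.items,
    totalAmount: payload.totalAmount,
    // Shift to UTC before formatting, because the pattern ends in a literal 'Z'
    timestamp: (now() >> "UTC") as String {format: "yyyy-MM-dd'T'HH:mm:ss'Z'"}
}]]></ee:set-payload>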
        </ee:message>
    </ee:transform>
 
    <!-- Publish to the standard queue -->
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="orders-queue"
        doc:name="Publish Order">
        <anypoint-mq:properties>
            <anypoint-mq:property key="orderType" value="#[payload.orderType]"/>
            <anypoint-mq:property key="priority" value="#[payload.priority default 'normal']"/>
            <anypoint-mq:property key="correlationId" value="#[correlationId]"/>
        </anypoint-mq:properties>
    </anypoint-mq:publish>
 
    <ee:transform doc:name="Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "queued",
    messageId: attributes.messageId,
    destination: "orders-queue"
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
</flow>

Publishing with a custom message ID

<!-- Flow: Publishing with deduplication -->
<flow name="publish-idempotent-flow">
    <http:listener config-ref="HTTP_Listener" path="/transactions" method="POST"/>
 
    <!-- Generate a deterministic message ID for deduplication -->
    <ee:transform doc:name="Generate Message ID">
        <ee:message>
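            <ee:set-payload><![CDATA[%dw 2.0
import dw::Crypto
import toHex from dw::core::Binaries
output application/json
---
{
    data: payload,
    // hashWith returns Binary; hex-encode it so the message ID is a plain string
    messageId: toHex(Crypto::hashWith(
        (payload.transactionId ++ payload.timestamp) as Binary,
        "SHA-256"
    ))
}]]></ee:set-payload>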
        </ee:message>
    </ee:transform>
 
    <!-- Publish with the custom message ID -->
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="transactions-fifo"
        messageId="#[payload.messageId]"
        doc:name="Publish Transaction">
        <anypoint-mq:body><![CDATA[#[payload.data]]]></anypoint-mq:body>
        <anypoint-mq:properties>
            <anypoint-mq:property key="transactionType" value="#[payload.data.type]"/>
        </anypoint-mq:properties>
    </anypoint-mq:publish>
 
</flow>
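
The idea behind the deterministic ID is that a retried request produces the same identifier, so the duplicate can be recognized. A minimal Python sketch of the same hashing step is shown here for illustration only; the function name and the sample values are assumptions, not part of the Mule flow.

```python
import hashlib


def deterministic_message_id(transaction_id: str, timestamp: str) -> str:
    """Return a hex SHA-256 digest of the concatenated business keys."""
    raw = (transaction_id + timestamp).encode("utf-8")
    return hashlib.sha256(raw).hexdigest()


# Hypothetical sample values
first = deterministic_message_id("tx-1001", "2024-01-15T10:00:00Z")
retry = deterministic_message_id("tx-1001", "2024-01-15T10:00:00Z")
other = deterministic_message_id("tx-1002", "2024-01-15T10:00:00Z")

print(first == retry)   # same input, same ID
print(first == other)   # different transaction, different ID
```

Plain concatenation can be ambiguous when field lengths vary ("ab" + "c" equals "a" + "bc"), so a separator between the fields is worth considering if the keys are not fixed-width.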

Batch publishing

<!-- Flow: Publish multiple messages (one publish call per item) -->
<flow name="batch-publish-flow">
    <http:listener config-ref="HTTP_Listener" path="/batch/orders" method="POST"/>
 
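    <!-- Generate a batch ID so every message can be traced back to this request -->
    <set-variable variableName="batchId" value="#[uuid()]"/>

    <!-- Split the array into individual messages -->
    <foreach collection="#[payload.orders]">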
        <anypoint-mq:publish
            config-ref="Anypoint_MQ_Config"
            destination="orders-queue"
            doc:name="Publish Each Order">
            <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
            <anypoint-mq:properties>
                <anypoint-mq:property key="batchId" value="#[vars.batchId]"/>
                <anypoint-mq:property key="itemIndex" value="#[vars.counter]"/>
            </anypoint-mq:properties>
        </anypoint-mq:publish>
    </foreach>
 
    <ee:transform doc:name="Batch Response">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "batch_queued",
    count: sizeOf(payload.orders),
    batchId: vars.batchId
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
</flow>

Consuming Messages

Subscriber flow (push model)

<!-- Flow: Subscribe to the queue -->
<flow name="order-processor-flow">
    <!-- Anypoint MQ Subscriber (Push) -->
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="orders-queue"
        doc:name="Subscribe Orders"
        acknowledgementMode="MANUAL"
        acknowledgementTimeout="120000"
        pollingTime="1000"
        maxRedelivery="5"/>
 
    <logger level="INFO" message="Processing order: #[payload.orderId]"/>
 
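    <!-- Process the order -->
    <try doc:name="Process with Error Handling">
        <!-- Save the ack token first: later operations may replace the message attributes -->
        <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>
        <flow-ref name="process-order-subflow"/>

        <!-- Acknowledge on success -->
        <anypoint-mq:ack
            config-ref="Anypoint_MQ_Config"
            ackToken="#[vars.ackToken]"
            doc:name="Acknowledge Message"/>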
 
        <error-handler>
            <on-error-propagate type="ANY">
                <logger level="ERROR" message="Order processing failed: #[error.description]"/>
                <!-- The message will be redelivered or sent to the DLQ -->
            </on-error-propagate>
        </error-handler>
    </try>
 
</flow>
 
<sub-flow name="process-order-subflow">
    <!-- Validate the order -->
    <ee:transform doc:name="Validate">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload update {
    // The ! suffix inserts the field when it does not exist yet
    case .validated! -> true
    case .processedAt! -> now()
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
    <!-- Store in the database -->
    <db:insert config-ref="Database_Config" doc:name="Insert Order">
        <db:sql>
            INSERT INTO orders (order_id, customer_id, total, status, created_at)
            VALUES (:orderId, :customerId, :total, 'PROCESSED', :createdAt)
        </db:sql>
        <db:input-parameters><![CDATA[#[{
            orderId: payload.orderId,
            customerId: payload.customerId,
            total: payload.totalAmount,
            createdAt: now()
        }]]]></db:input-parameters>
    </db:insert>
</sub-flow>
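
Because standard queues deliver at least once, the processing sub-flow can see the same message twice, for example when the lock expires before the acknowledgement arrives. A minimal Python sketch of an idempotent consumer follows. It is a language-neutral illustration rather than Mule code, and the in-memory `processed_ids` set and the `handle` function are assumptions standing in for a durable store and the real processing logic.

```python
processed_ids: set[str] = set()   # stand-in for a durable store such as a database table
orders: list[dict] = []           # stand-in for the side effect (the INSERT)


def handle(message_id: str, body: dict) -> bool:
    """Process a message at most once; return True when it is safe to acknowledge."""
    if message_id in processed_ids:
        return True               # duplicate delivery: skip the work, still acknowledge
    orders.append(body)           # perform the side effect
    processed_ids.add(message_id)
    return True


handle("m-1", {"orderId": 1})
handle("m-1", {"orderId": 1})     # redelivery of the same message
print(len(orders))
```

In a real system the duplicate check and the side effect would need to share a transaction, or rely on a unique constraint on the message ID, so that a crash between the two steps cannot cause a double insert.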

The consume operation (pull model)

<!-- Flow: Consume on demand -->
<flow name="consume-on-demand-flow">
    <http:listener config-ref="HTTP_Listener" path="/process/next" method="POST"/>
 
    <!-- Consume a single message -->
    <anypoint-mq:consume
        config-ref="Anypoint_MQ_Config"
        destination="orders-queue"
        acknowledgementMode="MANUAL"
        acknowledgementTimeout="60000"
        pollingTime="10000"
        doc:name="Consume Next Message"/>
 
    <!-- Check whether a message was received -->
    <choice doc:name="Message Received?">
        <when expression="#[payload != null]">
            <try doc:name="Process Message">
                <!-- Save the ack token and message ID before processing may replace the attributes -->
                <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>
                <set-variable variableName="messageId" value="#[attributes.messageId]"/>
                <flow-ref name="process-order-subflow"/>

                <anypoint-mq:ack
                    config-ref="Anypoint_MQ_Config"
                    ackToken="#[vars.ackToken]"
                    doc:name="Acknowledge"/>

                <ee:transform doc:name="Success Response">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "processed",
    messageId: vars.messageId
}]]></ee:set-payload>
                    </ee:message>
                </ee:transform>

                <error-handler>
                    <on-error-propagate type="ANY">
                        <anypoint-mq:nack
                            config-ref="Anypoint_MQ_Config"
                            ackToken="#[vars.ackToken]"
                            doc:name="Negative Acknowledge"/>
                    </on-error-propagate>
                </error-handler>
            </try>
        </when>
        <otherwise>
            <ee:transform doc:name="No Messages">
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "empty",
    message: "No messages available"
}]]></ee:set-payload>
                </ee:message>
            </ee:transform>
        </otherwise>
    </choice>
 
</flow>

Message Exchanges (Pub/Sub)

Configuring the exchange

<!-- Exchange Publisher -->
<flow name="notification-publisher-flow">
    <http:listener config-ref="HTTP_Listener" path="/events" method="POST"/>
 
    <!-- Publish to the exchange (fans out to all bound queues) -->
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="notifications-exchange"
        doc:name="Publish to Exchange">
        <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
        <anypoint-mq:properties>
            <anypoint-mq:property key="eventType" value="#[payload.type]"/>
            <anypoint-mq:property key="source" value="#[payload.source]"/>
        </anypoint-mq:properties>
    </anypoint-mq:publish>
 
</flow>
 
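<!-- Subscriber 1: Email notifications -->
<flow name="email-notification-subscriber">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="email-notifications-queue"
        acknowledgementMode="MANUAL"
        doc:name="Email Queue Subscriber"/>

    <!-- Keep the ack token: the referenced flow may replace the message attributes -->
    <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>
    <flow-ref name="send-email-notification"/>

    <anypoint-mq:ack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>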
</flow>
 
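<!-- Subscriber 2: SMS notifications -->
<flow name="sms-notification-subscriber">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="sms-notifications-queue"
        acknowledgementMode="MANUAL"
        doc:name="SMS Queue Subscriber"/>

    <!-- Keep the ack token: the referenced flow may replace the message attributes -->
    <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>
    <flow-ref name="send-sms-notification"/>

    <anypoint-mq:ack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>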
</flow>
 
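<!-- Subscriber 3: Push notifications -->
<flow name="push-notification-subscriber">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="push-notifications-queue"
        acknowledgementMode="MANUAL"
        doc:name="Push Queue Subscriber"/>

    <!-- Keep the ack token: the referenced flow may replace the message attributes -->
    <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>
    <flow-ref name="send-push-notification"/>

    <anypoint-mq:ack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>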
</flow>
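
The fan-out behaviour of an exchange can be pictured with a small in-memory model: one publish call, one copy per bound queue, and each subscriber reads only its own queue. The Python sketch below is a toy simulation under that assumption; the `Exchange` class is hypothetical and does not reflect how Anypoint MQ is implemented.

```python
from collections import deque


class Exchange:
    """Toy model of an exchange: every bound queue receives its own copy."""

    def __init__(self) -> None:
        self.bound_queues: dict[str, deque] = {}

    def bind(self, queue_name: str) -> None:
        self.bound_queues[queue_name] = deque()

    def publish(self, message: dict) -> None:
        for pending in self.bound_queues.values():
            pending.append(dict(message))   # copy, so consumers cannot affect each other


exchange = Exchange()
for name in ("email-notifications-queue",
             "sms-notifications-queue",
             "push-notifications-queue"):
    exchange.bind(name)

exchange.publish({"type": "ORDER_SHIPPED", "source": "orders"})
print({name: len(pending) for name, pending in exchange.bound_queues.items()})
```

Since each queue holds an independent copy, a slow or failing SMS subscriber does not delay email or push delivery.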

FIFO Queues

Configuring and using FIFO queues

<!-- FIFO queue publisher with message groups -->
<flow name="fifo-order-publisher">
    <http:listener config-ref="HTTP_Listener" path="/fifo/orders" method="POST"/>
 
    <!-- FIFO ordering relies on a message group ID -->
    <ee:transform doc:name="Prepare FIFO Message">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    order: payload,
    // Group by customer so each customer's orders are processed in order
    messageGroupId: payload.customerId
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="orders-fifo"
        messageGroupId="#[payload.messageGroupId]"
        doc:name="Publish FIFO Order">
        <anypoint-mq:body><![CDATA[#[payload.order]]]></anypoint-mq:body>
    </anypoint-mq:publish>
 
</flow>
 
<!-- FIFO Queue Consumer -->
<flow name="fifo-order-consumer">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="orders-fifo"
        acknowledgementMode="MANUAL"
        acknowledgementTimeout="300000"
        doc:name="FIFO Subscriber"/>
 
    <logger level="INFO"
        message="Processing FIFO order: #[payload.orderId] from group: #[attributes.messageGroupId]"/>
 
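    <try doc:name="FIFO Processing">
        <!-- Save the ack token before processing may replace the message attributes -->
        <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>

        <!-- FIFO processing: order matters -->
        <flow-ref name="process-fifo-order"/>

        <anypoint-mq:ack
            config-ref="Anypoint_MQ_Config"
            ackToken="#[vars.ackToken]"
            doc:name="Acknowledge FIFO"/>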
 
        <error-handler>
            <on-error-propagate type="ANY">
                <logger level="ERROR"
                    message="FIFO processing failed - blocking group: #[attributes.messageGroupId]"/>
                <!-- In FIFO queues, a failure blocks the message group -->
            </on-error-propagate>
        </error-handler>
    </try>
 
</flow>
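
The comment about a failure blocking the message group is easier to follow with a concrete model. The Python sketch below simulates per-group ordering under a simplifying assumption: a group with an unacknowledged message delivers nothing further, while other groups keep flowing. The `FifoQueue` class is hypothetical and only illustrates the semantics described above.

```python
from collections import deque
from typing import Optional


class FifoQueue:
    """Toy FIFO queue: ordered per group, one in-flight message per group."""

    def __init__(self) -> None:
        self.groups: dict[str, deque] = {}    # dicts keep insertion order
        self.in_flight: set[str] = set()

    def publish(self, group_id: str, body: str) -> None:
        self.groups.setdefault(group_id, deque()).append(body)

    def receive(self) -> Optional[tuple[str, str]]:
        for group_id, pending in self.groups.items():
            if pending and group_id not in self.in_flight:
                self.in_flight.add(group_id)   # lock the group until ack
                return group_id, pending[0]
        return None

    def ack(self, group_id: str) -> None:
        self.groups[group_id].popleft()
        self.in_flight.discard(group_id)


q = FifoQueue()
q.publish("customer-1", "order-A")
q.publish("customer-1", "order-B")
q.publish("customer-2", "order-X")

first = q.receive()    # customer-1's oldest message, which locks that group
second = q.receive()   # customer-2 is served because customer-1 is blocked
third = q.receive()    # nothing deliverable until a group is acknowledged
q.ack("customer-1")
fourth = q.receive()   # customer-1's next message
```

This is why a good group key matters: grouping by customer keeps one failing order from stalling every other customer.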

Handling the Dead Letter Queue

DLQ processing flow

<!-- Dead Letter Queue Processor -->
<flow name="dlq-processor-flow">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="orders-queue-dlq"
        acknowledgementMode="MANUAL"
        doc:name="DLQ Subscriber"/>
 
    <logger level="WARN"
        message="Processing DLQ message: #[attributes.messageId], Delivery count: #[attributes.deliveryCount]"/>
 
    <!-- Analyze the failure reason -->
    <ee:transform doc:name="Analyze Failure">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    originalMessage: payload,
    messageId: attributes.messageId,
    deliveryCount: attributes.deliveryCount,
    firstDeliveryTime: attributes.timestamp,
    properties: attributes.properties,
    failureAnalysis: {
        possibleCauses: [
            "Validation error",
            "External service unavailable",
            "Data corruption",
            "Processing timeout"
        ]
    }
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
    <!-- Save the ack token: later operations may replace the message attributes -->
    <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>

    <!-- Store in the error database for analysis -->
    <db:insert config-ref="Database_Config" doc:name="Log DLQ Message">
        <db:sql>
            INSERT INTO dlq_messages (
                message_id, queue_name, payload, delivery_count,
                first_delivery, logged_at, status
            ) VALUES (
                :messageId, 'orders-queue', :payload, :deliveryCount,
                :firstDelivery, CURRENT_TIMESTAMP, 'PENDING_REVIEW'
            )
        </db:sql>
        <db:input-parameters><![CDATA[#[{
            messageId: payload.messageId,
            payload: write(payload.originalMessage, "application/json"),
            deliveryCount: payload.deliveryCount,
            firstDelivery: payload.firstDeliveryTime
        }]]]></db:input-parameters>
    </db:insert>

    <!-- Send an alert -->
    <flow-ref name="send-dlq-alert"/>

    <!-- Acknowledge the DLQ message -->
    <anypoint-mq:ack
        config-ref="Anypoint_MQ_Config"
        ackToken="#[vars.ackToken]"
        doc:name="Acknowledge DLQ Message"/>
 
</flow>
 
<!-- DLQ reprocessing flow (manual trigger) -->
<flow name="dlq-reprocess-flow">
    <http:listener config-ref="HTTP_Listener" path="/dlq/reprocess/{messageId}" method="POST"/>
 
    <!-- Fetch from the error database -->
    <db:select config-ref="Database_Config" doc:name="Get DLQ Message">
        <db:sql>
            SELECT * FROM dlq_messages WHERE message_id = :messageId AND status = 'PENDING_REVIEW'
        </db:sql>
        <db:input-parameters><![CDATA[#[{messageId: attributes.uriParams.messageId}]]]></db:input-parameters>
    </db:select>
 
    <choice doc:name="Message Found?">
        <when expression="#[sizeOf(payload) > 0]">
            <!-- Keep the stored row: later operations may replace the payload and attributes -->
            <set-variable variableName="dlqRow" value="#[payload[0]]"/>

            <!-- Republish to the original queue -->
            <anypoint-mq:publish
                config-ref="Anypoint_MQ_Config"
                destination="orders-queue"
                doc:name="Republish Message">
                <anypoint-mq:body><![CDATA[#[read(vars.dlqRow.payload, "application/json")]]]></anypoint-mq:body>
                <anypoint-mq:properties>
                    <anypoint-mq:property key="reprocessed" value="true"/>
                    <anypoint-mq:property key="originalMessageId" value="#[vars.dlqRow.message_id]"/>
                </anypoint-mq:properties>
            </anypoint-mq:publish>

            <!-- Update the status in the database -->
            <db:update config-ref="Database_Config" doc:name="Update Status">
                <db:sql>UPDATE dlq_messages SET status = 'REPROCESSED' WHERE message_id = :messageId</db:sql>
                <db:input-parameters><![CDATA[#[{messageId: vars.dlqRow.message_id}]]]></db:input-parameters>
            </db:update>
        </when>
        <otherwise>
            <raise-error type="APP:NOT_FOUND" description="DLQ message not found"/>
        </otherwise>
    </choice>
 
</flow>
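
How a message ends up in the DLQ comes down to a delivery counter: each failed attempt is followed by a redelivery until the configured maximum is reached. The Python sketch below simulates that loop. It is a simplified model with hypothetical names (`deliver`, `always_fails`), not the broker's actual algorithm.

```python
from typing import Callable, Optional


def deliver(message: dict, handler: Callable[[dict], None],
            max_deliveries: int, dlq: list) -> Optional[int]:
    """Return the attempt number that succeeded, or None if the message went to the DLQ."""
    for attempt in range(1, max_deliveries + 1):
        try:
            handler(message)
            return attempt          # acknowledged
        except Exception:
            continue                # not acknowledged: the message is redelivered
    dlq.append(message)             # delivery limit reached
    return None


dlq: list = []
calls = {"count": 0}


def always_fails(message: dict) -> None:
    calls["count"] += 1
    raise RuntimeError("downstream unavailable")


result = deliver({"orderId": 42}, always_fails, max_deliveries=5, dlq=dlq)
print(result, calls["count"], dlq)
```

A low limit moves poison messages out of the way quickly, while a higher one gives transient outages more time to clear; the 3 to 5 range used in this guide is a compromise between the two.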

Advanced Patterns

Circuit breaker with a queue

<!-- Circuit breaker pattern using Anypoint MQ -->
<flow name="circuit-breaker-queue-flow">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="external-api-requests"
        acknowledgementMode="MANUAL"
        doc:name="API Request Subscriber"/>
 
    <!-- Save the ack token before other operations may replace the message attributes -->
    <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>

    <!-- Check the circuit breaker state; the target variable keeps the message payload intact -->
    <os:retrieve
        key="circuit:external-api:state"
        objectStore="Circuit_Breaker_Store"
        target="circuitState"
        doc:name="Get Circuit State">
        <os:default-value><![CDATA[#["CLOSED"]]]></os:default-value>
    </os:retrieve>

    <choice doc:name="Circuit State">
        <when expression="#[vars.circuitState == 'OPEN']">
            <!-- Circuit is open: reject immediately -->
            <logger level="WARN" message="Circuit OPEN - delaying message"/>

            <!-- Negative acknowledgement so the message is retried later -->
            <anypoint-mq:nack
                config-ref="Anypoint_MQ_Config"
                ackToken="#[vars.ackToken]"
                doc:name="NACK - Circuit Open"/>
        </when>
        <otherwise>
            <!-- Circuit closed or half-open: attempt the request -->
            <try doc:name="Try External API">
                <http:request
                    config-ref="External_API_Config"
                    method="POST"
                    path="/api/process"
                    doc:name="Call External API"/>

                <!-- Success: reset the failure counter and close the circuit -->
                <os:store
                    key="circuit:external-api:failures"
                    objectStore="Circuit_Breaker_Store"
                    doc:name="Reset Failures">
                    <os:value><![CDATA[#[0]]]></os:value>
                </os:store>
                <os:store
                    key="circuit:external-api:state"
                    objectStore="Circuit_Breaker_Store"
                    doc:name="Close Circuit">
                    <os:value><![CDATA[#["CLOSED"]]]></os:value>
                </os:store>

                <anypoint-mq:ack
                    config-ref="Anypoint_MQ_Config"
                    ackToken="#[vars.ackToken]"
                    doc:name="Acknowledge Success"/>

                <error-handler>
                    <on-error-continue type="HTTP:CONNECTIVITY, HTTP:TIMEOUT">
                        <!-- Increment the failure counter -->
                        <os:retrieve
                            key="circuit:external-api:failures"
                            objectStore="Circuit_Breaker_Store"
                            target="failureCount">
                            <os:default-value><![CDATA[#[0]]]></os:default-value>
                        </os:retrieve>

                        <os:store
                            key="circuit:external-api:failures"
                            objectStore="Circuit_Breaker_Store">
                            <os:value><![CDATA[#[vars.failureCount + 1]]]></os:value>
                        </os:store>

                        <!-- Open the circuit once the threshold is reached (count includes this failure) -->
                        <choice>
                            <when expression="#[vars.failureCount + 1 >= 5]">
                                <os:store
                                    key="circuit:external-api:state"
                                    objectStore="Circuit_Breaker_Store">
                                    <os:value><![CDATA[#["OPEN"]]]></os:value>
                                </os:store>
                                <logger level="ERROR" message="Circuit OPENED after 5 failures"/>
                            </when>
                        </choice>

                        <!-- NACK so the message is retried -->
                        <anypoint-mq:nack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
                    </on-error-continue>
                </error-handler>
            </try>
        </otherwise>
    </choice>
 
</flow>
 
<!-- Circuit breaker reset scheduler -->
<flow name="circuit-breaker-reset-flow">
    <scheduler doc:name="Every 30 seconds">
        <scheduling-strategy>
            <fixed-frequency frequency="30" timeUnit="SECONDS"/>
        </scheduling-strategy>
    </scheduler>
 
    <os:retrieve
        key="circuit:external-api:state"
        objectStore="Circuit_Breaker_Store"
        target="currentState">
        <os:default-value><![CDATA[#["CLOSED"]]]></os:default-value>
    </os:retrieve>
 
    <choice>
        <when expression="#[vars.currentState == 'OPEN']">
            <!-- Move to half-open for testing -->
            <os:store
                key="circuit:external-api:state"
                objectStore="Circuit_Breaker_Store">
                <os:value><![CDATA[#["HALF_OPEN"]]]></os:value>
            </os:store>
            <logger level="INFO" message="Circuit moved to HALF_OPEN"/>
        </when>
    </choice>
 
</flow>
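
The two flows above implement a three-state machine spread across an object store and a scheduler. The same transitions are easier to see in one place, so here is a compact Python sketch of them. The class, the threshold of 5, and the 30-second reset mirror the values in the flows, but the code itself is an illustrative assumption rather than a port of the Mule configuration.

```python
class CircuitBreaker:
    """CLOSED -> OPEN after repeated failures, OPEN -> HALF_OPEN after a delay."""

    def __init__(self, failure_threshold: int = 5, reset_after: float = 30.0) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after        # seconds before a trial call is allowed
        self.state = "CLOSED"
        self.failures = 0
        self.opened_at = 0.0

    def allow(self, now: float) -> bool:
        """Return True when a call may be attempted at time `now` (seconds)."""
        if self.state == "OPEN" and now - self.opened_at >= self.reset_after:
            self.state = "HALF_OPEN"
        return self.state != "OPEN"

    def record_success(self) -> None:
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self, now: float) -> None:
        self.failures += 1
        # A failed trial call in HALF_OPEN reopens the circuit immediately
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = now


breaker = CircuitBreaker()
for _ in range(5):
    breaker.record_failure(now=0.0)
print(breaker.state)
```

Passing the clock in as a parameter keeps the sketch deterministic; the Mule version gets the same effect from the fixed-frequency scheduler.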

The request-reply pattern

<!-- Request-reply pattern -->
<flow name="request-reply-sender">
    <http:listener config-ref="HTTP_Listener" path="/sync/process" method="POST"/>
 
    <!-- Generate a correlation ID -->
    <set-variable variableName="correlationId" value="#[uuid()]"/>

    <!-- Build the name of the temporary reply queue (the queue itself must already exist or be created separately) -->
    <set-variable variableName="replyQueue" value="#['reply-' ++ vars.correlationId]"/>

    <!-- Publish the request with reply-to -->
    <anypoint-mq:publish
        config-ref="Anypoint_MQ_Config"
        destination="processing-requests"
        doc:name="Publish Request">
        <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
        <anypoint-mq:properties>
            <anypoint-mq:property key="correlationId" value="#[vars.correlationId]"/>
            <anypoint-mq:property key="replyTo" value="#[vars.replyQueue]"/>
        </anypoint-mq:properties>
    </anypoint-mq:publish>
 
    <!-- Wait for the reply (with timeout) -->
    <anypoint-mq:consume
        config-ref="Anypoint_MQ_Config"
        destination="#[vars.replyQueue]"
        acknowledgementMode="IMMEDIATE"
        pollingTime="30000"
        doc:name="Wait for Reply"/>
 
    <choice doc:name="Reply Received?">
        <when expression="#[payload != null]">
            <logger level="INFO" message="Reply received for: #[vars.correlationId]"/>
        </when>
        <otherwise>
            <raise-error type="APP:TIMEOUT" description="Request timed out waiting for reply"/>
        </otherwise>
    </choice>
 
</flow>
 
<!-- Request processor (replier) -->
<flow name="request-reply-processor">
    <anypoint-mq:subscriber
        config-ref="Anypoint_MQ_Config"
        destination="processing-requests"
        acknowledgementMode="MANUAL"
        doc:name="Request Subscriber"/>
 
    <!-- Extract the reply-to queue -->
    <set-variable variableName="replyQueue" value="#[attributes.properties.'replyTo']"/>
    <set-variable variableName="correlationId" value="#[attributes.properties.'correlationId']"/>
 
    <try doc:name="Process and Reply">
        <!-- Process the request -->
        <ee:transform doc:name="Process">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    result: "Processed successfully",
    originalData: payload,
    processedAt: now()
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
 
        <!-- Save the ack token: the publish operation may replace the message attributes -->
        <set-variable variableName="ackToken" value="#[attributes.ackToken]"/>

        <!-- Send the reply -->
        <anypoint-mq:publish
            config-ref="Anypoint_MQ_Config"
            destination="#[vars.replyQueue]"
            doc:name="Send Reply">
            <anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
            <anypoint-mq:properties>
                <anypoint-mq:property key="correlationId" value="#[vars.correlationId]"/>
            </anypoint-mq:properties>
        </anypoint-mq:publish>

        <anypoint-mq:ack config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
 
        <error-handler>
            <on-error-propagate type="ANY">
                <!-- Send an error reply -->
                <anypoint-mq:publish
                    config-ref="Anypoint_MQ_Config"
                    destination="#[vars.replyQueue]">
                    <anypoint-mq:body><![CDATA[#[{error: error.description}]]]></anypoint-mq:body>
                </anypoint-mq:publish>
            </on-error-propagate>
        </error-handler>
    </try>
 
</flow>
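
Request-reply over queues rests on two message properties: a correlation ID that ties the reply to its request, and a reply-to destination that tells the processor where to answer. The Python sketch below models that exchange with in-memory queues. All names (`send_request`, `process_one`, `wait_for_reply`) are hypothetical, and the standard-library `queue` module stands in for the broker.

```python
import queue
import uuid
from typing import Optional

requests: queue.Queue = queue.Queue()
reply_queues: dict[str, queue.Queue] = {}   # reply queue name -> queue


def send_request(body: dict) -> tuple[str, str]:
    """Publish a request and return (correlation_id, reply_queue_name)."""
    correlation_id = str(uuid.uuid4())
    reply_to = "reply-" + correlation_id
    reply_queues[reply_to] = queue.Queue()   # the reply queue must exist up front
    requests.put({"body": body,
                  "properties": {"correlationId": correlation_id, "replyTo": reply_to}})
    return correlation_id, reply_to


def process_one() -> None:
    """Take one request and publish the result to its reply-to queue."""
    message = requests.get_nowait()
    props = message["properties"]
    reply_queues[props["replyTo"]].put({
        "body": {"result": "Processed successfully", "originalData": message["body"]},
        "properties": {"correlationId": props["correlationId"]},
    })


def wait_for_reply(reply_to: str, timeout: float) -> Optional[dict]:
    """Block up to `timeout` seconds; return None when no reply arrives."""
    try:
        return reply_queues[reply_to].get(timeout=timeout)
    except queue.Empty:
        return None


correlation_id, reply_to = send_request({"orderId": 7})
process_one()
reply = wait_for_reply(reply_to, timeout=1.0)
```

A per-request reply queue keeps replies isolated but adds queue management overhead; a shared reply queue filtered by correlation ID is a common alternative when request volume is high.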

Monitoring and Operations

Queue statistics API

<!-- Queue statistics endpoint -->
<flow name="queue-stats-flow">
    <http:listener config-ref="HTTP_Listener" path="/admin/queues/stats" method="GET"/>
 
    <ee:transform doc:name="Prepare Stats Request">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    queues: [
        "orders-queue",
        "notifications-queue",
        "orders-queue-dlq"
    ]
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
 
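    <!-- Get statistics for each queue using the Anypoint MQ Admin API.
         vars.accessToken is assumed to have been obtained earlier; foreach does not
         collect per-iteration results, so accumulate them in a variable if the response needs them -->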
    <foreach collection="#[payload.queues]">
        <http:request
            method="GET"
            url="${anypoint.mq.admin.url}/queues/#[payload]"
            config-ref="Anypoint_MQ_Admin_Config"
            doc:name="Get Queue Stats">
            <http:headers>
                <http:header key="Authorization" value="#['Bearer ' ++ vars.accessToken]"/>
            </http:headers>
        </http:request>
 
        <ee:transform doc:name="Map Stats">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    queueName: payload.queueId,
    messagesInFlight: payload.messagesInFlight,
    messagesVisible: payload.messagesVisible,
    messagesSent: payload.messagesSent,
    messagesReceived: payload.messagesReceived,
    messagesAcked: payload.messagesAcked
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
    </foreach>
 
</flow>

Health check flow

<!-- Anypoint MQ Health Check -->
<flow name="mq-health-check-flow">
    <http:listener config-ref="HTTP_Listener" path="/health/mq" method="GET"/>
 
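    <!-- Record the start time used for the latency calculation -->
    <set-variable variableName="startTime" value="#[now()]"/>

    <try doc:name="MQ Health Check">
        <!-- Try to publish and consume a test message -->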
        <anypoint-mq:publish
            config-ref="Anypoint_MQ_Config"
            destination="health-check-queue"
            doc:name="Publish Test">
            <anypoint-mq:body><![CDATA[#[{test: true, timestamp: now()}]]]></anypoint-mq:body>
        </anypoint-mq:publish>
 
        <anypoint-mq:consume
            config-ref="Anypoint_MQ_Config"
            destination="health-check-queue"
            acknowledgementMode="IMMEDIATE"
            pollingTime="5000"
            doc:name="Consume Test"/>
 
        <ee:transform doc:name="Healthy Response">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "healthy",
    service: "Anypoint MQ",
    timestamp: now(),
    latencyMs: (now() - vars.startTime) as Number {unit: "milliseconds"}
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
 
        <error-handler>
            <on-error-continue type="ANY">
                <ee:transform doc:name="Unhealthy Response">
                    <ee:message>
                        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    status: "unhealthy",
    service: "Anypoint MQ",
    error: error.description,
    timestamp: now()
}]]></ee:set-payload>
                        </ee:message>
                        <ee:variables>
                            <!-- The HTTP listener response must map vars.httpStatus to the status code -->
                            <ee:set-variable variableName="httpStatus">503</ee:set-variable>
                        </ee:variables>
                    </ee:transform>
            </on-error-continue>
        </error-handler>
    </try>
 
</flow>

Best practices summary

anypoint_mq_best_practices:
  queue_design:
    - "Use descriptive queue names with an environment prefix"
    - "Configure an appropriate TTL based on business requirements"
    - "Always set up a DLQ for production queues"
    - "Use FIFO queues only when ordering is critical (lower throughput)"
    - "Use exchanges for broadcast/multicast scenarios"

  message_handling:
    - "Use MANUAL acknowledgement for reliable processing"
    - "Set an appropriate acknowledgement timeout (2x the processing time)"
    - "Include a correlation ID for traceability"
    - "Add metadata as message properties, not in the body"
    - "Implement idempotent consumers for at-least-once delivery"

  error_handling:
    - "Set maxDeliveries before the DLQ (typically 3-5)"
    - "Monitor the DLQ regularly with alerts"
    - "Build DLQ reprocessing capability"
    - "Log failed messages with full context"

  performance:
    - "Use batch publishing for high-volume scenarios"
    - "Configure connection pooling appropriately"
    - "Monitor queue depth and processing latency"
    - "Scale consumers based on the queue backlog"

  security:
    - "Use environment-specific client credentials"
    - "Store credentials in secure properties"
    - "Use a VPC/private network for sensitive data"
    - "Encrypt message payloads that contain PII"

Conclusion

Anypoint MQ provides reliable cloud messaging for MuleSoft integrations. Key patterns include pub/sub with exchanges, FIFO for ordering, DLQs for failures, and request-reply for synchronous needs. Correct acknowledgement handling and monitoring keep it reliable in production.

