Anypoint MQ is MuleSoft's cloud messaging service for reliable asynchronous communication. This guide covers queue configuration, messaging patterns, and best practices for production.
Anypoint MQ architecture
Overview of the main components
┌─────────────────────────────────────────────────────────────────────┐
│                      ANYPOINT MQ ARCHITECTURE                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  PUBLISHERS                       ANYPOINT MQ                       │
│  ┌─────────────┐                  ┌─────────────────────────────┐   │
│  │  Mule App   │───publish───────►│                             │   │
│  └─────────────┘                  │  ┌─────────────────────┐    │   │
│  ┌─────────────┐                  │  │   Standard Queue    │    │   │
│  │  REST API   │───publish───────►│  │   (at-least-once)   │    │   │
│  └─────────────┘                  │  └─────────────────────┘    │   │
│  ┌─────────────┐                  │                             │   │
│  │  External   │───publish───────►│  ┌─────────────────────┐    │   │
│  │   System    │                  │  │     FIFO Queue      │    │   │
│  └─────────────┘                  │  │   (exactly-once)    │    │   │
│                                   │  └─────────────────────┘    │   │
│  SUBSCRIBERS                      │                             │   │
│  ┌─────────────┐                  │  ┌─────────────────────┐    │   │
│  │  Mule App   │◄──subscribe──────│  │  Message Exchange   │    │   │
│  └─────────────┘                  │  │  (pub/sub topics)   │    │   │
│  ┌─────────────┐                  │  └─────────────────────┘    │   │
│  │  Mule App   │◄──subscribe──────│                             │   │
│  └─────────────┘                  │  ┌─────────────────────┐    │   │
│                                   │  │  Dead Letter Queue  │    │   │
│                                   │  │  (failed messages)  │    │   │
│                                   │  └─────────────────────┘    │   │
│                                   └─────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘
Configuring queues
Standard queue setup
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:anypoint-mq="http://www.mulesoft.org/schema/mule/anypoint-mq"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/anypoint-mq
http://www.mulesoft.org/schema/mule/anypoint-mq/current/mule-anypoint-mq.xsd">
<!-- Anypoint MQ Configuration -->
<anypoint-mq:config name="Anypoint_MQ_Config" doc:name="Anypoint MQ Config">
<anypoint-mq:connection
url="${anypoint.mq.url}"
clientId="${anypoint.mq.clientId}"
clientSecret="${anypoint.mq.clientSecret}"/>
</anypoint-mq:config>
<!-- Alternative: Default Subscriber Configuration -->
<anypoint-mq:default-subscriber-config name="MQ_Subscriber_Config">
<anypoint-mq:connection
url="${anypoint.mq.url}"
clientId="${anypoint.mq.clientId}"
clientSecret="${anypoint.mq.clientSecret}"/>
</anypoint-mq:default-subscriber-config>
</mule>
Queue property configuration
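The TTL and lock values used in this section are expressed in milliseconds, which makes them easy to get wrong by a factor of 1000. A minimal Python sketch for double-checking such values follows; the helper name `to_millis` is hypothetical and is not part of any MuleSoft tooling.

```python
from datetime import timedelta


def to_millis(**duration) -> int:
    """Convert a duration given as timedelta keyword arguments to whole milliseconds."""
    return int(timedelta(**duration).total_seconds() * 1000)


# Values matching the queue settings in this section
seven_days = to_millis(days=7)       # message TTL for the orders queue
one_day = to_millis(days=1)          # message TTL for the notifications queue
two_minutes = to_millis(minutes=2)   # lock TTL for the orders queue
print(seven_days, one_day, two_minutes)
```

Deriving the numbers from readable units keeps a setting such as `defaultTtl` reviewable at a glance.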
# Application properties (YAML) - queue settings
anypoint:
  mq:
    # Region-specific URLs
    url: "https://mq-us-east-1.anypoint.mulesoft.com/api/v1"
    # url: "https://mq-eu-west-1.anypoint.mulesoft.com/api/v1"
    # url: "https://mq-ap-southeast-1.anypoint.mulesoft.com/api/v1"
    clientId: "${secure::anypoint.mq.clientId}"
    clientSecret: "${secure::anypoint.mq.clientSecret}"
# Queue settings
queues:
  orders:
    name: "orders-queue"
    defaultTtl: 604800000 # 7 days in milliseconds
    defaultLockTtl: 120000 # 2-minute lock
    maxDeliveries: 5 # Deliveries before the message goes to the DLQ
  notifications:
    name: "notifications-queue"
    defaultTtl: 86400000 # 1 day
    defaultLockTtl: 60000 # 1-minute lock
    maxDeliveries: 3
  high-priority:
    name: "high-priority-fifo"
    fifo: true
    defaultTtl: 86400000
    defaultLockTtl: 300000 # 5-minute lock for FIFO
Publishing messages
Simple message publishing
<!-- Flow: publish an order to the queue -->
<flow name="publish-order-flow">
<http:listener config-ref="HTTP_Listener" path="/orders" method="POST"/>
<!-- Transform into the queue message format -->
<ee:transform doc:name="Prepare Message">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
orderId: payload.id,
customerId: payload.customerId,
items: payload.items,
totalAmount: payload.totalAmount,
timestamp: (now() >> "UTC") as String {format: "yyyy-MM-dd'T'HH:mm:ss'Z'"}
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<!-- Publish to the standard queue -->
<anypoint-mq:publish
config-ref="Anypoint_MQ_Config"
destination="orders-queue"
doc:name="Publish Order">
<anypoint-mq:properties>
<anypoint-mq:property key="orderType" value="#[payload.orderType]"/>
<anypoint-mq:property key="priority" value="#[payload.priority default 'normal']"/>
<anypoint-mq:property key="correlationId" value="#[correlationId]"/>
</anypoint-mq:properties>
</anypoint-mq:publish>
<ee:transform doc:name="Response">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
status: "queued",
messageId: attributes.messageId,
destination: "orders-queue"
}]]></ee:set-payload>
</ee:message>
</ee:transform>
</flow>
Publishing with a custom message ID
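The deduplication idea behind this subsection is that the same business event always maps to the same message ID, so a retried publish can be recognized as a duplicate. A minimal Python sketch of that idea, assuming the ID is the SHA-256 digest of the transaction ID concatenated with the timestamp; the function name and sample values are hypothetical.

```python
import hashlib


def deterministic_message_id(transaction_id: str, timestamp: str) -> str:
    """Return the hex SHA-256 digest of the transaction ID followed by the timestamp."""
    raw = f"{transaction_id}{timestamp}".encode("utf-8")
    return hashlib.sha256(raw).hexdigest()


first = deterministic_message_id("tx-1001", "2024-01-15T10:00:00Z")
retry = deterministic_message_id("tx-1001", "2024-01-15T10:00:00Z")
other = deterministic_message_id("tx-1002", "2024-01-15T10:00:00Z")
print(first == retry, first == other)
```

A retry of the same transaction yields the same ID, while a different transaction yields a different one. Plain concatenation can be ambiguous when field lengths vary, so a separator between the fields is a reasonable hardening step.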
<!-- Flow: publishing with deduplication -->
<flow name="publish-idempotent-flow">
<http:listener config-ref="HTTP_Listener" path="/transactions" method="POST"/>
<!-- Generate a deterministic message ID for deduplication -->
<ee:transform doc:name="Generate Message ID">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
import dw::Crypto
---
{
data: payload,
messageId: Crypto::hashWith(
(payload.transactionId ++ payload.timestamp) as Binary,
"SHA-256"
)
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<!-- Publish with the custom message ID -->
<anypoint-mq:publish
config-ref="Anypoint_MQ_Config"
destination="transactions-fifo"
messageId="#[payload.messageId]"
doc:name="Publish Transaction">
<anypoint-mq:body><![CDATA[#[payload.data]]]></anypoint-mq:body>
<anypoint-mq:properties>
<anypoint-mq:property key="transactionType" value="#[payload.data.type]"/>
</anypoint-mq:properties>
</anypoint-mq:publish>
</flow>
Batch publishing
<!-- Flow: batch publishing of multiple messages -->
<flow name="batch-publish-flow">
<http:listener config-ref="HTTP_Listener" path="/batch/orders" method="POST"/>
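<!-- Generate the batch ID referenced by the message properties and the response -->
<set-variable variableName="batchId" value="#[uuid()]"/>
<!-- Split the array into individual messages -->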
<foreach collection="#[payload.orders]">
<anypoint-mq:publish
config-ref="Anypoint_MQ_Config"
destination="orders-queue"
doc:name="Publish Each Order">
<anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
<anypoint-mq:properties>
<anypoint-mq:property key="batchId" value="#[vars.batchId]"/>
<anypoint-mq:property key="itemIndex" value="#[vars.counter]"/>
</anypoint-mq:properties>
</anypoint-mq:publish>
</foreach>
<ee:transform doc:name="Batch Response">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
status: "batch_queued",
count: sizeOf(payload.orders),
batchId: vars.batchId
}]]></ee:set-payload>
</ee:message>
</ee:transform>
</flow>
Consuming messages
Subscriber flow (push model)
<!-- Flow: subscribe to the queue -->
<flow name="order-processor-flow">
<!-- Anypoint MQ Subscriber (Push) -->
<anypoint-mq:subscriber
config-ref="Anypoint_MQ_Config"
destination="orders-queue"
doc:name="Subscribe Orders"
acknowledgementMode="MANUAL"
acknowledgementTimeout="120000"
pollingTime="1000"
maxRedelivery="5"/>
<logger level="INFO" message="Processing order: #[payload.orderId]"/>
<!-- Process the order -->
<try doc:name="Process with Error Handling">
<flow-ref name="process-order-subflow"/>
<!-- Acknowledge on success -->
<anypoint-mq:ack
config-ref="Anypoint_MQ_Config"
doc:name="Acknowledge Message"/>
<error-handler>
<on-error-propagate type="ANY">
<logger level="ERROR" message="Order processing failed: #[error.description]"/>
<!-- The message will be redelivered or sent to the DLQ -->
</on-error-propagate>
</error-handler>
</try>
</flow>
<sub-flow name="process-order-subflow">
<!-- Validate the order -->
<ee:transform doc:name="Validate">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
// The "!" upserts the field when it does not exist yet
payload update {
case .validated! -> true
case .processedAt! -> now()
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<!-- Store in the database -->
<db:insert config-ref="Database_Config" doc:name="Insert Order">
<db:sql>
INSERT INTO orders (order_id, customer_id, total, status, created_at)
VALUES (:orderId, :customerId, :total, 'PROCESSED', :createdAt)
</db:sql>
<db:input-parameters><![CDATA[#[{
orderId: payload.orderId,
customerId: payload.customerId,
total: payload.totalAmount,
createdAt: now()
}]]]></db:input-parameters>
</db:insert>
</sub-flow>
The consume operation (pull model)
<!-- Flow: on-demand consumption -->
<flow name="consume-on-demand-flow">
<http:listener config-ref="HTTP_Listener" path="/process/next" method="POST"/>
<!-- Consume a single message -->
<anypoint-mq:consume
config-ref="Anypoint_MQ_Config"
destination="orders-queue"
acknowledgementMode="MANUAL"
acknowledgementTimeout="60000"
pollingTime="10000"
doc:name="Consume Next Message"/>
<!-- Check whether a message was received -->
<choice doc:name="Message Received?">
<when expression="#[payload != null]">
<try doc:name="Process Message">
<flow-ref name="process-order-subflow"/>
<anypoint-mq:ack
config-ref="Anypoint_MQ_Config"
doc:name="Acknowledge"/>
<ee:transform doc:name="Success Response">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
status: "processed",
messageId: attributes.messageId
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<error-handler>
<on-error-propagate type="ANY">
<anypoint-mq:nack
config-ref="Anypoint_MQ_Config"
doc:name="Negative Acknowledge"/>
</on-error-propagate>
</error-handler>
</try>
</when>
<otherwise>
<ee:transform doc:name="No Messages">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
status: "empty",
message: "No messages available"
}]]></ee:set-payload>
</ee:message>
</ee:transform>
</otherwise>
</choice>
</flow>
Message exchanges (pub/sub)
Configuring the exchange
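An exchange fans out each published message to every queue bound to it, and each queue is then consumed independently. The following Python sketch models only that fan-out behaviour in memory; the `Exchange` class is a hypothetical stand-in, not the Anypoint MQ API, and the queue names simply mirror the ones used in this section.

```python
from collections import deque


class Exchange:
    """In-memory stand-in for a message exchange with bound queues."""

    def __init__(self):
        self.bound_queues = {}

    def bind(self, queue_name):
        self.bound_queues[queue_name] = deque()

    def publish(self, message):
        # Every bound queue receives its own copy of the message.
        for queue in self.bound_queues.values():
            queue.append(dict(message))


exchange = Exchange()
for name in ("email-notifications-queue",
             "sms-notifications-queue",
             "push-notifications-queue"):
    exchange.bind(name)

exchange.publish({"type": "ORDER_SHIPPED", "source": "orders"})
queue_depths = {name: len(q) for name, q in exchange.bound_queues.items()}
print(queue_depths)
```

Because every subscriber reads from its own queue, a slow SMS consumer does not delay email delivery.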
<!-- Exchange Publisher -->
<flow name="notification-publisher-flow">
<http:listener config-ref="HTTP_Listener" path="/events" method="POST"/>
<!-- Publish to the exchange (broadcasts to all bound queues) -->
<anypoint-mq:publish
config-ref="Anypoint_MQ_Config"
destination="notifications-exchange"
doc:name="Publish to Exchange">
<anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
<anypoint-mq:properties>
<anypoint-mq:property key="eventType" value="#[payload.type]"/>
<anypoint-mq:property key="source" value="#[payload.source]"/>
</anypoint-mq:properties>
</anypoint-mq:publish>
</flow>
<!-- Subscriber 1: email notifications -->
<flow name="email-notification-subscriber">
<anypoint-mq:subscriber
config-ref="Anypoint_MQ_Config"
destination="email-notifications-queue"
doc:name="Email Queue Subscriber"/>
<flow-ref name="send-email-notification"/>
<anypoint-mq:ack config-ref="Anypoint_MQ_Config"/>
</flow>
<!-- Subscriber 2: SMS notifications -->
<flow name="sms-notification-subscriber">
<anypoint-mq:subscriber
config-ref="Anypoint_MQ_Config"
destination="sms-notifications-queue"
doc:name="SMS Queue Subscriber"/>
<flow-ref name="send-sms-notification"/>
<anypoint-mq:ack config-ref="Anypoint_MQ_Config"/>
</flow>
<!-- Subscriber 3: push notifications -->
<flow name="push-notification-subscriber">
<anypoint-mq:subscriber
config-ref="Anypoint_MQ_Config"
destination="push-notifications-queue"
doc:name="Push Queue Subscriber"/>
<flow-ref name="send-push-notification"/>
<anypoint-mq:ack config-ref="Anypoint_MQ_Config"/>
</flow>
FIFO queues
Configuring and using FIFO queues
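With message groups, ordering is enforced per group: while one message of a group is in flight, later messages of that group wait, but other groups keep moving. The Python sketch below is a simplified in-memory model of that behaviour as this section describes it; the `FifoQueue` class is hypothetical and says nothing about how Anypoint MQ implements it internally.

```python
from collections import deque


class FifoQueue:
    """Simplified model: one in-flight message per message group."""

    def __init__(self):
        self.messages = deque()       # (group_id, body) in publish order
        self.locked_groups = set()    # groups with an unacknowledged message

    def publish(self, group_id, body):
        self.messages.append((group_id, body))

    def receive(self):
        """Return the oldest message whose group has nothing in flight, else None."""
        for index, (group_id, body) in enumerate(self.messages):
            if group_id not in self.locked_groups:
                del self.messages[index]
                self.locked_groups.add(group_id)
                return group_id, body
        return None

    def ack(self, group_id):
        self.locked_groups.discard(group_id)


queue = FifoQueue()
queue.publish("cust-A", "order-1")
queue.publish("cust-A", "order-2")
queue.publish("cust-B", "order-3")

first = queue.receive()     # first message of cust-A
second = queue.receive()    # cust-A is in flight, so cust-B is served
blocked = queue.receive()   # only cust-A's next message remains, and it must wait
queue.ack("cust-A")
third = queue.receive()     # cust-A continues in order
```

Grouping by customer, as the flow in this section does, keeps each customer's orders in sequence without serializing all customers behind one another.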
<!-- FIFO queue publisher with message groups -->
<flow name="fifo-order-publisher">
<http:listener config-ref="HTTP_Listener" path="/fifo/orders" method="POST"/>
<!-- FIFO needs a message group ID for ordering -->
<ee:transform doc:name="Prepare FIFO Message">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
order: payload,
// Group by customer so that each customer's orders are processed in order
messageGroupId: payload.customerId
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<anypoint-mq:publish
config-ref="Anypoint_MQ_Config"
destination="orders-fifo"
messageGroupId="#[payload.messageGroupId]"
doc:name="Publish FIFO Order">
<anypoint-mq:body><![CDATA[#[payload.order]]]></anypoint-mq:body>
</anypoint-mq:publish>
</flow>
<!-- FIFO Queue Consumer -->
<flow name="fifo-order-consumer">
<anypoint-mq:subscriber
config-ref="Anypoint_MQ_Config"
destination="orders-fifo"
acknowledgementMode="MANUAL"
acknowledgementTimeout="300000"
doc:name="FIFO Subscriber"/>
<logger level="INFO"
message="Processing FIFO order: #[payload.orderId] from group: #[attributes.messageGroupId]"/>
<try doc:name="FIFO Processing">
<!-- FIFO processing: order matters -->
<flow-ref name="process-fifo-order"/>
<anypoint-mq:ack
config-ref="Anypoint_MQ_Config"
doc:name="Acknowledge FIFO"/>
<error-handler>
<on-error-propagate type="ANY">
<logger level="ERROR"
message="FIFO processing failed - blocking group: #[attributes.messageGroupId]"/>
<!-- In a FIFO queue, a failure blocks the message group -->
</on-error-propagate>
</error-handler>
</try>
</flow>
Dead letter queue handling
DLQ processing flow
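A message reaches the DLQ only after it has failed processing the configured maximum number of deliveries. The following Python sketch simulates that rule in memory; the function `deliver_with_dlq` and the sample handler are hypothetical and only illustrate the counting logic.

```python
from collections import deque


def deliver_with_dlq(messages, handler, max_deliveries):
    """Retry each message up to max_deliveries times, then move it to the DLQ."""
    queue = deque((message, 0) for message in messages)
    processed, dead_letters = [], []
    while queue:
        message, deliveries = queue.popleft()
        deliveries += 1
        try:
            handler(message)
            processed.append(message)                   # acknowledged
        except Exception:
            if deliveries >= max_deliveries:
                dead_letters.append((message, deliveries))
            else:
                queue.append((message, deliveries))     # redelivered later
    return processed, dead_letters


def handler(message):
    # Hypothetical validation: orders without a total always fail.
    if message.get("totalAmount") is None:
        raise ValueError("missing totalAmount")


processed, dead_letters = deliver_with_dlq(
    [{"orderId": 1, "totalAmount": 10.0}, {"orderId": 2}],
    handler,
    max_deliveries=5,
)
```

The valid order is processed on its first delivery, while the invalid one is attempted five times and then lands in the dead-letter list together with its delivery count.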
<!-- Dead Letter Queue Processor -->
<flow name="dlq-processor-flow">
<anypoint-mq:subscriber
config-ref="Anypoint_MQ_Config"
destination="orders-queue-dlq"
acknowledgementMode="MANUAL"
doc:name="DLQ Subscriber"/>
<logger level="WARN"
message="Processing DLQ message: #[attributes.messageId], Delivery count: #[attributes.deliveryCount]"/>
<!-- Analyze the failure reason -->
<ee:transform doc:name="Analyze Failure">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
originalMessage: payload,
messageId: attributes.messageId,
deliveryCount: attributes.deliveryCount,
firstDeliveryTime: attributes.timestamp,
properties: attributes.properties,
failureAnalysis: {
possibleCauses: [
"Validation error",
"External service unavailable",
"Data corruption",
"Processing timeout"
]
}
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<!-- Store in the error database for analysis -->
<db:insert config-ref="Database_Config" doc:name="Log DLQ Message">
<db:sql>
INSERT INTO dlq_messages (
message_id, queue_name, payload, delivery_count,
first_delivery, logged_at, status
) VALUES (
:messageId, 'orders-queue', :payload, :deliveryCount,
:firstDelivery, CURRENT_TIMESTAMP, 'PENDING_REVIEW'
)
</db:sql>
<db:input-parameters><![CDATA[#[{
messageId: payload.messageId,
payload: write(payload.originalMessage, "application/json"),
deliveryCount: payload.deliveryCount,
firstDelivery: payload.firstDeliveryTime
}]]]></db:input-parameters>
</db:insert>
<!-- Send an alert -->
<flow-ref name="send-dlq-alert"/>
<!-- Acknowledge the DLQ message -->
<anypoint-mq:ack
config-ref="Anypoint_MQ_Config"
doc:name="Acknowledge DLQ Message"/>
</flow>
<!-- DLQ reprocessing flow (manual trigger) -->
<flow name="dlq-reprocess-flow">
<http:listener config-ref="HTTP_Listener" path="/dlq/reprocess/{messageId}" method="POST"/>
<!-- Fetch from the error database -->
<db:select config-ref="Database_Config" doc:name="Get DLQ Message">
<db:sql>
SELECT * FROM dlq_messages WHERE message_id = :messageId AND status = 'PENDING_REVIEW'
</db:sql>
<db:input-parameters><![CDATA[#[{messageId: attributes.uriParams.messageId}]]]></db:input-parameters>
</db:select>
<choice doc:name="Message Found?">
<when expression="#[sizeOf(payload) > 0]">
<!-- Republish to the original queue -->
<anypoint-mq:publish
config-ref="Anypoint_MQ_Config"
destination="orders-queue"
doc:name="Republish Message">
<anypoint-mq:body><![CDATA[#[read(payload[0].payload, "application/json")]]]></anypoint-mq:body>
<anypoint-mq:properties>
<anypoint-mq:property key="reprocessed" value="true"/>
<anypoint-mq:property key="originalMessageId" value="#[payload[0].message_id]"/>
</anypoint-mq:properties>
</anypoint-mq:publish>
<!-- Update the status in the database -->
<db:update config-ref="Database_Config" doc:name="Update Status">
<db:sql>UPDATE dlq_messages SET status = 'REPROCESSED' WHERE message_id = :messageId</db:sql>
<db:input-parameters><![CDATA[#[{messageId: attributes.uriParams.messageId}]]]></db:input-parameters>
</db:update>
</when>
<otherwise>
<raise-error type="APP:NOT_FOUND" description="DLQ message not found"/>
</otherwise>
</choice>
</flow>
Advanced patterns
Circuit breaker with a queue
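The circuit breaker in this subsection is a small state machine: CLOSED while calls succeed, OPEN after a run of failures, and HALF_OPEN once a periodic reset allows a trial call. The Python sketch below shows that state machine on its own. The class and method names are hypothetical, and two transitions are assumptions of this sketch rather than something the flow spells out: a success closes the circuit again, and a failure while HALF_OPEN reopens it immediately.

```python
class CircuitBreaker:
    """Minimal three-state circuit breaker: CLOSED, OPEN, HALF_OPEN."""

    def __init__(self, failure_threshold=5):
        self.failure_threshold = failure_threshold
        self.state = "CLOSED"
        self.failures = 0

    def allow_request(self):
        # Requests pass while CLOSED or HALF_OPEN and are rejected while OPEN.
        return self.state != "OPEN"

    def record_success(self):
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"

    def reset_tick(self):
        """Called by a periodic scheduler: an OPEN circuit moves to HALF_OPEN."""
        if self.state == "OPEN":
            self.state = "HALF_OPEN"


breaker = CircuitBreaker(failure_threshold=5)
for _ in range(5):
    breaker.record_failure()
state_after_failures = breaker.state
allowed_while_open = breaker.allow_request()

breaker.reset_tick()
state_after_tick = breaker.state

breaker.record_success()
state_after_success = breaker.state
```

In the queue-based variant, a rejected request corresponds to a negative acknowledgement, so the message stays on the queue until the circuit lets traffic through again.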
<!-- Circuit breaker pattern using Anypoint MQ -->
<flow name="circuit-breaker-queue-flow">
<anypoint-mq:subscriber
config-ref="Anypoint_MQ_Config"
destination="external-api-requests"
acknowledgementMode="MANUAL"
doc:name="API Request Subscriber"/>
<!-- Check the circuit breaker state; the target variable keeps the message payload intact -->
<os:retrieve
key="circuit:external-api:state"
objectStore="Circuit_Breaker_Store"
target="circuitState"
doc:name="Get Circuit State">
<os:default-value><![CDATA[#["CLOSED"]]]></os:default-value>
</os:retrieve>
<choice doc:name="Circuit State">
<when expression="#[vars.circuitState == 'OPEN']">
<!-- Circuit is open: reject immediately -->
<logger level="WARN" message="Circuit OPEN - delaying message"/>
<!-- Negative acknowledgement so the message is retried later -->
<anypoint-mq:nack
config-ref="Anypoint_MQ_Config"
doc:name="NACK - Circuit Open"/>
</when>
<otherwise>
<!-- Circuit closed or half-open: attempt the request -->
<try doc:name="Try External API">
<http:request
config-ref="External_API_Config"
method="POST"
path="/api/process"
doc:name="Call External API"/>
<!-- Success: reset the failure counter -->
<os:store
key="circuit:external-api:failures"
objectStore="Circuit_Breaker_Store"
doc:name="Reset Failures">
<os:value><![CDATA[#[0]]]></os:value>
</os:store>
<anypoint-mq:ack
config-ref="Anypoint_MQ_Config"
doc:name="Acknowledge Success"/>
<error-handler>
<on-error-continue type="HTTP:CONNECTIVITY, HTTP:TIMEOUT">
<!-- Increment the failure counter -->
<os:retrieve
key="circuit:external-api:failures"
objectStore="Circuit_Breaker_Store"
target="failureCount">
<os:default-value><![CDATA[#[0]]]></os:default-value>
</os:retrieve>
<os:store
key="circuit:external-api:failures"
objectStore="Circuit_Breaker_Store">
<os:value><![CDATA[#[vars.failureCount + 1]]]></os:value>
</os:store>
<!-- Open the circuit once the threshold is reached (failureCount holds the value before this failure) -->
<choice>
<when expression="#[vars.failureCount + 1 >= 5]">
<os:store
key="circuit:external-api:state"
objectStore="Circuit_Breaker_Store">
<os:value><![CDATA[#["OPEN"]]]></os:value>
</os:store>
<logger level="ERROR" message="Circuit OPENED after 5 failures"/>
</when>
</choice>
<!-- NACK so the message is retried -->
<anypoint-mq:nack config-ref="Anypoint_MQ_Config"/>
</on-error-continue>
</error-handler>
</try>
</otherwise>
</choice>
</flow>
<!-- Circuit breaker reset scheduler -->
<flow name="circuit-breaker-reset-flow">
<scheduler doc:name="Every 30 seconds">
<scheduling-strategy>
<fixed-frequency frequency="30" timeUnit="SECONDS"/>
</scheduling-strategy>
</scheduler>
<os:retrieve
key="circuit:external-api:state"
objectStore="Circuit_Breaker_Store"
target="currentState">
<os:default-value><![CDATA[#["CLOSED"]]]></os:default-value>
</os:retrieve>
<choice>
<when expression="#[vars.currentState == 'OPEN']">
<!-- Move to half-open for a trial request -->
<os:store
key="circuit:external-api:state"
objectStore="Circuit_Breaker_Store">
<os:value><![CDATA[#["HALF_OPEN"]]]></os:value>
</os:store>
<logger level="INFO" message="Circuit moved to HALF_OPEN"/>
</when>
</choice>
</flow>
The request-reply pattern
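Request-reply over queues rests on two message properties: a correlation ID that ties the reply to its request, and a reply-to queue name that tells the processor where to answer. The Python sketch below walks through that exchange with in-memory queues; the function names are hypothetical, the queue and property names mirror the ones used in this section, and an empty reply queue stands in for a timeout.

```python
import uuid
from collections import defaultdict, deque

queues = defaultdict(deque)  # queue name -> pending messages


def send_request(body):
    """Publish a request carrying a correlation ID and a reply-to queue name."""
    correlation_id = str(uuid.uuid4())
    reply_queue = f"reply-{correlation_id}"
    queues["processing-requests"].append({
        "body": body,
        "properties": {"correlationId": correlation_id, "replyTo": reply_queue},
    })
    return correlation_id, reply_queue


def process_next_request():
    """Take one request and publish the result to the queue named in replyTo."""
    request = queues["processing-requests"].popleft()
    props = request["properties"]
    queues[props["replyTo"]].append({
        "body": {"result": "Processed successfully", "originalData": request["body"]},
        "properties": {"correlationId": props["correlationId"]},
    })


def wait_for_reply(reply_queue):
    """Return the reply if one is queued, else None."""
    return queues[reply_queue].popleft() if queues[reply_queue] else None


correlation_id, reply_queue = send_request({"orderId": 42})
process_next_request()
reply = wait_for_reply(reply_queue)
second_poll = wait_for_reply(reply_queue)
```

The sender checks that the reply's correlation ID matches its own before trusting the result, which matters as soon as a reply queue is shared between requests.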
<!-- Request-reply pattern -->
<flow name="request-reply-sender">
<http:listener config-ref="HTTP_Listener" path="/sync/process" method="POST"/>
<!-- Generate the correlation ID -->
<set-variable variableName="correlationId" value="#[uuid()]"/>
<!-- Build the name of the temporary reply queue (the queue itself must already exist before it is used) -->
<set-variable variableName="replyQueue" value="#['reply-' ++ vars.correlationId]"/>
<!-- Publish the request with reply-to -->
<anypoint-mq:publish
config-ref="Anypoint_MQ_Config"
destination="processing-requests"
doc:name="Publish Request">
<anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
<anypoint-mq:properties>
<anypoint-mq:property key="correlationId" value="#[vars.correlationId]"/>
<anypoint-mq:property key="replyTo" value="#[vars.replyQueue]"/>
</anypoint-mq:properties>
</anypoint-mq:publish>
<!-- Wait for the reply (with a timeout) -->
<anypoint-mq:consume
config-ref="Anypoint_MQ_Config"
destination="#[vars.replyQueue]"
acknowledgementMode="IMMEDIATE"
pollingTime="30000"
doc:name="Wait for Reply"/>
<choice doc:name="Reply Received?">
<when expression="#[payload != null]">
<logger level="INFO" message="Reply received for: #[vars.correlationId]"/>
</when>
<otherwise>
<raise-error type="APP:TIMEOUT" description="Request timed out waiting for reply"/>
</otherwise>
</choice>
</flow>
<!-- Request processor (replier) -->
<flow name="request-reply-processor">
<anypoint-mq:subscriber
config-ref="Anypoint_MQ_Config"
destination="processing-requests"
acknowledgementMode="MANUAL"
doc:name="Request Subscriber"/>
<!-- Extract the reply-to queue -->
<set-variable variableName="replyQueue" value="#[attributes.properties.'replyTo']"/>
<set-variable variableName="correlationId" value="#[attributes.properties.'correlationId']"/>
<try doc:name="Process and Reply">
<!-- Process the request -->
<ee:transform doc:name="Process">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
result: "Processed successfully",
originalData: payload,
processedAt: now()
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<!-- Send the reply -->
<anypoint-mq:publish
config-ref="Anypoint_MQ_Config"
destination="#[vars.replyQueue]"
doc:name="Send Reply">
<anypoint-mq:body><![CDATA[#[payload]]]></anypoint-mq:body>
<anypoint-mq:properties>
<anypoint-mq:property key="correlationId" value="#[vars.correlationId]"/>
</anypoint-mq:properties>
</anypoint-mq:publish>
<anypoint-mq:ack config-ref="Anypoint_MQ_Config"/>
<error-handler>
<on-error-propagate type="ANY">
<!-- Send an error reply -->
<anypoint-mq:publish
config-ref="Anypoint_MQ_Config"
destination="#[vars.replyQueue]">
<anypoint-mq:body><![CDATA[#[{error: error.description}]]]></anypoint-mq:body>
</anypoint-mq:publish>
</on-error-propagate>
</error-handler>
</try>
</flow>
Monitoring and operations
Queue statistics API
<!-- Queue statistics endpoint -->
<flow name="queue-stats-flow">
<http:listener config-ref="HTTP_Listener" path="/admin/queues/stats" method="GET"/>
<ee:transform doc:name="Prepare Stats Request">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
queues: [
"orders-queue",
"notifications-queue",
"orders-queue-dlq"
]
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<!-- Get statistics for each queue using the Anypoint MQ Admin API -->
<foreach collection="#[payload.queues]">
<http:request
method="GET"
url="${anypoint.mq.admin.url}/queues/#[payload]"
config-ref="Anypoint_MQ_Admin_Config"
doc:name="Get Queue Stats">
<http:headers>
<http:header key="Authorization" value="#['Bearer ' ++ vars.accessToken]"/>
</http:headers>
</http:request>
<ee:transform doc:name="Map Stats">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
queueName: payload.queueId,
messagesInFlight: payload.messagesInFlight,
messagesVisible: payload.messagesVisible,
messagesSent: payload.messagesSent,
messagesReceived: payload.messagesReceived,
messagesAcked: payload.messagesAcked
}]]></ee:set-payload>
</ee:message>
</ee:transform>
</foreach>
</flow>
Health check flow
<!-- Anypoint MQ Health Check -->
<flow name="mq-health-check-flow">
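<http:listener config-ref="HTTP_Listener" path="/health/mq" method="GET"/>
<!-- Record the start time; the healthy response uses it to compute latency -->
<set-variable variableName="startTime" value="#[now()]"/>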
<try doc:name="MQ Health Check">
<!-- Try to publish and consume a test message -->
<anypoint-mq:publish
config-ref="Anypoint_MQ_Config"
destination="health-check-queue"
doc:name="Publish Test">
<anypoint-mq:body><![CDATA[#[{test: true, timestamp: now()}]]]></anypoint-mq:body>
</anypoint-mq:publish>
<anypoint-mq:consume
config-ref="Anypoint_MQ_Config"
destination="health-check-queue"
acknowledgementMode="IMMEDIATE"
pollingTime="5000"
doc:name="Consume Test"/>
<ee:transform doc:name="Healthy Response">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
status: "healthy",
service: "Anypoint MQ",
timestamp: now(),
latencyMs: (now() - vars.startTime) as Number {unit: "milliseconds"}
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<error-handler>
<on-error-continue type="ANY">
<ee:transform doc:name="Unhealthy Response">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
status: "unhealthy",
service: "Anypoint MQ",
error: error.description,
timestamp: now()
}]]></ee:set-payload>
</ee:message>
<!-- The HTTP listener response must read vars.httpStatus for the 503 to take effect -->
<ee:variables>
<ee:set-variable variableName="httpStatus"><![CDATA[503]]></ee:set-variable>
</ee:variables>
</ee:transform>
</on-error-continue>
</error-handler>
</try>
</flow>
Best practices summary
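One practice in this summary, idempotent consumers, is worth a concrete illustration, because at-least-once delivery means the same message can arrive more than once. The Python sketch below deduplicates on the message ID. The class name is hypothetical and the in-memory set is an assumption made for brevity; a production consumer would need a persistent, shared store instead.

```python
class IdempotentConsumer:
    """Skip messages whose ID has already been processed successfully."""

    def __init__(self, handler):
        self.handler = handler
        self.seen_ids = set()

    def on_message(self, message_id, body):
        """Process the message once; return False for a duplicate delivery."""
        if message_id in self.seen_ids:
            return False
        self.handler(body)
        # Record the ID only after the handler succeeded, so failures are retried.
        self.seen_ids.add(message_id)
        return True


stored_orders = []
consumer = IdempotentConsumer(stored_orders.append)
results = [
    consumer.on_message("msg-1", {"orderId": 1}),
    consumer.on_message("msg-1", {"orderId": 1}),  # redelivery of the same message
    consumer.on_message("msg-2", {"orderId": 2}),
]
```

The redelivered message is acknowledged without side effects, so the order is stored only once.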
anypoint_mq_best_practices:
  queue_design:
    - "Use descriptive queue names with an environment prefix"
    - "Configure the TTL according to business requirements"
    - "Always set a DLQ for production queues"
    - "Use FIFO queues only when ordering is critical (lower throughput)"
    - "Use exchanges for broadcast/multicast scenarios"
  message_handling:
    - "Use MANUAL acknowledgement for reliable processing"
    - "Set an appropriate acknowledgement timeout (2x the processing time)"
    - "Include a correlation ID for traceability"
    - "Add metadata as message properties, not in the body"
    - "Implement idempotent consumers for at-least-once delivery"
  error_handling:
    - "Set maxDeliveries before the DLQ (typically 3-5)"
    - "Monitor the DLQ regularly, with alerts"
    - "Build a DLQ reprocessing capability"
    - "Log failed messages with full context"
  performance:
    - "Use batch publishing for high-volume scenarios"
    - "Configure connection pooling appropriately"
    - "Monitor queue depth and processing latency"
    - "Scale consumers according to the queue backlog"
  security:
    - "Use environment-specific client credentials"
    - "Store credentials in secure properties"
    - "Use a VPC/private network for sensitive data"
    - "Encrypt message payloads that contain PII"
Conclusion
Anypoint MQ provides reliable cloud messaging for MuleSoft integrations. The key patterns are pub/sub with exchanges, FIFO queues for ordering, DLQs for failures, and request-reply for synchronous needs. Correct acknowledgement handling and monitoring keep the setup reliable in production.