Anypoint MQ Connector

Apisero
7 min readAug 6, 2021

--

Author: Kishori Patil

Content:

  • The Anypoint MQ provides publish-subscribe messaging to Mule apps using the Anypoint MQ cloud service.
  • This blog includes a demo for 4 operations: Publish, Subscriber, Ack, and Nack, provided by Anypoint MQ Connector.

Prerequisites:

We will be using the same MQs we created in Anypoint MQ- on the Anypoint Platform blog, which gives a Basic idea about MQ and how to create MQs on the Platform.

Setup:

  1. First, we will download the Anypoint MQ connector for that click on search in exchange. Login to your Anypoint account and select Anypoint MQ connector.

2. Create Anypoint MQ Config in global. For that, you need client app id, client secret, and URL. URL will be taken from the platform available at the following location.

3. Client app id, client secret taken from clients App on the platform. Click on the + symbol, which will open the window, as shown in the below screenshot. Create a new app as follows.

4. Use these in the MQ configuration.

Operations in Anypoint MQ:

There are 5 operations for MQ, which are Publish, Subscriber, Ack, Nack, Consume. Out of these 5, we will see Publish, Subscriber, Ack, and Nack.

1. Publish:

  • It is used to send a message to MQ.
  • This will publish the message given in the body into MQ Mentioned in Destination of config.
  • Create a simple flow as shown in the figure with publish config:
  • With postman send the request as follows:
  • If you go to Anypoint platform>MQ> mq-demo in the message browser, you can see published a new message.

2. Subscriber:

  • It is used to read/retrieve messages from the MQ.
  • Subscriber will subscribe from the MQ mentioned in the Queue option of configuration.

Create a flow with subscribers as follows. Write the name of the queue from which you want to read the message in Queue.

Subscriber type:

You have 2 types for subscribers:
1. Prefetch:

  • It will prefetch 30 messages as per the above config in the local buffer, and the messages appear in in-flight. You can change that to any number as per your requirement.
  • Even If multiple messages are fetched at a time, it will still process one message at a time.

2. Polling:
You can fetch messages from MQ at the scheduled time. The Subscriber checks for new messages in the queue at a fixed time, and whatever fetch Size you have mentioned in the config, it will fetch only that many messages from Mq for that run. The remaining messages will be taken in the next run.

Acknowledgement Mode:

There are three Acknowledgement Modes available.
1. Auto

  • Default acknowledgment mode.
  • If the flow execution finishes successfully without any exception, it automatically sends the acknowledgment back, and the message is deleted from MQ.
  • If the flow execution finishes with a propagated exception, the message is automatically not acknowledged and returned to the redelivery queue.

2. Immediate:

  • The message is acknowledged as soon as it is fetched from the MQ local. It will not wait till the flow processes it.

3. Manual:

  • In this mode, ack or Nack is sent to Subscriber manually using application logic.
  • Ack: It informs the Anypoint MQ that the message has been processed successfully and removes that message.
  • Nack: It informs the Anypoint MQ that the message is not processed successfully, so return the message to the queue for redelivery.
  • You will need the value of ackToken, which is provided as part of the message attributes.

Subscriber with flow executes successfully with a default config(AUTO ack mode):

  • Add a logger to print messages subscribed from MQ, and here the subscriber config is the default, as shown in the above screenshot.
  • As the subscriber type is prefetch as soon as the app is deployed it will fetch all messages from mq and try to process them one by one. As the acknowledgement mode is Auto it will wait for Automatically send acknowledgement as the flow processed successfully. As flow executed successfully it will delete the message from queue. If flow throws any exception, it will keep that message in queue as it will not receive Nack and if we have added any DLQ for this MQ, it will be sent to that DLQ. In our case it will be deleted from the queue as processed successfully without any exception.
    After deploying you can see that message we stored in previous published operation is subscribed from mq-demo as follow: Subscriber will be dropped in the source section.

Subscriber with flow executes successfully with IMMEDIATE Acknowledgement mode:

  • Add a logger to print messages subscribed from MQ. Added a request component that will throw an error, and here the subscriber config is with Immediate Acknowledgement mode.
  • As the subscriber type is prefetch as soon as the app is deployed it will fetch all messages from mq and try to process them. As the acknowledgement mode is Immediate, as soon as it receives a message it will immediately send acknowledgement back and the message will be deleted from MQ whether it will process successfully or not.
    After deploying you can see that message will deleted from MQ and will not be sent to mq-demo-DLQ even if its not processed successfully as follow:

4. Ack: (subscribe with MANUAL acknowledgment mode)

  • Using this operation, we can send an acknowledgment back that message is processed successfully when acknowledgment mode is manual.
  • Create a flow with the following 3 components with acknowledgment mode in subscriber config is manual and type is prefetch.
  • In ack, use ack token, which is present in input to Ack operation in attributes, and you can access that with attributes.ackToken.
  • As the subscriber type is prefetch, as soon as the app is deployed, it will fetch all messages from MQ and process them one by one. As the acknowledgment mode is manual, it will wait for manual acknowledgment from the flow. As soon as the Ack operation is executed successfully, it will delete the message from the queue. In the case before Ack operation flow throws an exception, it will keep that message in queue if no DLQ is added for this MQ or sent to DLQ as it will not receive any acknowledgment. In our case, it will be deleted from the queue as processed successfully.

5. Nack : (subscribe with MANUAL acknowledgment mode)

  • Using this operation, we can send an acknowledgment back that message is not processed successfully when acknowledgment mode is manual.
  • Create a flow with the following 4 components with acknowledgment mode in subscriber config is manual and type is prefetch.
  • In Nack, use ack token, which is present in input to Ack operation in attributes, and you can access that with “attributes.ackToken”.
  • As the subscriber type is prefetch as soon as the app is deployed it will fetch all messages from mq and try to process it one by one. As the acknowledgment mode is manual it will wait for manual acknowledgment from the flow. As soon as Nack operation is executed successfully, it will keep the message in queue if no DLQ is added for this MQ or sent to DLQ as it will not receive a negative acknowledgment. As it’s a negative acknowledgment. In our case, it will be added to DLQ.
  • As in our case, mq-demo has DLQ, you can check that message in mq-demo-DLQ.

--

--