Implement a persistent message redelivery system with Apache Camel and Apache ActiveMQ
When you use Apache Camel™ routes in your integration and a problem occurs, the simple approach is to retry sending the message, especially for so-called "temporary" errors (for example, during a network outage, the message is resent until the network comes back).
In Apache Camel™, this redelivery policy is configured in the error handler, which can deal with such errors. By default, the message being redelivered is kept in memory. As a result, new exchanges are not processed, because the first exchange has not yet been marked as "handled"!
This approach is problematic: if you restart the container hosting the Camel route (such as Apache Karaf™), the exchange is lost. In addition, for performance reasons, we want exchanges to keep flowing.
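For reference, the in-memory redelivery described above is typically declared on a Camel error handler. The fragment below is only a sketch in Blueprint XML: the `inMemoryRedelivery` id and the retry values are assumptions chosen for illustration, not settings taken from this article.

```xml
<camelContext xmlns="http://camel.apache.org/schema/blueprint"
              errorHandlerRef="inMemoryRedelivery">

  <!-- Hypothetical id. Pending retries live in memory only,
       so they are lost if the container restarts. -->
  <errorHandler id="inMemoryRedelivery" type="DefaultErrorHandler">
    <!-- Illustrative values: up to 5 retries, 2 seconds apart -->
    <redeliveryPolicy maximumRedeliveries="5" redeliveryDelay="2000"/>
  </errorHandler>

  <!-- routes go here -->
</camelContext>
```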
In this article, we will illustrate several ways to implement a redelivery policy that survives restarts.
Apache Camel™ routes
Let's look at the routes to understand our use case. In this example, we use the Camel Blueprint DSL because the target container is Apache Karaf™.
- The first route is quite simple: it listens for incoming HTTP requests (using Camel Jetty), converts the body to a String and then sends it to a JMS queue.
- The second route reads the messages from the JMS queue, builds a JSON payload using a processor (registered as a service in Karaf) and calls a REST service.
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:cxf="http://camel.apache.org/schema/blueprint/cxf">

  <!-- JMS connection factory and JSON processor, both looked up as OSGi services -->
  <reference id="connectionFactory" interface="javax.jms.ConnectionFactory"/>
  <reference id="jsonProcessor" interface="org.apache.camel.Processor" filter="(name=json)"/>

  <cxf:rsClient id="rsClient" address="http://localhost:8181/cxf/test/"/>

  <camelContext xmlns="http://camel.apache.org/schema/blueprint">

    <!-- HTTP in, body as String, copy sent to the JMS queue, "OK" returned to the caller -->
    <route id="first">
      <from uri="jetty:http://0.0.0.0:9090/first"/>
      <convertBodyTo type="java.lang.String"/>
      <wireTap uri="jms:queue:second?connectionFactory=connectionFactory"/>
      <setBody><constant>OK</constant></setBody>
    </route>

    <!-- JMS in, JSON built by the processor, REST service called -->
    <route id="second">
      <from uri="jms:queue:second?connectionFactory=connectionFactory"/>
      <process ref="jsonProcessor"/>
      <setHeader headerName="operationName"><constant>updateResource</constant></setHeader>
      <setHeader headerName="CamelCxfRsUsingHttpAPI"><constant>false</constant></setHeader>
      <setHeader headerName="CamelAcceptContentType"><constant>application/json</constant></setHeader>
      <to uri="cxfrs://bean://rsClient?synchronous=true"/>
    </route>

  </camelContext>

</blueprint>
We would like to keep the message so that it can be redelivered even after a restart or a platform failure.
By default, this is already covered in Camel, since it sends JMS messages as persistent.
We would like to remove the message from the JMS queue only once it has actually been processed successfully.
In this case, we change the JMS acknowledgement mode. By default, Camel uses auto-ack, i.e. the ack is sent to the broker as soon as the message is read (just after the JMS endpoint). If an error or exception then occurs in the Camel route, the message has already been removed from the queue.
By switching from auto-ack to client-ack mode, we send the ack only when the exchange completes. The message is therefore removed from the queue only once it has been processed successfully in its entirety.
The ack mode is changed on the URI of the JMS endpoint, through the acknowledgementModeName option.
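As a sketch of what this could look like on the consuming endpoint of the second route, the fragment below uses the `acknowledgementModeName` option of the Camel JMS component with the value `CLIENT_ACKNOWLEDGE`. The rest of the route is assumed to stay exactly as shown earlier.

```xml
<route id="second">
  <!-- &amp; is the XML-escaped form of & inside an attribute value -->
  <from uri="jms:queue:second?connectionFactory=connectionFactory&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE"/>
  <!-- remaining steps of the route are unchanged -->
</route>
```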
We want to redeliver the message several times, or indefinitely.
For this third requirement, we change the broker URL of the ActiveMQ connection factory. By default, a message is redelivered at most 6 times (the default value of maximumRedeliveries). This can be changed on the URL used by the connection factory.
To allow up to 10 redeliveries, use:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=10"/>
</bean>
You can redeliver the message indefinitely (until it expires) by setting maximumRedeliveries to -1:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=-1"/>
</bean>
We have just seen the ActiveMQ redelivery mechanism, which is quite practical. To limit the number of wasted attempts, we can introduce a delay between each one. The more attempts have been made, the longer the delay becomes: this is called a backoff redelivery.
The following options are set on the URL of the connection factory:
- useExponentialBackOff enables exponential backoff. It is disabled by default;
- backOffMultiplier defines how fast the delay grows. By default, each new delay is 5 times longer than the previous one;
- initialRedeliveryDelay is the initial redelivery delay, i.e. the starting point of the backoff. By default, it is 1000 ms (1 second);
- maximumRedeliveryDelay is the longest delay allowed. By default, it is -1, which means there is no maximum.
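As a rough illustration of how these options combine, assume that each delay is simply the previous one multiplied by backOffMultiplier, then capped by maximumRedeliveryDelay when a cap is set. This is a simplified model that ignores any other redelivery setting, and the symbols are notation introduced here: d_1 is the initial delay, m the multiplier and d_max the cap. The second line works through the default values listed above (1000 ms, multiplier 5, no cap).

```latex
d_n = \min\left(d_1 \cdot m^{\,n-1},\; d_{\max}\right), \qquad n \ge 1

% Defaults: d_1 = 1\,\text{s},\ m = 5,\ \text{no cap}
d_1 = 1\,\text{s}, \quad d_2 = 5\,\text{s}, \quad d_3 = 25\,\text{s}, \quad d_4 = 125\,\text{s}
```

With the default multiplier the delay exceeds two minutes by the fourth attempt, which is why a smaller multiplier or an explicit maximum is often preferable when redelivering indefinitely.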
So, we update the URL of the connection factory to enable a backoff redelivery:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <!-- Inside an XML attribute, each & separating URL parameters must be written as &amp; -->
  <property name="brokerURL" value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=-1&amp;jms.redeliveryPolicy.useExponentialBackOff=true&amp;jms.redeliveryPolicy.initialRedeliveryDelay=2000&amp;jms.redeliveryPolicy.backOffMultiplier=2"/>
</bean>
We now have a backoff redelivery policy that doubles the delay at each attempt, starting at 2 seconds. Thanks to this, we keep a fairly simple Camel route and benefit from a persistent redelivery with exponential backoff, handled by the broker connection rather than by the route.
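When the URL becomes long, the same policy can, as an alternative, be declared as a separate bean and injected into the connection factory. The sketch below assumes the `org.apache.activemq.RedeliveryPolicy` class and the `redeliveryPolicy` property of `ActiveMQConnectionFactory`. The `redeliveryPolicy` bean id and the 60-second cap are illustrative choices, not values from this article.

```xml
<!-- Same settings as on the URL, expressed as bean properties -->
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  <property name="maximumRedeliveries" value="-1"/>
  <property name="useExponentialBackOff" value="true"/>
  <property name="initialRedeliveryDelay" value="2000"/>
  <property name="backOffMultiplier" value="2"/>
  <!-- Assumption: cap the delay at 60 seconds so it does not grow without bound -->
  <property name="maximumRedeliveryDelay" value="60000"/>
</bean>

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
  <property name="redeliveryPolicy" ref="redeliveryPolicy"/>
</bean>
```

This form avoids escaping `&` in the URL and keeps each setting on its own line, at the cost of a second bean definition.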