Implémenter un système persistant de redistribution des messages avec Apache Camel et Apache ActiveMQ.

Implémenter un système persistant de redistribution des messages avec Apache Camel et Apache ActiveMQ.

Lorsque vous utilisez des routes Apache Camel™ dans votre intégration et qu’un problème apparaît, la démarche simple est de réessayer l’envoi du message notamment pour les erreurs dites « temporaires » (exemple : en cas de panne de réseau, un renvoi de messages dans l’attente d’un retour de réseau).

Dans Apache Camel™, cette règle de redistribution est configurée dans le gestionnaire d’erreurs. Ce dernier peut prendre en charge de telles erreurs. Par défaut, le message à redistribuer est stocké en mémoire. De ce fait, les nouveaux échanges ne sont pas transmis, car le premier n’est pas signalé comme «traité» !

Cette approche pose problème car si vous redémarrez le conteneur hébergeant la route Camel (comme Apache Karaf™), l’échange est perdu. De plus, en termes de performances, nous voudrions assurer une continuité des échanges.

Dans cet article, nous allons illustrer plusieurs solutions pour parvenir à une politique de redistribution.

Adaptation de l’article :  « Implement simple persistent redelivery with backoff mixing Apache Camel & ActiveMQ » du blog Nanthrax  écrit par Jean-baptiste Onofré (Technical Advisor)

yupiik-camel-activemq-expertise-apache-redelivery

Les routes Apache Camel™

Nous allons explorer les routes pour comprendre notre cas. Dans cet exemple, nous utilisons DSL Camel Blueprint car le conteneur de prédilection est Apache Karaf™. 

  • La première route est assez simple : elle écoute les requêtes entrantes HTTP (en utilisant Camel Jetty), les convertit en chaîne pour ensuite les envoyer dans une file d’attente JMS.
  • La seconde route lit les messages de la file d’attente JMS, prépare un JSON en utilisant un processeur (enregistré comme service dans Karaf) et appelle un service REST.
 <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"            xmlns:cxf="http://camel.apache.org/schema/cxf">   <reference id="connectionFactory" interface="javax.jms.ConnectionFactory"/>   <reference id="jsonProcessor" interface="org.apache.camel.Processor" filter="(name=json)"/>   <cxf:rsClient id="rsClient" address="http://localhost:8181/cxf/test/"/>   <camelContext xmlns="http://camel.apache.org/schema/blueprint">      <route id="first">         <from uri="jetty:http://0.0.0.0:9090/first"/>         <convertBodyTo type="java.lang.String"/>         <wireTap uri=jms:queue:second?connectionFactory=connectionFactory"/>         <setBody><constant>OK</constant></setBody>      </route>      <route id="second">        <from uri="jms:queue:second?connectionFactory=connectionFactory"/>        <process ref="jsonProcessor"/>        <setHeader headerName="operationName"><constant>updateResource</constant></setHeader>        <setHeader headerName="CamelCxfRsUsingHttpAPI"><constant>false</constant></setHeader>        <setHeader herderName="CamelAcceptContentType"><constant>application/json</constant></setHeader>        <to uri="cxfrs://bean://rsClient?synchronous=true"/>      </route>   </camelContext> </blueprint>

Cas n°1

Nous souhaitons conserver le message pour pouvoir le redistribuer, même en cas de redémarrage ou de panne de la plateforme

Solution

Par défaut, il est déjà couvert dans Camel puisqu’il définit les messages JMS comme persistants.

Cas n°2

Nous souhaitons supprimer le message de la file d’attente JMS uniquement lorsqu’il a réellement été traité avec succès.

Solution

Dans ce cas, nous changeons le mode d’acquittement JMS. Par défaut, Camel utilise auto-ack, c’est à dire que l’ack est envoyé au broker dès que le message est lu (soit juste après l’endpoint JMS). Si une erreur/exception se produit de la route Camel, le message est supprimé de la file d’attente.

En basculant le mode ack en mode ack client, nous pouvons envoyer l’ack uniquement au moment de l’échange. Ainsi, le message sera supprimé de la file d’attente lorsque celui-ci a été traité avec succès dans son intégralité.

Pour modifier le mode ack sur l’URI de l’endpoint JMS :

<from uri="jms:queue:second?connectionFactory=connectionFactory&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE"/>

Cas n°3

Nous souhaitons relancer le message plusieurs fois, ou à l’infini.

Solution

Pour cette 3ème exemple, nous changeons l’URL fournisseur dans la connection factory d’ActiveMQ. Par défaut, le message est redistribué 7 fois au maximum. Il peut être modifié sur l’URL utilisée par ma connection factory.

Dans le cadre où vous définissez un nombre maximum de 10 fois, vous devrez faire :

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">     
      <property name="brokerURL" value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=10" /> 
</bean>

Vous pouvez le redistribuer indéfiniment (jusqu’à expiration du message) en utilisant -1 pour maximumRedeliveries

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     <property name="brokerURL" value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=-1" /> 
</bean>

Backoff ActiveMQ™

Nous venons d’appréhender le mécanisme de redistribution d’ActiveMQ qui est assez pratique. Afin d’optimiser le nombre de tentatives, nous pouvons introduire un certain délai entre chaque. Plus le nombre s’accroît, plus nous pouvons augmenter le délai : c’est ce qu’on appelle une backoff delivery.

Pour être configuré sur l’URL de la connection factory, il faut utiliser :

  • useExponentialBackOff permet un backoff exponentiel. Il est désactivé par défaut ;
  • backOffMultiplier : permet de choisir la façon d’augmenter le délai. Par défaut, le «nouveau» délai est 5 fois plus élevé que le «précédent» ;
  • initialRedeliveryDelay est le délai de delivery initial. C’est le temps de démarrage du backoff. Par défaut, c’est 1000L, soit 1 seconde ;
  • maximumRedeliveryDelay est le délai maximum que nous pouvons avoir. Par défaut, c’est 1, ce qui signifie qu’il n’y a pas maximum

Donc, nous mettrons à jour l’URL de la connection factory pour permettre un backoff delivery :

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     <property name="brokerURL" value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=-1&amp;jms.redeliveryPolicy.useExponentialBackOff=true&amp;jms.redeliveryPolicy.initialRedeliveryDelay=2000L&amp;jms.redeliveryPolicy.backOffMultiplier=2" /> 
</bean>

Nous avons ici une politique de backoff delivery qui doublera le délai pour chaque tentative, à partir de 2 secondes. Grâce à cela, nous gardons une route Camel assez simple et bénéficions d’une redistribution correcte et exponentielle, avec gestion du backoff.

Bloqué dans vos roadmaps ?

Vous souhaitez former vos équipes ?

fr_FRFrançais
en_GBEnglish (UK) fr_FRFrançais