Traitement JMS par lots avec Talend ESB

La gestion des messages dans des files JMS est caractérisée par un élément important, 1 contenu pour 1 message dans 1 file. Cela permet aux consommateurs de choisir avec finesse les contenus manipulés. Cela offre aussi des solutions de routage, de transformation, et de rejeux simples pour chaque message. Ceci de façon maîtrisée et transactionnelle. En résumé, dans les architectures orientées messages, l’adhérence faible entre consommateur et producteur de contenu est un point fort indéniable.

L’inconvénient est que les traitements unitaires sont coûteux en ressources. Lors de fortes quantités de messages, il est toutefois préférable de lotir ces traitements. Cette technique est d’ailleurs empruntée aux ETL.

Une de mes dernières réalisations a été de réaliser, sur la plateforme Talend ESB, des envois par lots à elasticsearch. Les clients JMS classiques étaient bien trop lents et très consommateurs de CPU. Plusieurs serveurs étaient nécessaires à ces envois. Ce client JMS par lots a définitivement supprimé cette forte contention et réduit nos charges CPU.

Préparation du serveur Karaf

Notre route nécessite l’utilisation des composants quartz et simple jms batch. Le composant sjms est celui qui offre la consommation multiple de messages JMS. Le composant Quartz supporte le déclenchement de cette consommation.

Pour les installer dans le runtime Talend ESB, ie le serveur Karaf, exécuter les commandes suivantes:

%KARAF_HOME/bin/client

feature:install -r camel-quartz
feature:install -r camel-sjms

Par ailleurs, les consommateurs et producteurs JMS utiliseront un pool de connexion JMS. Il est donc déclaré et déposé directement sur le serveur Karaf dans le fichier %KARAF_HOME/deploy/TechBatchConnectionFactory.xml. La configuration ActiveMQ est alors indépendante des routes, externalisée et mise en commun:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

  <bean id="activemq_TechBatch_ConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="{{activemq_tech_uri}}" />

    <property name="userName" value="{{activemq_tech_user}}" />
    <property name="password" value="{{activemq_tech_pwd}}" />
  </bean>

  <bean id="activemq_TechBatch_pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
    <property name="maxConnections" value="50" />
    <property name="connectionFactory" ref="activemq_TechBatch_ConnectionFactory" />

    <property name="createConnectionOnStartup" value="true"/>
    <property name="reconnectOnException" value="true" />

  </bean>

  <service interface="javax.jms.ConnectionFactory" ref="activemq_TechBatch_pooledConnectionFactory">
    <service-properties>
      <entry key="osgi.jndi.service.name" value="jms/TechBatchConnectionFactory"/>
    </service-properties>
  </service>

</blueprint>  

Le serveur prêt, la route Camel peut être développée.

Design de la route Camel

Cette route Camel est organisée en 4 parties distinctes:

  • une interception unitaire de chaque message XML avec son adaptation au format de sortie,
  • un routage unitaire vers les sorties (ici 3 indexes elasticsearch) au format JSON,
  • un traitement par lots des messages vers chaque index elasticsearch.
  • une interception commune des erreurs.

Dans cet article, seule la partie n°3, les traitements par lots, est détaillée.

Un alias de composant sjmsBatchTech est déclaré. Il fait appel au pool JMS préalable (jms/TechBatchConnectionFactory). Celui-ci peut aussi être intégré ou externalisé à la route. Dans ce cas, il est intégré:

<?xml version="1.0" encoding="UTF-8"?>
<!--Used to inject external resources, beans or define more CamelContext and RouteBuilder here-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
 
    <-- utilise le Pool ActiveMQ déclaré en service JNDI --> 
    <bean id="cfBatch" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="osgi:service/jms/TechBatchConnectionFactory" />
    </bean>

    <bean id="sjmsBatchTech" class="org.apache.camel.component.sjms.batch.SjmsBatchComponent">
        <property name="connectionFactory" ref="cfBatch"/>
    </bean>

    <!-- Stratégie d'agrégation -->
    <bean id="listOfExchange" class="org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy"/>

</beans>

Commençons la description des composants camel mis en jeu.

Le composant d’entrée des traitements par lot (cMessagingEndpoint6) est configuré avec:

  • un lot de 20 messages maximum,
  • une durée d’attente maximale de 15s,
  • un nombre de consommateur de 1,
  • la mise en oeuvre de la stratégie d’agrégation listOfExchange.
sjmsBatchTech:trace.brute?completionSize=20&completionTimeout=15000&consumerCount=1&aggregationStrategy=#listOfExchange

Chaque message porte alors n contenus JSON. La route prépare l’appel HTTP à elasticsearch avec des ajouts d’header dans cSetHeader_1 tels que:

Un cProcessor (cProcessor_4) récupère ensuite les messages JSON agrégés pour en faire un contenu compréhensible par l’API _bulk d’elasticsearch.

String aggregateBody = "";

// Aggregation de messages ?
if (exchange.getProperty(Exchange.GROUPED_EXCHANGE) != null) {
	List<Exchange> exs = (List<Exchange>)exchange.getProperty(Exchange.GROUPED_EXCHANGE);
	
	exchange.setProperty("nbTraces", exs.size());
	
	// Les bodys sont concaténés
	for (Exchange ex: (List<Exchange>)exchange.getProperty(Exchange.GROUPED_EXCHANGE)) {
		aggregateBody += "{ \"index\": {} }\n";
		aggregateBody += ex.getIn().getBody().toString().replaceAll("\n", "")+"\n";
	}
}

exchange.getIn().setBody(aggregateBody);

	

Ainsi les messages respectent le format imposé par cette API _bluk. Exemple:

POST /monindex/_bulk
{ "index" : { } }
{ "field1" : "value1" }
{ "index" : { } }
{ "field1" : "value1" }

Enfin, l’appel est réalisé par un cJavaDSLProcessor ( cJavaDSLProcessor_1 ). L’URL est variabilisée dans un fichier de propriétés (variable trace.elastic.url):

.toD("{{trace.elastic.uri}}")

Synthèse

L’utilisation du composant sJMS batch a été extrêmement bénéfique pour traiter la forte volumétrie de messages JMS. Elle nous a permis d’économiser plusieurs CPU, à la source sur nos serveurs Talend ESB, ainsi qu’à la cible sur les nœuds Elasticsearch.

Code Camel complet

Pour un soucis de compréhension globale, je vous propose le code camel complet de la partie portant sur le traitement par lot.

from("sjmsBatchTech:trace.brute?completionSize=20&completionTimeout=15000&consumerCount=1&aggregationStrategy=#listOfExchange")
				.routeId("ConsumeTrace_cMessagingEndpoint_6")
				.setHeader(Exchange.HTTP_METHOD)
				.constant("POST")
				.setHeader(Exchange.HTTP_PATH)
				.simple("/trace-business-${date:now:yyyy.MM.dd}/business/_bulk")
				.setHeader(Exchange.CONTENT_TYPE)
				.constant("application/json; charset=UTF-8")
				.setHeader(Exchange.HTTP_URI)
				.simple("{{trace.elastic.uri}}")
				.id("ConsumeTrace_cSetHeader_1")
				.process(new org.apache.camel.Processor() {
					public void process(org.apache.camel.Exchange exchange)
							throws Exception {
						/*
						 * Provide own codes to consume or translate the message
						 * exchanges.
						 * 
						 * @param org.apache.camel.Exchange exchange
						 */
						String aggregateBody = "";

						// Aggregation de messages ?
						if (exchange.getProperty(Exchange.GROUPED_EXCHANGE) != null) {
							List<Exchange> exs = (List<Exchange>) exchange
									.getProperty(Exchange.GROUPED_EXCHANGE);

							exchange.setProperty("nbTraces", exs.size());

							// Les bodys sont concaténés
							for (Exchange ex : (List<Exchange>) exchange
									.getProperty(Exchange.GROUPED_EXCHANGE)) {
								aggregateBody += "{ \"index\": {} }\n";
								aggregateBody += ex.getIn().getBody()
										.toString().replaceAll("\n", "")
										+ "\n";
							}
						}

						exchange.getIn().setBody(aggregateBody);

					}

				})
				.id("ConsumeTrace_cProcessor_4")
				.log(org.apache.camel.LoggingLevel.INFO, context.routeName,
						"Vers Elasticsearch trace-business (${property.nbTraces}): ${body}")

				.id("ConsumeTrace_cLog_1")

				.toD("{{trace.elastic.uri}}")
				.id("ConsumeTrace_cJavaDSLProcessor_1");

Laisser un commentaire