Il n’existe que peu de références pour mettre en place un cache Infinispan avec un ESB camel comme Talend ESB ou RedHat Fuse.

Après quelques travaux sur cette mise en oeuvre, je vous propose mon mode d’emploi. Ceci vous permettra de répartir des caches de données entre ces runtimes ESB.

Architecture de caches dans Talend ESB

Ce partage de collections peut être utilisé pour répondre à plusieurs besoins:

  1. Manipuler efficacement de grandes collections en mémoire en les chargeant que partiellement,
  2. Partager en temps-réel des données entre plusieurs processus,
  3. Distribuer des ressources mémoires en fonction de consommateurs multiples,
  4. Offrir une solution de Haute Disponibilité à moindre coût,
  5. Permettre des accès multiples à des ressources communes sans dé-duplication.

C’est ce dernier besoin qui m’a intéressé avec le besoin de permettre des consommateurs multiples de fichiers sFTP sans duplication.

Idempotent ? Quezako ?

Le pattern Idempotent est un EIP, Enterprise Integration Patterns. Il définit un concept d’accès unique et non simultané à une ressource.

Mon cas concerne la consommation de fichiers sFTP. Chaque serveur Karaf/camel porte les mêmes routes. Ces routes sont donc des concurrentes pour la consommation de leurs cibles sFTP.

Accès aux fichiers sFTP

Le pattern Idempotent permet de répondre à cette concurrence d’accès entre des consommateurs multiples sFTP.

Infinispan

Le cache Infinispan de RedHat est un DataGrid Java. C’est à dire que celui-ci permet de répartir, distribuer, rapprocher, stocker, interroger et modifier de grandes collections de données avec une performance accrue. La latence des DataGrid, comme Infinispan, GigaSpace ou encore Google Memory Store, est de quelques millisecondes. Ce sont des outils, avec un temps de réponse performant, sont parfaitement adaptés aux cas d’usages synchrones.

Par ailleurs, les architectures offertes par ces DataGrid sont très variées de part leur flexibilité.

Un exemple de caches multi-sites:

xsite
Cross site replication
https://infinispan.org/docs/stable/titles/xsite/xsite.html

Mise en oeuvre Camel

Dans ma réalisation avec camel, l’architecture est simple. Seule 2 ou 4 instances de caches sont utilisées.

Context

La déclaration du contexte camel est différente suivant les outils. Ici avec Talend ESB, il convient d’utiliser un composant cConfig.

La surcharge du contexte

Déclaration des imports:

// Infinispan
import org.infinispan.manager.DefaultCacheManager;
import java.io.File;

// Idempotent Infinispan
import org.apache.camel.component.infinispan.processor.idempotent.InfinispanIdempotentRepository;

// Manipulation des registres
import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
import org.apache.camel.impl.SimpleRegistry;

Les beans Infinispan cacheManager, idemptent-ROUTENAME et inProgress-ROUTENAME sont créés et inscrits au registre. Ils seront consommés par le composant sftp.

Déclaration des beans Infinispan:

// Infinispan
String routeName = "ROUTENAME";

// Déjà créé ? 
DefaultCacheManager cacheManager = (DefaultCacheManager)
camelContext
	.getRegistry(SimpleRegistry.class)
	.get("cacheManager");

// 1 instance cacheManager pour toutes les instances de route
if (cacheManager == null) {	

  // Lecture du fichier de configuration infinispan.xml
  cacheManager = new  DefaultCacheManager(System.getenv("KARAF_HOME")+"/lib/infinispan.xml");

  // Le cacheManager est déclaré en tant que Bean
  camelContext
	.getRegistry(SimpleRegistry.class)
	.put("cacheManager", cacheManager);
	
  // Un repository pour le cache idempotent fichiers
  InfinispanIdempotentRepository infinispanRepo = InfinispanIdempotentRepository.infinispanIdempotentRepository(cacheManager, "idempotent-"+routeName);

  // Inscription en tant que Bean
  camelContext
	.getRegistry(SimpleRegistry.class)
	.put("infinispanRepo", infinispanRepo);

	
  // Un repository pour le cache inProgress	
  InfinispanIdempotentRepository inProgressMemIdemRepo = InfinispanIdempotentRepository.infinispanIdempotentRepository(cacheManager, "inProgress-"+routeName);

  // Inscription en tant que Bean
  camelContext
	.getRegistry(SimpleRegistry.class)
	.put("inProgressMemIdemRepo", inProgressMemIdemRepo);

}

EndPoint Infinispan

La mise en oeuvre de ce endpoint “fictif” n’est pas anecdotique. Elle permet l’import des bibliothèques OSGI dans Karaf. Ne l’oubliez-pas.

EndPoint Camel Infinispan

La configuration de ce endpoint est simple:

  • la lecture des entrées du cache “temp” en utilisant le bean “cacheManager” (cf. ci-dessus):
infinispan:temp?cacheContainer=#cacheManager

Préciser la dépendance au composant infinispan (dans Talend ESB).

Composant sFTP

La lecture des fichiers est déclarée dans un endpoint classique SFTP mais avec les options suivantes:

  • idempotent: activation du pattern idempotent
  • idempotentRepository: Bean du “référentiel Idempotent”
  • inProgressRepository: Bean du “référentiel Idempotent en cours”
  • idempotentKey: la clé différenciante
  • readLock: stratégie “idempotent”

La configuration ci-dessous est dynamique. Elle utilise des propriétés qui ont été externalisés, cf article Traitement JMS par lots.

.simple("sftp:${property.sFTPServer}:${property.sFTPPort}${property.sFTPBaseDirectory}?"
    +"username=${property.sFTPUser}"
    +"&password=${property.sFTPPassword}"
    +"&Include=${property.sFTPFileName}"
    +"&move=${property.sFTPArchiveDirectory}"
    +"&moveFailed=${property.sFTPErrorDirectory}"
    +"&autoCreate=false"
    +"&throwExceptionOnConnectFailed=true"
    +"&charset=${property.encoding}"   
    +"&idempotent=true"
    +"&idempotentKey=${file:name}-${file:size}" 
    +"&idempotentRepository=#fileIdemRepo" 
    +"&inProgressRepository=#inProgressIdemRepo" 
    +"&readLock=idempotent"
    +"${property.sFTPOptions}"
)
	.timeout(200)
	

Configuration Infinispan

Configuration des caches

Les caches Infinispan sont préalablement déclarés dans un fichier externe en XML. Il sont déployé sur chaque serveur participant au cluster.

Des modèles de cache appelés “idempotent*” et “inProgress*” sont déclarés. Ainsi ils servent de référence pour tous les caches de route comme idempotent-ROUTENAME.

Les particularités de ces modèles sont les suivantes:

  • ils sont distribués. Chacun porte la totalité du contenu.
  • une persistance locale est activée.
  • leurs données ont une durée de vie limitée.
Externalisation des configurations des caches

Le fichier de configuration a été positionné en $KARAF_HOME/lib/infinispan.xml . Il est lu par le bean cacheManager.

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="urn:infinispan:config:9.4 https://infinispan.org/schemas/infinispan-config-9.4.xsd"
      xmlns="urn:infinispan:config:9.4">
     
      <!-- lien vers le fichier JGroups des participants --> 
      <jgroups>
         <stack-file name="external-file" 
			path="/opt/talend/runtime/lib/jgroups-tcp.xml"/>
      </jgroups>

      <cache-container default-cache="local" statistics="true">
	  
		<!-- global-state>
		   <persistent-location path="${java.io.tmpdir}" relative-to="my.data"/>
		</global-state -->
		
		<transport cluster="MFT" stack="external-file"/>
		  
		<!-- cache par defaut -->
		<local-cache name="local">
			<memory>
				<object size="500" strategy="REMOVE"/>
			</memory>
		</local-cache>
		
		<!-- cache pour les idempotent -->
		<replicated-cache-configuration name="idempotent*" mode="SYNC">
		
			<!-- expiration au bout de 7j = 86400 x 7 x1000 -->
			<expiration lifespan="604800000"/>
			
			<memory>
				<!-- evication a 100 Mo -->
				<binary size="100000000" eviction="MEMORY"/>
			</memory>
			
			<!-- persistence au dela de la memoire -->
			<persistence passivation="false">
                           <file-store fetch-state="true"
                                        read-only="false"
                                        purge="false" path="${java.io.tmpdir}/idempotent"/>
                        </persistence>

		    <locking isolation="READ_COMMITTED"/>

                    <transaction locking="OPTIMISTIC"/>

		</replicated-cache-configuration>
		
		<!-- cache pour les idempotent inProgress -->
		<replicated-cache-configuration name="inProgress*" mode="SYNC" >
			<!-- expiration au bout de 1h 3600 x1000 -->
			<expiration lifespan="3600000"/>
			
			<!-- max de 5000 fichiers -->
			<memory>
				<object size="5000" strategy="REMOVE"/>
			</memory>
		</replicated-cache-configuration>
		
                <!-- liste des caches des ROUTES -->
		<replicated-cache name="inProgress-CACHE_NAME" />
		<replicated-cache name="idempotent-CACHE_NAME" />
				
      </cache-container>
</infinispan>

Configuration des nœuds du cluster Infinispan

La configuration des participants au cluster Infinispan est, lui aussi, externalisé (cf <jgroups> du infinispan.xml) . Chaque déploiement sera adapté à façon sans impacter la route.

Par ailleurs, dans ce fichier, c’est bien l’ IP des participants qu’il convient d’utiliser, pas leur nom DNS.

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups-4.0.xsd">

   <TCP bind_addr="${jgroups.tcp.address:10.127.8.167}"
        bind_port="${jgroups.tcp.port:7800}"
        enable_diagnostics="true"
        thread_naming_pattern="pl"
        send_buf_size="640k"
        sock_conn_timeout="300"
        bundler_type="no-bundler"

        thread_pool.min_threads="${jgroups.thread_pool.min_threads:1}"
        thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}"
        thread_pool.keep_alive_time="60000"
   />
   
  <!-- 2 participants: 10.127.8.167 et 10.127.8.168 -->
  <TCPPING 
        initial_hosts="10.127.8.167[7800],10.127.8.168[7800]"
        port_range="3"
	/>

   <MERGE3 min_interval="10000" 
           max_interval="30000" 
   />
   <FD_SOCK />
   <!-- Suspect node `timeout` to `timeout + timeout_check_interval` millis after the last heartbeat -->
   <FD_ALL timeout="10000"
           interval="2000"
           timeout_check_interval="1000"
   />
   <VERIFY_SUSPECT timeout="1000"/>
   <pbcast.NAKACK2 use_mcast_xmit="false"
                   xmit_interval="100"
                   xmit_table_num_rows="50"
                   xmit_table_msgs_per_row="1024"
                   xmit_table_max_compaction_time="30000"
                   resend_last_seqno="true"
   />
   <UNICAST3 xmit_interval="100"
             xmit_table_num_rows="50"
             xmit_table_msgs_per_row="1024"
             xmit_table_max_compaction_time="30000"
   />
   <pbcast.STABLE stability_delay="500"
                  desired_avg_gossip="5000"
                  max_bytes="1M"
   />
   <pbcast.GMS print_local_addr="false"
               join_timeout="${jgroups.join_timeout:5000}"
   />
   <MFC max_credits="2m" 
        min_threshold="0.40"
   />
   <FRAG3/>
</config>

Déploiement de la route

Installation des prérequis

La route est prête. Avant de la déployer, un module doit être préalablement installé dans le serveur Karaf: camel-infinispan

feature:install camel-infinispan

Activation des logs

Pour activer les logs Infinispan, préciser la configuration au niveau du fichier $TALEND_HOME/etc/org.ops4j.pax.logging.cfg :

log4j2.logger.INFINISPAN.name = org.infinispan
log4j2.logger.INFINISPAN.level = INFO

log4j2.logger.JGROUPS.name = org.jgroups
log4j2.logger.JGROUPS.level = INFO

Déploiement de la route

Au démarrage des routes, les participants Infinispan s’affichent.

Lors du démarrage du 2nd participant, son inscription au cluster est bien prise en compte.

Exemple: les serveurs serveur-karaf-1 et serveur-karaf-2 s’appellent se synchronisent bien:

11:20:31.395 INFO [jgroups-16784,serveur-karaf-2-12887] ISPN000094: Received new cluster view for channel MFT: [serveur-karaf-2-12887|43] (7) [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852]
11:20:31.396 INFO [jgroups-16784,serveur-karaf-2-12887] ISPN100000: Node serveur-karaf-2-55852 joined the cluster
11:20:31.414 INFO [jgroups-16404,serveur-karaf-2-30102] ISPN000094: Received new cluster view for channel MFT: [serveur-karaf-2-12887|43] (7) [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852]
11:20:31.415 INFO [jgroups-16404,serveur-karaf-2-30102] ISPN100000: Node serveur-karaf-2-55852 joined the cluster
11:20:31.416 INFO [jgroups-16482,serveur-karaf-2-1243] ISPN000094: Received new cluster view for channel MFT: [serveur-karaf-2-12887|43] (7) [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852]
11:20:31.416 INFO [jgroups-16482,serveur-karaf-2-1243] ISPN100000: Node serveur-karaf-2-55852 joined the cluster
11:20:31.430 INFO [pipe-bundle:start R_DOUBLERUN_FTP2FTP_infinispan] ISPN000094: Received new cluster view for channel MFT: [serveur-karaf-2-12887|43] (7) [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852]
11:20:31.535 INFO [pipe-bundle:start R_DOUBLERUN_FTP2FTP_infinispan] ISPN000079: Channel MFT local address is serveur-karaf-2-55852, physical addresses are [10.127.8.168:7803]
11:20:31.569 INFO [remote-thread--p2-t1394] [Context=org.infinispan.CONFIG] ISPN100002: Starting rebalance with members [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852], phase READ_OLD_WRITE_ALL, topology id 142
11:20:31.702 INFO [remote-thread--p2-t1394] [Context=org.infinispan.CONFIG] ISPN100009: Advancing to rebalance phase READ_ALL_WRITE_ALL, topology id 143
11:20:31.729 INFO [remote-thread--p2-t1395] [Context=org.infinispan.CONFIG] ISPN100009: Advancing to rebalance phase READ_NEW_WRITE_ALL, topology id 144
11:20:31.745 INFO [remote-thread--p2-t1395] [Context=org.infinispan.CONFIG] ISPN100010: Finished rebalance with members [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852], topology id 145

Maintenant les caches Infinispan sont prêts. La consommation des fichiers SFTP avec le pattern Idempotent sera bien répartie entre tous les participants.

Caches Infinispan sur Talend ESB

Vous pourrez aussi aimer