Il n’existe que peu de références pour mettre en place un cache Infinispan avec un ESB camel comme Talend ESB ou RedHat Fuse.
Après quelques travaux sur cette mise en oeuvre, je vous propose mon mode d’emploi. Ceci vous permettra de répartir des caches de données entre ces runtimes ESB.
Ce partage de collections peut être utilisé pour répondre à plusieurs besoins:
- Manipuler efficacement de grandes collections en mémoire en les chargeant que partiellement,
- Partager en temps-réel des données entre plusieurs processus,
- Distribuer des ressources mémoires en fonction de consommateurs multiples,
- Offrir une solution de Haute Disponibilité à moindre coût,
- Permettre des accès multiples à des ressources communes sans dé-duplication.
C’est ce dernier besoin qui m’a intéressé avec le besoin de permettre des consommateurs multiples de fichiers sFTP sans duplication.
Idempotent ? Quezako ?
Le pattern Idempotent est un EIP, Enterprise Integration Patterns. Il définit un concept d’accès unique et non simultané à une ressource.
Mon cas concerne la consommation de fichiers sFTP. Chaque serveur Karaf/camel porte les mêmes routes. Ces routes sont donc des concurrentes pour la consommation de leurs cibles sFTP.
Le pattern Idempotent permet de répondre à cette concurrence d’accès entre des consommateurs multiples sFTP.
Infinispan
Le cache Infinispan de RedHat est un DataGrid Java. C’est à dire que celui-ci permet de répartir, distribuer, rapprocher, stocker, interroger et modifier de grandes collections de données avec une performance accrue. La latence des DataGrid, comme Infinispan, GigaSpace ou encore Google Memory Store, est de quelques millisecondes. Ce sont des outils, avec un temps de réponse performant, sont parfaitement adaptés aux cas d’usages synchrones.
Par ailleurs, les architectures offertes par ces DataGrid sont très variées de part leur flexibilité.
Un exemple de caches multi-sites:
Mise en oeuvre Camel
Dans ma réalisation avec camel, l’architecture est simple. Seule 2 ou 4 instances de caches sont utilisées.
Context
La déclaration du contexte camel est différente suivant les outils. Ici avec Talend ESB, il convient d’utiliser un composant cConfig.
Déclaration des imports:
// Infinispan
import org.infinispan.manager.DefaultCacheManager;
import java.io.File;
// Idempotent Infinispan
import org.apache.camel.component.infinispan.processor.idempotent.InfinispanIdempotentRepository;
// Manipulation des registres
import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
import org.apache.camel.impl.SimpleRegistry;
Les beans Infinispan cacheManager, idemptent-ROUTENAME et inProgress-ROUTENAME sont créés et inscrits au registre. Ils seront consommés par le composant sftp.
Déclaration des beans Infinispan:
// Infinispan
String routeName = "ROUTENAME";
// Déjà créé ?
DefaultCacheManager cacheManager = (DefaultCacheManager)
camelContext
.getRegistry(SimpleRegistry.class)
.get("cacheManager");
// 1 instance cacheManager pour toutes les instances de route
if (cacheManager == null) {
// Lecture du fichier de configuration infinispan.xml
cacheManager = new DefaultCacheManager(System.getenv("KARAF_HOME")+"/lib/infinispan.xml");
// Le cacheManager est déclaré en tant que Bean
camelContext
.getRegistry(SimpleRegistry.class)
.put("cacheManager", cacheManager);
// Un repository pour le cache idempotent fichiers
InfinispanIdempotentRepository infinispanRepo = InfinispanIdempotentRepository.infinispanIdempotentRepository(cacheManager, "idempotent-"+routeName);
// Inscription en tant que Bean
camelContext
.getRegistry(SimpleRegistry.class)
.put("infinispanRepo", infinispanRepo);
// Un repository pour le cache inProgress
InfinispanIdempotentRepository inProgressMemIdemRepo = InfinispanIdempotentRepository.infinispanIdempotentRepository(cacheManager, "inProgress-"+routeName);
// Inscription en tant que Bean
camelContext
.getRegistry(SimpleRegistry.class)
.put("inProgressMemIdemRepo", inProgressMemIdemRepo);
}
EndPoint Infinispan
La mise en oeuvre de ce endpoint “fictif” n’est pas anecdotique. Elle permet l’import des bibliothèques OSGI dans Karaf. Ne l’oubliez-pas.
La configuration de ce endpoint est simple:
- la lecture des entrées du cache “temp” en utilisant le bean “cacheManager” (cf. ci-dessus):
infinispan:temp?cacheContainer=#cacheManager
Préciser la dépendance au composant infinispan (dans Talend ESB).
Composant sFTP
La lecture des fichiers est déclarée dans un endpoint classique SFTP mais avec les options suivantes:
- idempotent: activation du pattern idempotent
- idempotentRepository: Bean du “référentiel Idempotent”
- inProgressRepository: Bean du “référentiel Idempotent en cours”
- idempotentKey: la clé différenciante
- readLock: stratégie “idempotent”
La configuration ci-dessous est dynamique. Elle utilise des propriétés qui ont été externalisés, cf article Traitement JMS par lots.
.simple("sftp:${property.sFTPServer}:${property.sFTPPort}${property.sFTPBaseDirectory}?"
+"username=${property.sFTPUser}"
+"&password=${property.sFTPPassword}"
+"&Include=${property.sFTPFileName}"
+"&move=${property.sFTPArchiveDirectory}"
+"&moveFailed=${property.sFTPErrorDirectory}"
+"&autoCreate=false"
+"&throwExceptionOnConnectFailed=true"
+"&charset=${property.encoding}"
+"&idempotent=true"
+"&idempotentKey=${file:name}-${file:size}"
+"&idempotentRepository=#fileIdemRepo"
+"&inProgressRepository=#inProgressIdemRepo"
+"&readLock=idempotent"
+"${property.sFTPOptions}"
)
.timeout(200)
Configuration Infinispan
Configuration des caches
Les caches Infinispan sont préalablement déclarés dans un fichier externe en XML. Il sont déployé sur chaque serveur participant au cluster.
Des modèles de cache appelés “idempotent*” et “inProgress*” sont déclarés. Ainsi ils servent de référence pour tous les caches de route comme idempotent-ROUTENAME.
Les particularités de ces modèles sont les suivantes:
- ils sont distribués. Chacun porte la totalité du contenu.
- une persistance locale est activée.
- leurs données ont une durée de vie limitée.
Le fichier de configuration a été positionné en $KARAF_HOME/lib/infinispan.xml . Il est lu par le bean cacheManager.
<?xml version="1.0" encoding="UTF-8"?>
<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:9.4 https://infinispan.org/schemas/infinispan-config-9.4.xsd"
xmlns="urn:infinispan:config:9.4">
<!-- lien vers le fichier JGroups des participants -->
<jgroups>
<stack-file name="external-file"
path="/opt/talend/runtime/lib/jgroups-tcp.xml"/>
</jgroups>
<cache-container default-cache="local" statistics="true">
<!-- global-state>
<persistent-location path="${java.io.tmpdir}" relative-to="my.data"/>
</global-state -->
<transport cluster="MFT" stack="external-file"/>
<!-- cache par defaut -->
<local-cache name="local">
<memory>
<object size="500" strategy="REMOVE"/>
</memory>
</local-cache>
<!-- cache pour les idempotent -->
<replicated-cache-configuration name="idempotent*" mode="SYNC">
<!-- expiration au bout de 7j = 86400 x 7 x1000 -->
<expiration lifespan="604800000"/>
<memory>
<!-- evication a 100 Mo -->
<binary size="100000000" eviction="MEMORY"/>
</memory>
<!-- persistence au dela de la memoire -->
<persistence passivation="false">
<file-store fetch-state="true"
read-only="false"
purge="false" path="${java.io.tmpdir}/idempotent"/>
</persistence>
<locking isolation="READ_COMMITTED"/>
<transaction locking="OPTIMISTIC"/>
</replicated-cache-configuration>
<!-- cache pour les idempotent inProgress -->
<replicated-cache-configuration name="inProgress*" mode="SYNC" >
<!-- expiration au bout de 1h 3600 x1000 -->
<expiration lifespan="3600000"/>
<!-- max de 5000 fichiers -->
<memory>
<object size="5000" strategy="REMOVE"/>
</memory>
</replicated-cache-configuration>
<!-- liste des caches des ROUTES -->
<replicated-cache name="inProgress-CACHE_NAME" />
<replicated-cache name="idempotent-CACHE_NAME" />
</cache-container>
</infinispan>
Configuration des nœuds du cluster Infinispan
La configuration des participants au cluster Infinispan est, lui aussi, externalisé (cf <jgroups> du infinispan.xml) . Chaque déploiement sera adapté à façon sans impacter la route.
Par ailleurs, dans ce fichier, c’est bien l’ IP des participants qu’il convient d’utiliser, pas leur nom DNS.
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups-4.0.xsd">
<TCP bind_addr="${jgroups.tcp.address:10.127.8.167}"
bind_port="${jgroups.tcp.port:7800}"
enable_diagnostics="true"
thread_naming_pattern="pl"
send_buf_size="640k"
sock_conn_timeout="300"
bundler_type="no-bundler"
thread_pool.min_threads="${jgroups.thread_pool.min_threads:1}"
thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}"
thread_pool.keep_alive_time="60000"
/>
<!-- 2 participants: 10.127.8.167 et 10.127.8.168 -->
<TCPPING
initial_hosts="10.127.8.167[7800],10.127.8.168[7800]"
port_range="3"
/>
<MERGE3 min_interval="10000"
max_interval="30000"
/>
<FD_SOCK />
<!-- Suspect node `timeout` to `timeout + timeout_check_interval` millis after the last heartbeat -->
<FD_ALL timeout="10000"
interval="2000"
timeout_check_interval="1000"
/>
<VERIFY_SUSPECT timeout="1000"/>
<pbcast.NAKACK2 use_mcast_xmit="false"
xmit_interval="100"
xmit_table_num_rows="50"
xmit_table_msgs_per_row="1024"
xmit_table_max_compaction_time="30000"
resend_last_seqno="true"
/>
<UNICAST3 xmit_interval="100"
xmit_table_num_rows="50"
xmit_table_msgs_per_row="1024"
xmit_table_max_compaction_time="30000"
/>
<pbcast.STABLE stability_delay="500"
desired_avg_gossip="5000"
max_bytes="1M"
/>
<pbcast.GMS print_local_addr="false"
join_timeout="${jgroups.join_timeout:5000}"
/>
<MFC max_credits="2m"
min_threshold="0.40"
/>
<FRAG3/>
</config>
Déploiement de la route
Installation des prérequis
La route est prête. Avant de la déployer, un module doit être préalablement installé dans le serveur Karaf: camel-infinispan
feature:install camel-infinispan
Activation des logs
Pour activer les logs Infinispan, préciser la configuration au niveau du fichier $TALEND_HOME/etc/org.ops4j.pax.logging.cfg :
log4j2.logger.INFINISPAN.name = org.infinispan
log4j2.logger.INFINISPAN.level = INFO
log4j2.logger.JGROUPS.name = org.jgroups
log4j2.logger.JGROUPS.level = INFO
Déploiement de la route
Au démarrage des routes, les participants Infinispan s’affichent.
Lors du démarrage du 2nd participant, son inscription au cluster est bien prise en compte.
Exemple: les serveurs serveur-karaf-1 et serveur-karaf-2 s’appellent se synchronisent bien:
11:20:31.395 INFO [jgroups-16784,serveur-karaf-2-12887] ISPN000094: Received new cluster view for channel MFT: [serveur-karaf-2-12887|43] (7) [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852]
11:20:31.396 INFO [jgroups-16784,serveur-karaf-2-12887] ISPN100000: Node serveur-karaf-2-55852 joined the cluster
11:20:31.414 INFO [jgroups-16404,serveur-karaf-2-30102] ISPN000094: Received new cluster view for channel MFT: [serveur-karaf-2-12887|43] (7) [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852]
11:20:31.415 INFO [jgroups-16404,serveur-karaf-2-30102] ISPN100000: Node serveur-karaf-2-55852 joined the cluster
11:20:31.416 INFO [jgroups-16482,serveur-karaf-2-1243] ISPN000094: Received new cluster view for channel MFT: [serveur-karaf-2-12887|43] (7) [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852]
11:20:31.416 INFO [jgroups-16482,serveur-karaf-2-1243] ISPN100000: Node serveur-karaf-2-55852 joined the cluster
11:20:31.430 INFO [pipe-bundle:start R_DOUBLERUN_FTP2FTP_infinispan] ISPN000094: Received new cluster view for channel MFT: [serveur-karaf-2-12887|43] (7) [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852]
11:20:31.535 INFO [pipe-bundle:start R_DOUBLERUN_FTP2FTP_infinispan] ISPN000079: Channel MFT local address is serveur-karaf-2-55852, physical addresses are [10.127.8.168:7803]
11:20:31.569 INFO [remote-thread--p2-t1394] [Context=org.infinispan.CONFIG] ISPN100002: Starting rebalance with members [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852], phase READ_OLD_WRITE_ALL, topology id 142
11:20:31.702 INFO [remote-thread--p2-t1394] [Context=org.infinispan.CONFIG] ISPN100009: Advancing to rebalance phase READ_ALL_WRITE_ALL, topology id 143
11:20:31.729 INFO [remote-thread--p2-t1395] [Context=org.infinispan.CONFIG] ISPN100009: Advancing to rebalance phase READ_NEW_WRITE_ALL, topology id 144
11:20:31.745 INFO [remote-thread--p2-t1395] [Context=org.infinispan.CONFIG] ISPN100010: Finished rebalance with members [serveur-karaf-2-12887, serveur-karaf-1-10641, serveur-karaf-2-30102, serveur-karaf-1-59276, serveur-karaf-2-1243, serveur-karaf-1-25505, serveur-karaf-2-55852], topology id 145
Maintenant les caches Infinispan sont prêts. La consommation des fichiers SFTP avec le pattern Idempotent sera bien répartie entre tous les participants.