Accueil / Blog / Métier / 2014 / Tutorial Talend : décomposer le contenu d'un fichier/flux vers plusieurs fichiers

Tutorial Talend : décomposer le contenu d'un fichier/flux vers plusieurs fichiers

Par Gaël Pegliasco publié 10/10/2014, édité le 27/01/2016
Une question m'a été posée récemment lors de l'une de nos formations Talend Open Studio for Data Integration : Comment décomposer/splitter le contenu d'un gros fichier en plusieurs sous-fichiers dont les enregistrements seront ventilés selon certains critères. Nous vous proposons dans ce tutoriel 3 approches différentes pour réaliser ce travail.
Tutorial Talend : décomposer le contenu d'un fichier/flux vers plusieurs fichiers

Description du problème

Nous possédons un fichier de clients contenant des millions d'enregistrements. Ce fichier est trop volumineux pour être traité d'un seul bloc et nous souhaitons le décomposer en plusieurs fichiers plus petits, en ventilant les clients selon un critère bien précis.

Notre fichier exemple est un fichier CSV issu des tutoriaux Talend (cliquez pour le télécharger).

Il contient plusieurs colonnes, dont :

  • id : identifiant unique du client

  • CustomerName : nom du client

  • CustomerAddress : adresse du client

  • idState : état/département dans lequel réside le client

Nous souhaitons décomposer ce fichier source sur le champ idState du client, pour générer un fichier destination par état, comme suit :

Fichier source

id;CustomerName;CustomerAddress;idState
1;Durand;Paris;75
2;Dupond;Nantes;44
3;Martin;Toulouse ; 31
4;Simon;Rezé;44

Et en sortie obtenir 3 fichiers :

Clients_75.csv

id;CustomerName;CustomerAddress;idState
1;Durand;Paris;75

Clients_44.csv

id;CustomerName;CustomerAddress;idState
2;Dupond;Nantes;44
4;Simon;Rezé;44

Clients_31.csv

id;CustomerName;CustomerAddress;idState
3;Martin;Toulouse ; 31

Trouver le bon composant

Nous cherchons un composant qui fonctionne un peu comme le tUnite, mais dans le sens contraire. Le tMap permet de décomposer un flux en plusieurs flux sortants avec un critère/filtre différent pour chaque flux sortant. Mais cela impose que le nombre de flux sortants soit connu d'avance.

Il ne peut pas être fourni par une règle. Dans le cas de notre décomposition sur le champ idState, cela n'est pas gérable, il y a 50 états américains, nous n'allons pas créer 50 flux sortants dans notre tMap.

N'ayant pas trouvé de composant tout fait parmi ceux de TOS, je me suis orienté vers la forge.

2 composants semblent pouvoir répondre au besoin :

Malheureusement leur fonctionnement ne semble plus compatible avec la version 5 de Talend.

C'est donc l'occasion de jouer un peu avec les composants actuels pour proposer, dans un premier temps, 3 solutions à ce problème, apparemment tout simple.

Solution 1

La première solution consiste à :

  • Récupérer toutes les valeurs de notre colonne « pivot » choisie pour la décomposition du fichier

  • Puis, pour chaque valeur de cette colonne, extraire les enregistrements correspondants à la valeur courante et les enregistrer dans un nouveau fichier.

Reliez les composants suivants dans cet ordre :

  1. tFileInputDelimited

  2. tUniqRow

  3. tFlowToIterate

  4. tFileInputDelimited

  5. tFilterRow

  6. tFileOutputDelimited

Reliez tous les composants par un lien « Main », excepté le lien entre le tFlowToIterate et le second tFileInputDelimited :

manyfile_screen1.png
La première partie du job, avant la connexion « iterate », consiste à filtrer les enregistrements sur la colonne idState en sélectionnant les valeurs uniques.

Le composant tFlowToIterate va enregistrer le numéro courant de l'état dans la variable globalMap.

La seconde partie qui est déclenchée à chaque itération sur les états relie le fichier source et filtre les enregistrements de l'état courant pour les enregistrer dans le fichier client.

Voici la configuration des différents composants :

manyfile_screen2.png

manyfile_screen3.png

Le second composant tFileInputDelimited possède la même configuration que le premier.

manyfile_screen4.png

manyfile_screen5.png

manyfile_screen6.png

La valeur de la colonne « Valeur » du champ tFilterRow est celle de l'état stocké dans le globalMap :

(Integer)globalMap.get("CURRENT_STATE")

Enfin, le composant tFileOutputDelimited pointe vers un fichier dont le nom est défini par celui de l'état courant :

manyfile_screen5b.png

Vous pouvez maintenant exécuter le job, voici les temps sur mon poste, issus des logs talend :

  • 2 500 ms avec un lien iterate configuré en parallèle

  • 3 000 ms sans l'option de parallélisation sur le lien iterate

Pour un fichier de 5 000 enregistrements, cela reste rapide et tout à fait correct. Mais le problème de ce job est qu'il relie le fichier source pour chaque valeur de la clef de pivot "idState", soit 50 fois dans notre exemple. Pour un fichier contenant des millions d'enregistrements, cela n'est pas réalisable.

Solution 2

La seconde solution permet de ne pas relire le fichier source pour chacune des valeurs de la colonne pivot en stockant ces données dans un cache.

Pour cela nous allons repartir du job précédent en ajoutant 2 composants de cache : tHashOutput et tHashInput dans le flux :

  • Glissez le premier composant tHashOutput juste après le premier tFileInputDelimited,
  • Remplacez ensuite le second tFileInputDelimited par le composant tHashInput.

 

manyfile_screen7.png

Dans les dernières versions de Talend Open Studio, ces composants peuvent ne pas apparaître dans la palette, dans ce cas, éditez les propriétés du projet, sélectionnez l'onglet « designer », et transférez les composants dans la partie droite :

manyfile_screen8.png

manyfile_screen9.png

manyfile_screen10.png

Vous pouvez maintenant exécuter le job.

Malheureusement cela ne fonctionne pas comme attendu : les fichiers générés ne contiennent qu'un seul enregistrement.

Que se passe-t-il ?

Le composant tHashInput ne restitue pas tous les enregistrements du fichier CSV. Il est appelé dès que le premier enregistrement est lu, de ce fait, le composant tHashOutput n'a pas encore sauvegardé tous les enregistrements en mémoire, aussi seul le premier de chaque état est restitué après le lien iterate.

Il faut donc couper notre job en 2 sous-jobs. L'un mémorisant les différents états et l'ensemble des enregistrements, l'autre parcourant l'ensemble des clients pour chaque état mémorisé.

manyfile_screen11.png

Pour ce faire, ré-organisez le job comme indiqué ci-dessus :

  • tHashOuput_1 reste inchangé et mémorise la liste des clients

  • tHashOutput_2 mémorise la liste des différents états

  • tHashInput_1 restitue maintenant la liste des états, et non plus celle des clients

  • tHashInput_2 restitue l'ensemble des clients

manyfile_screen12.png

manyfile_screen13.png

Cette fois le job réalise bien le travail souhaité.

Le temps d'exécution affiché dans les logs Talend est de 2 100ms.

Sur de petites volumétries cela n'est pas très parlant, mais cette fois le fichier n'est lu qu'une seule fois, au détriment de la complexité du job.

Un autre reproche qui peut être formulé à ce job est qu'avec des millions d'enregistrements nous risquons d'avoir des problèmes de mémoire qui peuvent eux aussi être bloquants avec cette solution.

Ce qui nous amène à la troisième solution.

Solution 3

La troisième solution est de loin la plus performante :

  • Elle ne lit qu'une seule fois le fichier source

  • Elle n'utilise pas de cache mémoire

  • Elle utilise seulement 2 composants

manyfile_screen14.png

Dans cette seconde partie l'écriture des enregistrements est réalisée en Java :

  • Si l'état de l'enregistrement courant n'a pas encore été traité, un fichier est créé sur le disque pour cet état et stocké dans la variable globalMap

  • Pour chaque enregistrement, on recherche dans la variable globalMap le fichier cible.
    Puis l'enregistrement courant est écrit dans ce dernier fichier.

  • En fin de traitement l'ensemble des fichiers ouverts est parcouru puis chaque fichier est fermé

manyfile_screen15.png

Code initial

// Mémorisation des fichier ouverts dans lfw
List<FileWriter> lfw =
new ArrayList<FileWriter>();

Code principal

// Nom du fichier courant
String filename =
"/tmp/customers_state_" + client.idState + ".csv";
// A-t-il déjà été créé ?
if
( ! globalMap.containsKey(filename) ) {
//
Création du fichier, si non trouvé dans globalMap
File file =
new File(filename);
file.createNewFile();
FileWriter fw =
new FileWriter(filename,true);
globalMap.put(filename, fw);
lfw.add(fw);
}
//
Récupération du fichier et enregistrement du client
FileWriter fw = (FileWriter)globalMap.get(filename);
fw.write(client.id +
";" + client.CustomerName + "\n");

 Code final

// Fermeture du fichiers
for
(int i = 0; i < lfw.size(); i++) {
lfw.get(i).close();
}

 manyfile_screen16.png

Imports des paramètres avancés

import java.io.File;
import
java.io.FileWriter;
import
java.util.ArrayList;

Temps d'exécution : 724ms.

Le gain est important. Certes toutes les colonnes ne sont pas restituées dans cet exemple, mais cela fonctionne beaucoup plus efficacement.

Solution 4

La solution 3 est intéressante. Pourquoi ne pas imaginer de créer un composant similaire au tFileOuputDelimited mais qui évalue le nom du fichier sortant à chaque nouvel enregistrement et crée un nouveau fichier si celui-ci est différent ?

Ceci serait vraiment pertinent, répondrait à notre besoin et éviterait d'écrire tout ce code Java.

Ce sera l'occasion d'un nouveau tutorial...


Pour en savoir plus...

Vous avez aimé cet exercice ? Alors, venez suivre nos formations Talend !...





ABONNEZ-VOUS À LA NEWSLETTER !
Voir aussi
Makina Corpus devient partenaire Gold de Talend Makina Corpus devient partenaire Gold de Talend 12/05/2009

Talend Tutoriel : comprendre les connexions iterate 20/12/2013

Le lien iterate est décrit assez succinctement dans le manuel utilisateur de Talend (User Guide). ...

Préparez votre agenda de formation pour la rentrée ! Préparez votre agenda de formation pour la rentrée ! 21/12/2015

Drupal8, PostgreSQL, Python scientifique... Découvrez nos nouvelles formations et les dernières ...

Geocoder avec Talend Open Studio 22/05/2014

Mettre en place un job de geocodage d'adresses (depuis un fichier XLS) dans l'ETL Talend Open ...

Makina Corpus participe à la conférence mondiale sur OpenStreetMap Makina Corpus participe à la conférence mondiale sur OpenStreetMap 09/09/2019

Du 21 au 23 septembre se tient à Heidelberg, en Allemagne, la douzième conférence annuelle sur ...