Utilisation de la parallélisation lors du traitement des données en C #



Bonne journée à tous! Je suis un spécialiste technique travaillant dans un système d'audit interne, mes responsabilités incluent la création d'outils ETL dans le langage de programmation C #.

Périodiquement, les sources de données sont des fichiers étroitement structurés au format xml, csv, json ou tout autre format. Parfois, leur nombre devient assez important et augmente constamment. Par exemple, dans l'une de mes tâches, le nombre de fichiers a augmenté avec un taux de rafraîchissement moyen d'environ 150 000 fichiers par jour. Si le traitement simultané d'un seul fichier (lecture d'un tableau d'octets du disque dur en mémoire, transformation des données téléchargées et écriture dans la base de données) prend une seconde, alors il devient clair que le traitement de tous les fichiers prendra plus de 40 heures. Dans ce cas, nous ne pourrons pas traiter ces fichiers jusqu'au bout, car la vitesse d'augmentation du nombre de fichiers sera nettement supérieure à la vitesse de leur traitement.

Une solution à ce problème consiste à développer une application dans laquelle créer un pool de threads indépendants les uns des autres. Les threads traiteront les fichiers sélectionnés dans la file d'attente générale. Cependant, dans ce cas, des difficultés surviennent avec la synchronisation des flux de travail et le partage des ressources, car des verrous mutuels sont très probables.

Pour éviter ces difficultés, Microsoft a ajouté la bibliothèque TPL au framework .Net (à partir de la version 4.0). Je vais vous expliquer comment utiliser cette bibliothèque pour résoudre ce problème.

Ainsi, initialement, l'algorithme d'opération se présente comme suit: Un

répertoire de stockage de fichiers est analysé et une liste (par exemple, Liste) contenant des données sur tous les fichiers est renvoyée;
Un cycle démarre (pour ou pour chaque) dans lequel les données du fichier suivant sont lues en mémoire, si nécessaire, transformées et écrites dans la base de données.

De toute évidence, les opérations les plus chronophages sont la lecture des données du disque dur dans la mémoire et l'écriture des données de la mémoire dans la base de données.

Essayons d'optimiser notre algorithme à l'aide de la bibliothèque TPL:

Étape 1.

Modifiez la liste renvoyée en scannant le répertoire de stockage de fichiers de List à ConcurrentQueue.
Pourquoi fait-on ça? Le fait est que la classe ConcurrentQueue est thread-safe, c'est-à-dire que si en même temps deux threads essaient d'extraire des données de cette liste ou d'y écrire des données, alors nous ne lèverons pas d'exceptions (Exception).
Le point 1 de notre algorithme ressemblera à ceci: le répertoire de stockage de fichiers est analysé et la liste ConcurrentQueue est retournée contenant des données sur tous les fichiers.

Point 2:
Modifions la conception en formant un cycle de traitement des données à partir d'un fichier. Remplacez par Parallel.For ou Parallel.ForEach.

Quelle est la différence entre la nouvelle construction et pour? Tout est simple et fondamentalement clair à partir du nom de la construction du langage. Toutes les itérations de la boucle sont effectuées dans des threads parallèles. À titre d'exemple, je vais montrer l'organisation de la boucle avec la construction Parallel.ForEach:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	var dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		WriteToDB(dataFile);
               });

où:

listFiles est une collection de type ConcurrentQueue contenant une liste de fichiers dans le répertoire;
currentFile - un élément de la collection listFiles, qui est retourné par la construction ForEach;
dataFile - une structure de données conditionnelle en mémoire, obtenue en lisant le contenu du fichier dans la mémoire;
getDataFile - une fonction conditionnelle qui renvoie le contenu d'un fichier sous la forme d'une structure de données;
TransformData - procédure conditionnelle de transformation des données reçues;
WriteToDB est une procédure conditionnelle pour écrire des données dans la base de données.

Dans cet exemple, en utilisant la construction Parallel.ForEach, nous organiserons la boucle. Dans ce cycle, en flux parallèles, les données sont lues à partir du disque dur, leur transformation et leur écriture dans la base de données. Dans le même temps, l'organisation du travail des flux parallèles ne pose aucun problème. Le nombre de threads parallèles dépend du nombre de cœurs de processeur et de leur charge de travail.

En utilisant l'algorithme proposé, nous accélérerons le traitement des fichiers d'au moins 2 fois. Bien que, bien sûr, ce chiffre varie en fonction du nombre de cœurs et de la mémoire de la machine sur laquelle le programme s'exécutera.

De plus, pour accélérer le programme, vous devez placer l'enregistrement dans la base de données dans un flux distinct qui fonctionne quel que soit le principal. Cela peut être fait à l'aide de la collection ConcurrentQueue pour éviter les conflits lors de l'ajout de données à la file d'attente.

Nous réécrivons l'exemple ci-dessus, en tenant compte de l'optimisation de l'écriture dans la base de données.
Supposons qu'un lecteur de fichiers nous renvoie des données dans un DataTable):

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	DataTable dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		threadWriteToDB.ListData.Enqueue(dataFile);
               });

Comme vous pouvez le voir, au lieu d'une ligne avec un appel à la procédure d'écriture dans la base de données, nous ajoutons simplement à la collection ConcurrentQueue ListData décrite et initialisée dans un thread séparé, une instance dont threadWriteToDB est utilisé dans notre boucle.

L'écriture dans la base de données est déjà dans un flux distinct. L'écriture dans la base de données peut être organisée de la même manière que l'utilisation de fichiers à l'aide des constructions Parallel.For et / ou Paral-lel.Foreach.

Dans ma tâche, où le traitement d'un nombre comparable de fichiers était requis, maintenant, en moyenne, de 200 000 à 400 000 fichiers peuvent être traités par jour, et la vitesse est limitée par le chargement de la base de données et la largeur du canal de données.

All Articles