Verwenden der Parallelisierung bei der Verarbeitung von Daten in C #



Guten Tag allerseits! Ich bin ein technischer Spezialist, der in einem internen Audit-System arbeitet. Zu meinen Aufgaben gehört die Erstellung von ETL-Tools in der Programmiersprache C #.

In regelmäßigen Abständen sind Datenquellen eng strukturierte Dateien in XML, CSV, JSON oder einem anderen Format. Manchmal wird ihre Anzahl ziemlich groß und nimmt ständig zu. Beispielsweise stieg bei einer meiner Aufgaben die Anzahl der Dateien mit einer durchschnittlichen Aktualisierungsrate von ungefähr 150.000 Dateien pro Tag. Wenn die Verarbeitung einer einzelnen Datei (Lesen eines Arrays von Bytes von der Festplatte in den Speicher, Transformieren der heruntergeladenen Daten und Schreiben in die Datenbank) gleichzeitig eine Sekunde dauert, wird klar, dass die Verarbeitung aller Dateien mehr als 40 Stunden dauert. In diesem Fall können wir diese Dateien nicht bis zum Ende verarbeiten, da die Geschwindigkeit der Erhöhung der Anzahl der Dateien deutlich höher ist als die Geschwindigkeit ihrer Verarbeitung.

Eine Lösung für dieses Problem besteht darin, eine Anwendung zu entwickeln, in der ein Pool von Threads unabhängig voneinander erstellt werden kann. Die Threads verarbeiten Dateien, die aus der allgemeinen Warteschlange ausgewählt wurden. In diesem Fall treten jedoch Schwierigkeiten bei der Synchronisierung von Arbeitsabläufen und der gemeinsamen Nutzung von Ressourcen auf, da gegenseitige Sperren sehr wahrscheinlich sind.

Um diese Schwierigkeiten zu vermeiden, hat Microsoft die TPL-Bibliothek zum .NET-Framework hinzugefügt (ab Version 4.0). Ich werde Ihnen sagen, wie Sie diese Bibliothek verwenden können, um dieses Problem zu lösen.

Daher sieht der Operationsalgorithmus zunächst wie folgt aus: Ein

Dateispeicherverzeichnis wird gescannt und eine Liste (z. B. Liste) mit Daten zu allen Dateien wird zurückgegeben.
Es beginnt ein Zyklus (für oder für jeden), in dem Daten aus der nächsten Datei bei Bedarf in den Speicher eingelesen, transformiert und in die Datenbank geschrieben werden.

Offensichtlich sind die zeitaufwändigsten Vorgänge das Lesen von Daten von der Festplatte in den Speicher und das Schreiben von Daten aus dem Speicher in die Datenbank.

Versuchen wir, unseren Algorithmus mithilfe der TPL-Bibliothek zu optimieren:

Schritt 1.

Ändern Sie die zurückgegebene Liste, indem Sie das Dateispeicherverzeichnis von List in ConcurrentQueue scannen.
Warum machen wir das? Tatsache ist, dass die ConcurrentQueue-Klasse threadsicher ist. Wenn also zwei Threads gleichzeitig versuchen, Daten aus dieser Liste zu extrahieren oder Daten in diese Liste zu schreiben, werden keine Ausnahmen ausgelöst (Ausnahme).
Punkt 1 unseres Algorithmus sieht folgendermaßen aus: Das Dateispeicherverzeichnis wird gescannt und die ConcurrentQueue-Liste mit Daten zu allen Dateien wird zurückgegeben.

Punkt 2:
Lassen Sie uns das Design ändern, das einen Zyklus der Datenverarbeitung aus einer Datei bildet. Ersetzen Sie durch Parallel.For oder Parallel.ForEach.

Was ist der Unterschied zwischen dem Neubau und für? Aus dem Namen des Sprachkonstrukts ist alles einfach und grundsätzlich klar. Alle Iterationen der Schleife werden in parallelen Threads ausgeführt. Als Beispiel werde ich die Organisation der Schleife mit dem Parallel.ForEach-Konstrukt zeigen:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	var dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		WriteToDB(dataFile);
               });

Dabei gilt

Folgendes : listFiles ist eine Sammlung vom Typ ConcurrentQueue, die eine Liste der Dateien im Verzeichnis enthält.
currentFile - ein Element der listFiles-Auflistung, das vom ForEach-Konstrukt zurückgegeben wird.
dataFile - eine bedingte Datenstruktur im Speicher, die durch Einlesen des Inhalts der Datei in den Speicher erhalten wird;
getDataFile - eine bedingte Funktion, die den Inhalt einer Datei in Form einer Datenstruktur zurückgibt;
TransformData - bedingte Prozedur zum Transformieren empfangener Daten;
WriteToDB ist eine bedingte Prozedur zum Schreiben von Daten in die Datenbank.

In diesem Beispiel organisieren wir die Schleife mithilfe des Konstrukts Parallel.ForEach. In diesem Zyklus werden in parallelen Streams Daten von der Festplatte gelesen, transformiert und in die Datenbank geschrieben. Gleichzeitig gibt es keine Probleme bei der Organisation der Arbeit paralleler Flüsse. Die Anzahl der parallelen Threads hängt von der Anzahl der Prozessorkerne und ihrer Arbeitslast ab.

Mit dem vorgeschlagenen Algorithmus beschleunigen wir die Dateiverarbeitung um mindestens das Zweifache. Diese Zahl hängt natürlich von der Anzahl der Kerne und dem Speicher des Computers ab, auf dem das Programm ausgeführt wird.

Um das Programm zu beschleunigen, müssen Sie den Datensatz in der Datenbank in einem separaten Stream ablegen, der unabhängig vom Haupt-Stream funktioniert. Dies kann mithilfe der ConcurrentQueue-Auflistung erfolgen, um Konflikte beim Hinzufügen von Daten zur Warteschlange zu vermeiden.

Wir schreiben das obige Beispiel neu und berücksichtigen dabei die Optimierung des Schreibens in die Datenbank.
Angenommen, ein Dateireader gibt Daten in einer Datentabelle an uns zurück:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	DataTable dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		threadWriteToDB.ListData.Enqueue(dataFile);
               });

Wie Sie sehen können, fügen wir anstelle einer Zeile mit einem Aufruf der Schreibprozedur in der Datenbank einfach die ConcurrentQueue ListData-Auflistung hinzu, die in einem separaten Thread beschrieben und initialisiert wurde, dessen Instanz threadWriteToDB in unserer Schleife verwendet wird.

Das Schreiben in die Datenbank erfolgt bereits in einem separaten Stream. Das Schreiben in die Datenbank kann ähnlich wie das Arbeiten mit Dateien mit den Konstruktionen Parallel.For und / oder Paral-lel.Foreach organisiert werden.

In meiner Aufgabe, in der eine vergleichbare Anzahl von Dateien verarbeitet werden musste, kann jetzt durchschnittlich 200.000 bis 400.000 Dateien pro Tag verarbeitet werden, und die Geschwindigkeit wird durch Laden der Datenbank und der Breite des Datenkanals begrenzt.

All Articles