Using parallelization when processing data in C#



Good day to all! I am a technical specialist working in an internal audit system; my responsibilities include creating ETL tools in the C# programming language.

Periodically, the data sources are strictly structured files in XML, CSV, JSON, or some other format. Sometimes their number grows quite large and keeps increasing. For example, in one of my tasks, files arrived at an average rate of approximately 150,000 per day. If processing a single file (reading its bytes from the hard disk into memory, transforming the loaded data, and writing it to the database) takes one second, it becomes clear that processing all the files will take more than 40 hours. In that case we would never finish, since files would arrive faster than we could process them.

One solution to this problem is to develop an application that creates a pool of threads independent of each other. The threads process files taken from a shared queue. However, this approach raises difficulties with synchronizing the worker threads and sharing resources, since deadlocks become quite likely.

To avoid these difficulties, Microsoft added the Task Parallel Library (TPL) to the .NET Framework (starting with version 4.0). I will show how to use this library to solve the problem.

So, the initial algorithm looks as follows:

1. The file storage directory is scanned and a list (for example, a List<T>) containing data about all the files is returned;
2. A loop starts (for or foreach) in which the data from the next file is read into memory, transformed if necessary, and written to the database.

Obviously, the most time-consuming operations are reading data from the hard disk into memory and writing data from memory to the database.

Let's try to optimize our algorithm using the TPL library:

Step 1.

Change the collection returned by scanning the file storage directory from List<T> to ConcurrentQueue<T>.
Why are we doing this? Because the ConcurrentQueue<T> class is thread-safe: if two threads simultaneously try to take data from this collection or add data to it, no exceptions will be thrown.
Point 1 of our algorithm now looks like this: the file storage directory is scanned and a ConcurrentQueue<T> containing data about all the files is returned.
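Step 1 can be sketched as below. This is a minimal illustration, not the article's actual code: the ScanDirectory name and the assumption that the files live under a single root directory are mine.

```csharp
// A minimal sketch of step 1. ScanDirectory and the single-root-directory
// assumption are illustrative, not from the original article.
using System;
using System.Collections.Concurrent;
using System.IO;

static ConcurrentQueue<FileInfo> ScanDirectory(string path)
{
    var queue = new ConcurrentQueue<FileInfo>();

    // EnumerateFiles streams results instead of building a full array,
    // which matters when the directory holds 150,000+ files.
    foreach (var file in new DirectoryInfo(path).EnumerateFiles("*", SearchOption.AllDirectories))
        queue.Enqueue(file);

    return queue;
}
```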

Point 2:
Let's change the construct that forms the file-processing loop: replace for with Parallel.For or Parallel.ForEach.

How does the new construct differ from for? Everything is simple and largely clear from the name of the language construct: all iterations of the loop are executed in parallel threads. As an example, here is a loop organized with the Parallel.ForEach construct:

Parallel.ForEach(listFiles, (currentFile) =>
{
    var dataFile = getDataFile(currentFile.FullName);
    TransformData(dataFile);
    WriteToDB(dataFile);
});

where:

listFiles is a ConcurrentQueue<T> collection containing the list of files in the directory;
currentFile is the element of the listFiles collection that the ForEach construct passes to the loop body;
dataFile is a placeholder for some in-memory data structure obtained by reading the contents of the file;
getDataFile is a placeholder function that returns the contents of a file as such a data structure;
TransformData is a placeholder procedure that transforms the loaded data;
WriteToDB is a placeholder procedure that writes the data to the database.

In this example, the Parallel.ForEach construct organizes the loop. Inside it, reading data from the hard disk, transforming it, and writing it to the database all happen in parallel threads, and we get none of the problems of coordinating those threads ourselves. The number of parallel threads depends on the number of processor cores and their current load.
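By default the TPL picks the degree of parallelism itself, but it can also be capped explicitly via ParallelOptions. The sketch below is illustrative: the cap of Environment.ProcessorCount and the trivial stand-in workload are my assumptions, not part of the article.

```csharp
// Sketch: capping the degree of parallelism with ParallelOptions.
// The cap and the fake workload below are assumptions for illustration.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var options = new ParallelOptions
{
    // never run more loop bodies at once than there are logical cores
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

var listFiles = new ConcurrentQueue<int>(new[] { 1, 2, 3, 4 });
var processed = new ConcurrentBag<int>();

Parallel.ForEach(listFiles, options, item =>
{
    processed.Add(item * 10); // stand-in for read/transform/write work
});
```

Capping the degree of parallelism is useful when the bottleneck is the disk or the database rather than the CPU, so that extra threads do not just pile up contention.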

Using the proposed algorithm, we speed up file processing by at least a factor of two, although of course the exact figure will vary with the number of cores and the amount of memory on the machine running the program.

Also, to speed up the program further, the database write should be moved into a separate thread that runs independently of the main one. This can be done using a ConcurrentQueue<T> collection, which avoids conflicts when adding data to the queue.

Let's rewrite the example above, taking this optimization of database writes into account.
Suppose the file reader returns data to us in a DataTable:

Parallel.ForEach(listFiles, (currentFile) =>
{
    DataTable dataFile = getDataFile(currentFile.FullName);
    TransformData(dataFile);
    threadWriteToDB.ListData.Enqueue(dataFile);
});

As you can see, instead of calling the database-write procedure directly, we simply enqueue the DataTable into ListData, a ConcurrentQueue collection declared and initialized in the separate writer object, an instance of which (threadWriteToDB) is used in our loop.

The database write now happens in a separate thread. It can itself be organized in parallel, similarly to the file processing, using the Parallel.For and/or Parallel.ForEach constructs.
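The article does not show how threadWriteToDB is built, so here is one possible shape, sketched with placeholders: a shared ConcurrentQueue<DataTable> drained by a single dedicated background thread. WriteToDB and all the names below are stand-ins, not the author's actual implementation.

```csharp
// Sketch of the writer thread the article describes: a shared
// ConcurrentQueue<DataTable> drained by one dedicated background thread.
// WriteToDB and the names below are placeholders for real DB code.
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Threading;

var listData = new ConcurrentQueue<DataTable>();
int written = 0;          // counter standing in for real DB writes
bool stopping = false;

void WriteToDB(DataTable table)
{
    Interlocked.Increment(ref written); // real code: bulk-insert here
}

var writerThread = new Thread(() =>
{
    // keep draining until asked to stop AND the queue is empty
    while (!Volatile.Read(ref stopping) || !listData.IsEmpty)
    {
        if (listData.TryDequeue(out var table))
            WriteToDB(table);
        else
            Thread.Sleep(20); // nothing queued yet; back off briefly
    }
});
writerThread.Start();

// Producers (e.g. the Parallel.ForEach loop) just enqueue:
listData.Enqueue(new DataTable("file1"));
listData.Enqueue(new DataTable("file2"));

Volatile.Write(ref stopping, true);
writerThread.Join(); // the loop drains the queue before exiting
```

In production code a BlockingCollection<DataTable> with GetConsumingEnumerable() would remove the polling Sleep, but the version above sticks to the ConcurrentQueue collection the article actually uses.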

In my task, which required processing a comparable number of files, between 200,000 and 400,000 files can now be processed per day on average, with the speed limited by the load on the database and the bandwidth of the data channel.
