在C#中处理数据时使用并行化



祝大家有美好的一天!我是内部审计系统的技术专家,我的职责包括使用C#编程语言创建ETL工具。

数据源通常是xml,csv,json或任何其他格式的结构紧密的文件。有时,它们的数量变得很大并且不断增加。例如,在我的一项任务中,文件数量增加了,每天平均刷新率约为150,000个文件。如果同时处理一个文件(从硬盘读取字节数组到内存,转换下载的数据并将其写入数据库)需要一秒钟,那么很明显,处理所有文件将花费40多个小时。在这种情况下,我们将无法处理这些文件,因为增加文件数量的速度明显高于处理它们的速度。

解决该问题的一种方法是开发一种应用程序,在其中创建彼此独立的线程池。线程将处理从常规队列中选择的文件。但是,在这种情况下,由于非常可能相互锁定,因此工作流程和资源共享的同步会出现困难。

为避免这些困难,Microsoft将TPL库添加到了.Net框架(从4.0版开始)。我将告诉您如何使用该库来解决此问题。

因此,最初的操作算法如下:

扫描文件存储目录,并返回一个包含所有文件数据的列表(例如List);
一个周期开始(foreach或foreach),在该周期中,将下一个文件中的数据读入内存,如果有必要,将其转换并写入数据库。

显然,最耗时的操作是将数据从硬盘读取到内存,并将数据从内存写入数据库。

让我们尝试使用TPL库优化算法:

步骤1.

通过将文件存储目录从List扫描到ConcurrentQueue来更改返回的列表。
我们为什么要这样做呢?事实是ConcurrentQueue类是线程安全的,也就是说,如果两个线程同时尝试从该列表中提取数据或向该列表中写入数据,则我们将不会抛出异常(Exception)。
我们算法的第1点将如下所示:扫描文件存储目录,并返回ConcurrentQueue列表,其中包含有关所有文件的数据。

第2点:
让我们更改设计,形成一个处理文件数据的循环。替换为Parallel.For或Parallel.ForEach。

新建和for有什么区别?从语言构造的名称来看,一切都很简单,而且基本上是清晰的。循环的所有迭代都在并行线程中执行。作为一个例子,我将展示Parallel.ForEach构造的循环组织:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	var dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		WriteToDB(dataFile);
               });

其中:

listFiles是ConcurrentQueue类型的集合,其中包含目录中的文件列表;
currentFile-listFiles集合的元素,由ForEach构造返回;
dataFile-内存中的条件数据结构,通过将文件的内容读入内存获得;
getDataFile-一个条件函数,以某种数据结构的形式返回文件的内容;
TransformData-转换接收数据的条件过程;
WriteToDB是用于将数据写入数据库的条件过程。

在此示例中,使用Parallel.ForEach构造,我们将组织循环。在此循环中,在并行流中,从硬盘读取数据,将其转换并写入数据库。同时,组织并行流程的工作没有任何问题。并行线程的数量取决于处理器核心的数量及其工作负载。

使用提出的算法,我们将文件处理速度至少提高了2倍。尽管这个数字当然会根据运行程序的计算机的内核数和内存而变化。

另外,为了加快程序运行速度,您需要将记录放入数据库中的单独流中,而与主记录无关。可以使用ConcurrentQueue集合来完成此操作,以避免在将数据添加到队列时发生冲突。

考虑到写入数据库的优化,我们重写了上面的示例。
假设文件读取器在DataTable中向我们返回了数据:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	DataTable dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		threadWriteToDB.ListData.Enqueue(dataFile);
               });

如您所见,我们只需在单独的线程中添加描述和初始化的ConcurrentQueue ListData集合(而不是在数据库中调用写过程的行),就可以在循环中使用threadWriteToDB的实例。

写入数据库已经在单独的流中。与使用Parallel.For和/或Paral-lel.Foreach结构处理文件的方式类似,写入数据库的方式也可以类似。

在我的任务中,需要处理相当数量的文件,现在平均每天可以处理200,000到400,000个文件,并且速度受加载数据库和数据通道宽度的限制。

All Articles