Uso de la paralelización al procesar datos en C #



¡Buen día a todos! Soy un especialista técnico que trabaja en un sistema de auditoría interna, mis responsabilidades incluyen la creación de herramientas ETL en el lenguaje de programación C #.

Periódicamente, las fuentes de datos son archivos estrechamente estructurados en xml, csv, json o cualquier otro formato. A veces su número se vuelve bastante grande y aumenta constantemente. Por ejemplo, en una de mis tareas, el número de archivos aumentó con una frecuencia de actualización promedio de aproximadamente 150,000 archivos por día. Si al mismo tiempo el procesamiento de un archivo (leer una matriz de bytes del disco duro en la memoria, transformar los datos descargados y escribirlos en la base de datos) toma un segundo, queda claro que procesar todos los archivos llevará más de 40 horas. En este caso, no podremos procesar estos archivos hasta el final, ya que la velocidad de aumentar el número de archivos será claramente mayor que la velocidad de su procesamiento.

Una solución a este problema es desarrollar una aplicación en la que crear un grupo de subprocesos independientes entre sí. Los hilos procesarán los archivos seleccionados de la cola general. Sin embargo, en este caso, surgen dificultades con la sincronización de los flujos de trabajo y el intercambio de recursos, ya que es muy probable que haya bloqueos mutuos.

Para evitar estas dificultades, Microsoft agregó la biblioteca TPL al marco .Net (a partir de la versión 4.0). Te diré cómo usar esta biblioteca para resolver este problema.

Entonces, inicialmente el algoritmo de operación tiene el siguiente aspecto:

se explora un directorio de almacenamiento de archivos y se devuelve una lista (por ejemplo, Lista) que contiene datos sobre todos los archivos;
Comienza un ciclo (para o para cada uno) en el que los datos del siguiente archivo se leen en la memoria, si es necesario, se transforman y se escriben en la base de datos.

Obviamente, las operaciones que requieren más tiempo son leer datos del disco duro en la memoria y escribir datos de la memoria en la base de datos.

Intentemos optimizar nuestro algoritmo utilizando la biblioteca TPL:

Paso 1.

Cambie la lista devuelta al escanear el directorio de almacenamiento de archivos de List a ConcurrentQueue.
¿Por qué estamos haciendo esto? El hecho es que la clase ConcurrentQueue es segura para subprocesos, es decir, si al mismo tiempo dos subprocesos intentan extraer datos de esta lista o escribir datos en ella, entonces no arrojaremos excepciones (Excepción).
El punto 1 de nuestro algoritmo se verá así: se explora el directorio de almacenamiento de archivos y se devuelve la lista ConcurrentQueue que contiene datos sobre todos los archivos.

Punto 2:
Cambiemos el diseño formando un ciclo de procesamiento de datos desde un archivo. Sustitúyalo por Parallel.For o Parallel.ForEach.

¿Cuál es la diferencia entre la nueva construcción y para? Todo es simple y básicamente claro a partir del nombre de la construcción del lenguaje. Todas las iteraciones del bucle se realizan en hilos paralelos. Como ejemplo, mostraré la organización del bucle con la construcción Parallel.ForEach:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	var dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		WriteToDB(dataFile);
               });

donde:

listFiles es una colección de tipo ConcurrentQueue que contiene una lista de archivos en el directorio;
currentFile: un elemento de la colección listFiles, que devuelve la construcción ForEach;
dataFile: una estructura condicional de algunos datos en la memoria, obtenida al leer el contenido del archivo en la memoria;
getDataFile: una función condicional que devuelve el contenido de un archivo en forma de alguna estructura de datos;
TransformData: procedimiento condicional para transformar los datos recibidos;
WriteToDB es un procedimiento condicional para escribir datos en la base de datos.

En este ejemplo, usando la construcción Parallel.ForEach, organizaremos el ciclo. En este ciclo, en flujos paralelos, los datos se leen desde el disco duro, su transformación y escritura en la base de datos. Al mismo tiempo, no hay problemas para organizar el trabajo de flujos paralelos. El número de subprocesos paralelos depende del número de núcleos de procesador y su carga de trabajo.

Usando el algoritmo propuesto, aceleraremos el procesamiento de archivos al menos 2 veces. Aunque, por supuesto, esta cifra variará según el número de núcleos y la memoria de la máquina en la que se ejecutará el programa.

Además, para acelerar el programa, debe colocar el registro en la base de datos en una secuencia separada que funcione independientemente de la principal. Esto se puede hacer usando la colección ConcurrentQueue para evitar conflictos al agregar datos a la cola.

Reescribimos el ejemplo anterior, teniendo en cuenta la optimización de la escritura en la base de datos.
Supongamos que un lector de archivos nos devuelve datos en una tabla de datos):

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	DataTable dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		threadWriteToDB.ListData.Enqueue(dataFile);
               });

Como puede ver, en lugar de una línea con una llamada al procedimiento de escritura en la base de datos, simplemente agregamos a la colección ConcurrentQueue ListData descrita e inicializada en un hilo separado, una instancia de la cual se usa threadWriteToDB en nuestro bucle.

Escribir en la base de datos ya está en una secuencia separada. Escribir en la base de datos puede organizarse de manera similar a trabajar con archivos usando las construcciones Parallel.For y / o Paral-lel.Foreach.

En mi tarea, donde se requería el procesamiento de un número comparable de archivos, ahora, en promedio, se pueden procesar de 200,000 a 400,000 archivos por día, y la velocidad se limita al cargar la base de datos y el ancho del canal de datos.

All Articles