Usando paralelização ao processar dados em C #



Bom Dia a todos! Sou especialista técnico trabalhando em um sistema de auditoria interna, minhas responsabilidades incluem a criação de ferramentas ETL na linguagem de programação C #.

Periodicamente, as fontes de dados são arquivos fortemente estruturados em xml, csv, json ou qualquer outro formato. Às vezes, seu número se torna bastante grande e aumenta constantemente. Por exemplo, em uma das minhas tarefas, o número de arquivos aumentou com uma taxa média de atualização de aproximadamente 150.000 arquivos por dia. Se, ao mesmo tempo, processar um único arquivo (ler uma matriz de bytes do disco rígido em memória, transformar os dados baixados e gravá-los no banco de dados) demorar um segundo, fica claro que o processamento de todos os arquivos levará mais de 40 horas. Nesse caso, não poderemos processar esses arquivos até o final, pois a velocidade de aumentar o número de arquivos será claramente maior que a velocidade de seu processamento.

Uma solução para esse problema é desenvolver um aplicativo no qual criar um pool de threads independentes um do outro. Os encadeamentos processarão os arquivos selecionados na fila geral. No entanto, nesse caso, surgem dificuldades com a sincronização dos fluxos de trabalho e o compartilhamento de recursos, pois é muito provável que bloqueios mútuos.

Para evitar essas dificuldades, a Microsoft adicionou a biblioteca TPL à estrutura .Net (iniciando na versão 4.0). Vou lhe dizer como usar esta biblioteca para resolver esse problema.

Portanto, inicialmente o algoritmo de operação é o seguinte: Um

diretório de armazenamento de arquivos é verificado e uma lista (por exemplo, Lista) contendo dados sobre todos os arquivos é retornada;
Um ciclo é iniciado (para ou foreach) no qual os dados do próximo arquivo são lidos na memória, se necessário, transformados e gravados no banco de dados.

Obviamente, as operações mais demoradas são a leitura de dados do disco rígido na memória e a gravação de dados da memória no banco de dados.

Vamos tentar otimizar nosso algoritmo usando a biblioteca TPL:

Etapa 1.

Altere a lista retornada varrendo o diretório de armazenamento de arquivos de List para ConcurrentQueue.
Por que estamos fazendo isso? O fato é que a classe ConcurrentQueue é segura para threads, ou seja, se ao mesmo tempo duas threads tentarem extrair dados dessa lista ou gravar dados nela, não lançaremos exceções (Exception).
O ponto 1 do nosso algoritmo terá a seguinte aparência: o diretório de armazenamento de arquivos é verificado e a lista ConcurrentQueue é retornada, contendo dados sobre todos os arquivos.

Ponto 2:
Vamos mudar o design formando um ciclo de processamento de dados de um arquivo. Substitua por Parallel.For ou Parallel.ForEach.

Qual é a diferença entre a nova construção e para? Tudo é simples e basicamente claro a partir do nome da construção da linguagem. Todas as iterações do loop são executadas em threads paralelos. Como exemplo, mostrarei a organização do loop com a construção Parallel.ForEach:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	var dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		WriteToDB(dataFile);
               });

onde:

listFiles é uma coleção do tipo ConcurrentQueue que contém uma lista de arquivos no diretório;
currentFile - um elemento da coleção listFiles, retornada pela construção ForEach;
dataFile - uma estrutura de dados condicional na memória, obtida pela leitura do conteúdo do arquivo na memória;
getDataFile - uma função condicional que retorna o conteúdo de um arquivo na forma de alguma estrutura de dados;
TransformData - procedimento condicional para transformar dados recebidos;
WriteToDB é um procedimento condicional para gravar dados no banco de dados.

Neste exemplo, usando a construção Parallel.ForEach, organizaremos o loop. Nesse ciclo, em fluxos paralelos, os dados são lidos no disco rígido, sua transformação e gravação no banco de dados. Ao mesmo tempo, não há problemas em organizar o trabalho de fluxos paralelos. O número de encadeamentos paralelos depende do número de núcleos do processador e de sua carga de trabalho.

Usando o algoritmo proposto, aceleraremos o processamento de arquivos em pelo menos 2 vezes. Embora, é claro, esse número varie dependendo do número de núcleos e da memória da máquina na qual o programa será executado.

Além disso, para acelerar o programa, você precisa colocar o registro no banco de dados em um fluxo separado que funcione independentemente do principal. Isso pode ser feito usando a coleção ConcurrentQueue para evitar conflitos ao adicionar dados à fila.

Reescrevemos o exemplo acima, levando em consideração a otimização da gravação no banco de dados.
Suponha que um leitor de arquivos retorne dados para nós em uma DataTable):

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	DataTable dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		threadWriteToDB.ListData.Enqueue(dataFile);
               });

Como você pode ver, em vez de uma linha com uma chamada para o procedimento de gravação no banco de dados, simplesmente adicionamos à coleção ConcurrentQueue ListData descrita e inicializada em um thread separado, cuja instância threadWriteToDB é usada em nosso loop.

A gravação no banco de dados já está em um fluxo separado. A gravação no banco de dados pode ser organizada de maneira semelhante ao trabalho com arquivos usando as construções Parallel.For e / ou Paral-lel.Foreach.

Na minha tarefa, onde era necessário processar um número comparável de arquivos, agora ele pode processar em média de 200.000 a 400.000 arquivos por dia, e a velocidade é limitada ao carregar o banco de dados e a largura do canal de dados.

All Articles