Streaming de dados da coluna usando o Apache Arrow

Uma tradução do artigo foi preparada especificamente para os alunos do curso de Engenharia de Dados .




Nas últimas semanas, Nong Li e eu adicionamos um formato de streaming binário ao Apache Arrow , complementando o formato de arquivo de acesso aleatório / IPC existente. Temos implementações nas ligações Java e C ++ e Python. Neste artigo, mostrarei como o formato funciona e mostrarei como você pode obter uma taxa de transferência de dados muito alta para os pandas do DataFrame.

Streaming de dados da coluna


Uma pergunta comum que recebo dos usuários do Arrow é a questão do alto custo de mover grandes conjuntos de dados tabulares de um formato orientado a linhas ou orientado a linhas para um formato de coluna. Para conjuntos de dados de vários gigabytes, a transposição na memória ou no disco pode ser uma tarefa esmagadora.

Para dados de streaming, independentemente de os dados de origem serem seqüência de caracteres ou coluna, uma opção é enviar pequenos pacotes de linhas, cada um dos quais contém um layout de coluna.

No Apache Arrow, uma coleção de matrizes de colunas na memória representando um pedaço de tabela é chamada de lote de registro. Para representar uma única estrutura de dados de uma tabela lógica, vários pacotes de registros podem ser montados.

No formato de arquivo "acesso aleatório" existente, registramos metadados contendo o layout da tabela e a localização dos blocos no final do arquivo, o que permite selecionar extremamente barato qualquer pacote de registros ou coluna do conjunto de dados. Em um formato de streaming, enviamos uma série de mensagens: um esquema e, em seguida, um ou mais pacotes de registros.

Os diferentes formatos se parecem com o mostrado nesta figura:



PyArrow Streaming: Aplicativo


Para mostrar como isso funciona, vou criar um exemplo de conjunto de dados representando um único pedaço de fluxo:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Agora, suponha que desejemos gravar 1 GB de dados consistindo em pedaços de 1 MB cada, para um total de 1024 pedaços. Primeiro, vamos criar o primeiro quadro de dados de 1 MB com 16 colunas:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Então eu os converto para pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Agora vou criar um fluxo de saída que gravará na RAM e criará StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Em seguida, escrevemos 1024 chunks, que formarão um conjunto de dados de 1 GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Como escrevemos na RAM, podemos obter o fluxo inteiro em um buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Como esses dados estão na memória, a leitura dos pacotes de registros Arrow é uma operação de cópia zero. Abro o StreamReader, leio os dados pyarrow.Tablee os converto para DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tudo isso, é claro, é bom, mas você pode ter dúvidas. Quão rápido isso está acontecendo? Como o tamanho do pedaço afeta o desempenho dos pandas do DataFrame?

Desempenho de Streaming


À medida que o tamanho do pedaço de streaming diminui, o custo de reconstruir uma coluna DataFrame contínua em pandas aumenta devido a esquemas de acesso a cache ineficientes. Também há alguma sobrecarga ao trabalhar com estruturas e matrizes de dados C ++ e seus buffers de memória.

Por 1 MB, como mencionado acima, no meu laptop (Xeon E3-1505M de quatro núcleos), verifica-se:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Acontece que a largura de banda efetiva é de 7,75 Gb / s para restaurar um DataFrame de 1 Gb de 1024 blocos de 1 MB cada. O que acontece se usarmos pedaços maiores ou menores? Aqui estão os resultados: O



desempenho cai significativamente de pedaços de 256K para 64K. Fiquei surpreso que pedaços de 1 MB processaram mais rápido que 16 MB. Vale a pena realizar um estudo mais aprofundado e entender se essa é uma distribuição normal ou se alguma outra coisa a afeta.

Na implementação atual do formato, os dados não são compactados em princípio; portanto, o tamanho da memória e dos fios é aproximadamente o mesmo. No futuro, a compactação pode se tornar uma opção adicional.

Total


O fluxo de dados da coluna pode ser uma maneira eficaz de transferir grandes conjuntos de dados para ferramentas de análise de colunas, como pandas, usando pequenos pedaços. Os serviços de dados que usam armazenamento orientado a linha podem transferir e transpor pequenos blocos de dados mais convenientes para o cache L2 e L3 do seu processador.

Código completo

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

All Articles