Parquet Apache de alta velocidade em Python com Apache Arrow

Toda a saudação. Na próxima semana, as aulas começarão no novo grupo do curso Data Engineer . Em conexão com isso, compartilharemos outra tradução interessante com você.



Ao longo do ano passado, trabalhei com a comunidade Apache Parquet para criar o parquet-cpp , uma implementação de leitura / gravação de arquivos C ++ Parquet de primeira classe, adequada para uso em Python e outros aplicativos de dados. Uwe Korn e eu desenvolvemos a interface Python e a integração com os pandas como parte da base de código Python ( pyarrow) no Apache Arrow.

Este artigo é uma continuação do meu plano estratégico para 2017 .


Design: dados da coluna de alto desempenho em Python.


Bibliotecas C ++ Apache Arrow e Parquet são tecnologias auxiliares que foram originalmente projetadas por nós para colaboração coordenada.

  • C++ Arrow , / (, memory map, HDFS), (IPC/RPC). Arrow .
  • C++ Parquet Parquet. libparquet_arrow — , Arrow / Parquet.
  • PyArrow Python pandas.DataFrame.

Um dos principais objetivos do Apache Arrow é criar um nível interoperacional eficiente de transporte de memória de coluna.

Você pode ler sobre a API do usuário do Parquet na base de código PyArrow . As bibliotecas estão disponíveis no conda-forge em:

conda install pyarrow arrow-cpp parquet-cpp -c conda-forge

Benchmarks: PyArrow e fastparquet


Para ter uma idéia do desempenho do PyArrow, eu gerei um conjunto de dados de 512 megabytes com dados numéricos que demonstram os vários usos do Parquet. Gerei duas opções para conjuntos de dados:

  • Alta entropia : todos os valores de dados em um arquivo (exceto os valores zero) são diferentes. Este conjunto de dados pesa 469 MB.
  • : . : 23 Snappy. , . , -, .

Criei esses arquivos nos três principais estilos de compactação usados: descompactado, snappy e gzip. Depois, calculo o tempo físico necessário para obter o DataFrame do pandas do disco.

O fastparquet é uma implementação mais recente do leitor / gravador de arquivos Parquet para usuários do Python, criado para uso no projeto Dask. Ele é implementado no Python e usa o compilador Numba Python para LLVM para acelerar os procedimentos de decodificação do Parquet. Eu também o instalei para comparar com implementações alternativas.

O código para ler o arquivo como pandas.DataFrame é semelhante:

# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()

Barras verdes correspondem ao tempo do PyArrow: barras mais longas indicam maior desempenho / maior taxa de transferência de dados. Hardware - Laptop Xeon E3-1505.

Atualizei esses benchmarks em 1º de fevereiro de 2017, de acordo com as bases de código mais recentes.



Estado de Desenvolvimento


Precisamos de ajuda com a montagem e embalagem dos parafusos. Além disso, manter os pacotes do conda-forge atualizados leva muito tempo. E, é claro, estamos procurando desenvolvedores em C ++ e Python, para contribuições à base de código em geral.

Até agora, prestamos atenção especial à implementação de qualidade do formato de arquivo com alto desempenho de leitura e gravação de conjuntos de dados simples. Estamos começando a processar dados aninhados do tipo JSON no parquet-cpp, usando Arrow como um contêiner para dados de colunas aninhados. Uwe Korn implementou

recentemente o suporte ao Arrow para conversões de pandas:List

In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]

Código de referência


import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)



Entre no curso.



All Articles