Toda a saudação. Na próxima semana, as aulas começarão no novo grupo do curso Data Engineer . Em conexão com isso, compartilharemos outra tradução interessante com você.
Ao longo do ano passado, trabalhei com a comunidade Apache Parquet para criar o parquet-cpp , uma implementação de leitura / gravação de arquivos C ++ Parquet de primeira classe, adequada para uso em Python e outros aplicativos de dados. Uwe Korn e eu desenvolvemos a interface Python e a integração com os pandas como parte da base de código Python ( pyarrow
) no Apache Arrow.
Este artigo é uma continuação do meu plano estratégico para 2017 .Design: dados da coluna de alto desempenho em Python.
Bibliotecas C ++ Apache Arrow e Parquet são tecnologias auxiliares que foram originalmente projetadas por nós para colaboração coordenada.- C++ Arrow , / (, memory map, HDFS), (IPC/RPC). Arrow .
- C++ Parquet Parquet.
libparquet_arrow
— , Arrow / Parquet. - PyArrow Python pandas.DataFrame.
Um dos principais objetivos do Apache Arrow é criar um nível interoperacional eficiente de transporte de memória de coluna.Você pode ler sobre a API do usuário do Parquet na base de código PyArrow . As bibliotecas estão disponíveis no conda-forge em:conda install pyarrow arrow-cpp parquet-cpp -c conda-forge
Benchmarks: PyArrow e fastparquet
Para ter uma idéia do desempenho do PyArrow, eu gerei um conjunto de dados de 512 megabytes com dados numéricos que demonstram os vários usos do Parquet. Gerei duas opções para conjuntos de dados:- Alta entropia : todos os valores de dados em um arquivo (exceto os valores zero) são diferentes. Este conjunto de dados pesa 469 MB.
- : . : 23 Snappy. , . , -, .
Criei esses arquivos nos três principais estilos de compactação usados: descompactado, snappy e gzip. Depois, calculo o tempo físico necessário para obter o DataFrame do pandas do disco.O fastparquet é uma implementação mais recente do leitor / gravador de arquivos Parquet para usuários do Python, criado para uso no projeto Dask. Ele é implementado no Python e usa o compilador Numba Python para LLVM para acelerar os procedimentos de decodificação do Parquet. Eu também o instalei para comparar com implementações alternativas.O código para ler o arquivo como pandas.DataFrame é semelhante:
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()
Barras verdes correspondem ao tempo do PyArrow: barras mais longas indicam maior desempenho / maior taxa de transferência de dados. Hardware - Laptop Xeon E3-1505.Atualizei esses benchmarks em 1º de fevereiro de 2017, de acordo com as bases de código mais recentes.
Estado de Desenvolvimento
Precisamos de ajuda com a montagem e embalagem dos parafusos. Além disso, manter os pacotes do conda-forge atualizados leva muito tempo. E, é claro, estamos procurando desenvolvedores em C ++ e Python, para contribuições à base de código em geral.Até agora, prestamos atenção especial à implementação de qualidade do formato de arquivo com alto desempenho de leitura e gravação de conjuntos de dados simples. Estamos começando a processar dados aninhados do tipo JSON no parquet-cpp, usando Arrow como um contêiner para dados de colunas aninhados. Uwe Korn implementourecentemente o suporte ao Arrow para conversões de pandas:List
In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])
In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
[1,
2,
3],
NA,
[1,
2],
[],
[4]
]
In [11]: arr.type
Out[11]: DataType(list<item: int64>)
In [12]: t = pa.Table.from_arrays([arr], ['col'])
In [13]: t.to_pandas()
Out[13]:
col
0 [1, 2, 3]
1 None
2 [1, 2]
3 []
4 [4]
Código de referência
import os
import time
import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp
def generate_floats(n, pct_null, repeats=1):
nunique = int(n / repeats)
unique_values = np.random.randn(nunique)
num_nulls = int(nunique * pct_null)
null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
unique_values[null_indices] = np.nan
return unique_values.repeat(repeats)
DATA_GENERATORS = {
'float64': generate_floats
}
def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
type_ = np.dtype('float64')
nrows = total_size / ncols / np.dtype(type_).itemsize
datagen_func = DATA_GENERATORS[dtype]
data = {
'c' + str(i): datagen_func(nrows, pct_null, repeats)
for i in range(ncols)
}
return pd.DataFrame(data)
def write_to_parquet(df, out_path, compression='SNAPPY'):
arrow_table = pa.Table.from_pandas(df)
if compression == 'UNCOMPRESSED':
compression = None
pq.write_table(arrow_table, out_path, use_dictionary=False,
compression=compression)
def read_fastparquet(path):
return fp.ParquetFile(path).to_pandas()
def read_pyarrow(path, nthreads=1):
return pq.read_table(path, nthreads=nthreads).to_pandas()
MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16
cases = {
'high_entropy': {
'pct_null': 0.1,
'repeats': 1
},
'low_entropy': {
'pct_null': 0.1,
'repeats': 1000
}
}
def get_timing(f, path, niter):
start = time.clock_gettime(time.CLOCK_MONOTONIC)
for i in range(niter):
f(path)
elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
return elapsed
NITER = 5
results = []
readers = [
('fastparquet', lambda path: read_fastparquet(path)),
('pyarrow', lambda path: read_pyarrow(path)),
]
case_files = {}
for case, params in cases.items():
for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
path = '{0}_{1}.parquet'.format(case, compression)
df = generate_data(DATA_SIZE, NCOLS, **params)
write_to_parquet(df, path, compression=compression)
df = None
case_files[case, compression] = path
for case, params in cases.items():
for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
path = case_files[case, compression]
read_pyarrow(path)
read_pyarrow(path)
for reader_name, f in readers:
elapsed = get_timing(f, path, NITER) / NITER
result = case, compression, reader_name, elapsed
print(result)
results.append(result)
Entre no curso.