Salut à tous. La semaine prochaine, les cours commenceront dans le nouveau groupe du cours Data Engineer , en relation avec cela, nous partagerons une autre traduction intéressante avec vous.
Au cours de l'année écoulée, j'ai travaillé avec la communauté Apache Parquet pour créer parquet-cpp , une implémentation de lecture / écriture de fichiers C ++ Parquet de première classe pouvant être utilisée dans Python et d'autres applications de données. Uwe Korn et moi avons développé l'interface Python et l'intégration avec pandas dans le cadre de la base de code Python ( pyarrow
) dans Apache Arrow.
Cet article est la continuation de mon plan stratégique pour 2017 .Conception: données de colonne hautes performances en Python.
Bibliothèques C ++ Apache Arrow et Parquet sont des technologies auxiliaires conçues à l'origine par nous pour une collaboration coordonnée.- C++ Arrow , / (, memory map, HDFS), (IPC/RPC). Arrow .
- C++ Parquet Parquet.
libparquet_arrow
— , Arrow / Parquet. - PyArrow Python pandas.DataFrame.
L'un des principaux objectifs d'Apache Arrow est de créer un niveau inter-opérationnel efficace de transport de mémoire de colonne.Vous pouvez en savoir plus sur l'API utilisateur Parquet dans la base de code PyArrow . Les bibliothèques sont disponibles chez conda-forge à :conda install pyarrow arrow-cpp parquet-cpp -c conda-forge
Repères: PyArrow et fastparquet
Pour avoir une idée des performances de PyArrow, j'ai généré un ensemble de données de 512 mégaoctets avec des données numériques qui démontrent diverses utilisations de Parquet. J'ai généré deux options pour les jeux de données:- Entropie élevée : toutes les valeurs de données d'un fichier (à l'exception des valeurs nulles) sont différentes. Cet ensemble de données pèse 469 Mo.
- : . : 23 Snappy. , . , -, .
J'ai créé ces fichiers dans trois principaux styles de compression utilisés: non compressé, accrocheur et gzip. Ensuite, je calcule le temps physique nécessaire pour obtenir les pandas DataFrame à partir du disque.fastparquet est une implémentation plus récente du lecteur / enregistreur de fichiers Parquet pour les utilisateurs Python, créée pour être utilisée dans le projet Dask. Il est implémenté en Python et utilise le compilateur Numba Python-to-LLVM pour accélérer les procédures de décodage de Parquet. Je l'ai également installé pour comparer avec d'autres implémentations.Le code de lecture du fichier en tant que pandas.DataFrame est similaire:
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()
Les barres vertes correspondent au temps PyArrow: des barres plus longues indiquent des performances plus élevées / un débit de données plus élevé. Matériel - Ordinateur portable Xeon E3-1505.J'ai mis à jour ces benchmarks le 1er février 2017 conformément aux dernières bases de code.
Statut de développement
Nous avons besoin d'aide pour l'assemblage et l'emballage des vis. De plus, la mise à jour des packages conda-forge prend beaucoup de temps. Et bien sûr, nous recherchons des développeurs en C ++ et Python, pour des contributions à la base de code en général.Jusqu'à présent, nous avons accordé une attention particulière à la mise en œuvre de qualité du format de fichier avec des performances élevées en lecture et en écriture de jeux de données simples. Nous commençons à traiter les données imbriquées de type JSON dans parquet-cpp, en utilisant Arrow comme conteneur pour les données de colonnes imbriquées. Uwe Korn arécemment implémenté le support Arrow pour les conversions de pandas:List
In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])
In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
[1,
2,
3],
NA,
[1,
2],
[],
[4]
]
In [11]: arr.type
Out[11]: DataType(list<item: int64>)
In [12]: t = pa.Table.from_arrays([arr], ['col'])
In [13]: t.to_pandas()
Out[13]:
col
0 [1, 2, 3]
1 None
2 [1, 2]
3 []
4 [4]
Code de référence
import os
import time
import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp
def generate_floats(n, pct_null, repeats=1):
nunique = int(n / repeats)
unique_values = np.random.randn(nunique)
num_nulls = int(nunique * pct_null)
null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
unique_values[null_indices] = np.nan
return unique_values.repeat(repeats)
DATA_GENERATORS = {
'float64': generate_floats
}
def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
type_ = np.dtype('float64')
nrows = total_size / ncols / np.dtype(type_).itemsize
datagen_func = DATA_GENERATORS[dtype]
data = {
'c' + str(i): datagen_func(nrows, pct_null, repeats)
for i in range(ncols)
}
return pd.DataFrame(data)
def write_to_parquet(df, out_path, compression='SNAPPY'):
arrow_table = pa.Table.from_pandas(df)
if compression == 'UNCOMPRESSED':
compression = None
pq.write_table(arrow_table, out_path, use_dictionary=False,
compression=compression)
def read_fastparquet(path):
return fp.ParquetFile(path).to_pandas()
def read_pyarrow(path, nthreads=1):
return pq.read_table(path, nthreads=nthreads).to_pandas()
MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16
cases = {
'high_entropy': {
'pct_null': 0.1,
'repeats': 1
},
'low_entropy': {
'pct_null': 0.1,
'repeats': 1000
}
}
def get_timing(f, path, niter):
start = time.clock_gettime(time.CLOCK_MONOTONIC)
for i in range(niter):
f(path)
elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
return elapsed
NITER = 5
results = []
readers = [
('fastparquet', lambda path: read_fastparquet(path)),
('pyarrow', lambda path: read_pyarrow(path)),
]
case_files = {}
for case, params in cases.items():
for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
path = '{0}_{1}.parquet'.format(case, compression)
df = generate_data(DATA_SIZE, NCOLS, **params)
write_to_parquet(df, path, compression=compression)
df = None
case_files[case, compression] = path
for case, params in cases.items():
for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
path = case_files[case, compression]
read_pyarrow(path)
read_pyarrow(path)
for reader_name, f in readers:
elapsed = get_timing(f, path, NITER) / NITER
result = case, compression, reader_name, elapsed
print(result)
results.append(result)
Prenez le cap.