Parquet Apache haute vitesse en Python avec flèche Apache

Salut à tous. La semaine prochaine, les cours commenceront dans le nouveau groupe du cours Data Engineer , en relation avec cela, nous partagerons une autre traduction intéressante avec vous.



Au cours de l'année écoulée, j'ai travaillé avec la communauté Apache Parquet pour créer parquet-cpp , une implémentation de lecture / écriture de fichiers C ++ Parquet de première classe pouvant être utilisée dans Python et d'autres applications de données. Uwe Korn et moi avons développé l'interface Python et l'intégration avec pandas dans le cadre de la base de code Python ( pyarrow) dans Apache Arrow.

Cet article est la continuation de mon plan stratégique pour 2017 .


Conception: données de colonne hautes performances en Python.


Bibliothèques C ++ Apache Arrow et Parquet sont des technologies auxiliaires conçues à l'origine par nous pour une collaboration coordonnée.

  • C++ Arrow , / (, memory map, HDFS), (IPC/RPC). Arrow .
  • C++ Parquet Parquet. libparquet_arrow — , Arrow / Parquet.
  • PyArrow Python pandas.DataFrame.

L'un des principaux objectifs d'Apache Arrow est de créer un niveau inter-opérationnel efficace de transport de mémoire de colonne.

Vous pouvez en savoir plus sur l'API utilisateur Parquet dans la base de code PyArrow . Les bibliothèques sont disponibles chez conda-forge à:

conda install pyarrow arrow-cpp parquet-cpp -c conda-forge

Repères: PyArrow et fastparquet


Pour avoir une idée des performances de PyArrow, j'ai généré un ensemble de données de 512 mégaoctets avec des données numériques qui démontrent diverses utilisations de Parquet. J'ai généré deux options pour les jeux de données:

  • Entropie Ă©levĂ©e : toutes les valeurs de donnĂ©es d'un fichier (Ă  l'exception des valeurs nulles) sont diffĂ©rentes. Cet ensemble de donnĂ©es pèse 469 Mo.
  • : . : 23 Snappy. , . , -, .

J'ai créé ces fichiers dans trois principaux styles de compression utilisés: non compressé, accrocheur et gzip. Ensuite, je calcule le temps physique nécessaire pour obtenir les pandas DataFrame à partir du disque.

fastparquet est une implémentation plus récente du lecteur / enregistreur de fichiers Parquet pour les utilisateurs Python, créée pour être utilisée dans le projet Dask. Il est implémenté en Python et utilise le compilateur Numba Python-to-LLVM pour accélérer les procédures de décodage de Parquet. Je l'ai également installé pour comparer avec d'autres implémentations.

Le code de lecture du fichier en tant que pandas.DataFrame est similaire:

# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()

Les barres vertes correspondent au temps PyArrow: des barres plus longues indiquent des performances plus élevées / un débit de données plus élevé. Matériel - Ordinateur portable Xeon E3-1505.

J'ai mis à jour ces benchmarks le 1er février 2017 conformément aux dernières bases de code.



Statut de développement


Nous avons besoin d'aide pour l'assemblage et l'emballage des vis. De plus, la mise à jour des packages conda-forge prend beaucoup de temps. Et bien sûr, nous recherchons des développeurs en C ++ et Python, pour des contributions à la base de code en général.

Jusqu'à présent, nous avons accordé une attention particulière à la mise en œuvre de qualité du format de fichier avec des performances élevées en lecture et en écriture de jeux de données simples. Nous commençons à traiter les données imbriquées de type JSON dans parquet-cpp, en utilisant Arrow comme conteneur pour les données de colonnes imbriquées. Uwe Korn a

récemment implémenté le support Arrow pour les conversions de pandas:List

In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]

Code de référence


import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)



Prenez le cap.



All Articles