Parquet Apache de alta velocidad en Python con Apache Arrow

Todos saludan. La próxima semana, las clases comenzarán en el nuevo grupo del curso de Ingeniero de Datos , en relación con esto compartiremos otra traducción interesante con usted.



Durante el año pasado, he estado trabajando con la comunidad Apache Parquet para crear parquet-cpp , una implementación de primera clase de C ++ Parquet para leer / escribir archivos, adecuada para usar en Python y otras aplicaciones de datos. Uwe Korn y yo desarrollamos la interfaz Python y la integración con pandas como parte de la base de código Python ( pyarrow) en Apache Arrow.

Este artículo es una continuación de mi plan estratégico para 2017 .


Diseño: datos de columna de alto rendimiento en Python.


Las bibliotecas C ++ Apache Arrow y Parquet son tecnologías auxiliares que fueron diseñadas originalmente por nosotros para una colaboración coordinada.

  • C++ Arrow , / (, memory map, HDFS), (IPC/RPC). Arrow .
  • C++ Parquet Parquet. libparquet_arrow — , Arrow / Parquet.
  • PyArrow Python pandas.DataFrame.

Uno de los objetivos principales de Apache Arrow es crear un nivel interoperativo eficiente de transporte de memoria de columna.

Puede leer sobre la API de usuario de Parquet en la base de código PyArrow . Las bibliotecas están disponibles en conda-forge en:

conda install pyarrow arrow-cpp parquet-cpp -c conda-forge

Puntos de referencia: PyArrow y fastparquet


Para tener una idea del rendimiento de PyArrow, generé un conjunto de datos de 512 megabytes con datos numéricos que demuestran varios usos para Parquet. Generé dos opciones para conjuntos de datos:

  • Entropía alta : todos los valores de datos en un archivo (excepto los valores cero) son diferentes. Este conjunto de datos pesa 469 MB.
  • : . : 23 Snappy. , . , -, .

Creé estos archivos en tres estilos de compresión principales utilizados: sin comprimir, ágil y gzip. Luego calculo el tiempo físico que toma obtener el DataFrame de pandas del disco.

fastparquet es una implementación más reciente del lector / escritor de archivos Parquet para usuarios de Python, creado para su uso en el proyecto Dask. Se implementa en Python y utiliza el compilador Numba Python-to-LLVM para acelerar los procedimientos de decodificación de Parquet. También lo instalé para compararlo con implementaciones alternativas.

El código para leer el archivo como pandas.DataFrame es similar:

# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()

Las columnas verdes corresponden al tiempo de PyArrow: las columnas más largas indican un mayor rendimiento / mayor rendimiento de datos. Hardware - Xeon E3-1505 Laptop.

Actualicé estos puntos de referencia el 1 de febrero de 2017 de acuerdo con las últimas bases de código.



Estado del desarrollo


Necesitamos ayuda con el ensamblaje y empaque del tornillo. Además, mantener actualizados los paquetes de conda-forge lleva mucho tiempo. Y, por supuesto, estamos buscando desarrolladores en C ++ y Python, para contribuciones a la base de código en general.

Hasta ahora, hemos prestado especial atención a la implementación de calidad del formato de archivo con un alto rendimiento de lectura y escritura de conjuntos de datos simples. Estamos comenzando a pasar a procesar datos anidados similares a JSON en parquet-cpp, usando Arrow como contenedor para datos de columnas anidadas. Uwe Korn implementó

recientemente el soporte de Arrow para las conversiones de pandas:List

In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]

Código de referencia


import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)



Súbete al curso.



All Articles