Todos saludan. La próxima semana, las clases comenzarán en el nuevo grupo del curso de Ingeniero de Datos , en relación con esto compartiremos otra traducción interesante con usted.
Durante el año pasado, he estado trabajando con la comunidad Apache Parquet para crear parquet-cpp , una implementación de primera clase de C ++ Parquet para leer / escribir archivos, adecuada para usar en Python y otras aplicaciones de datos. Uwe Korn y yo desarrollamos la interfaz Python y la integración con pandas como parte de la base de código Python ( pyarrow
) en Apache Arrow.
Este artículo es una continuación de mi plan estratégico para 2017 .Diseño: datos de columna de alto rendimiento en Python.
Las bibliotecas C ++ Apache Arrow y Parquet son tecnologías auxiliares que fueron diseñadas originalmente por nosotros para una colaboración coordinada.- C++ Arrow , / (, memory map, HDFS), (IPC/RPC). Arrow .
- C++ Parquet Parquet.
libparquet_arrow
— , Arrow / Parquet. - PyArrow Python pandas.DataFrame.
Uno de los objetivos principales de Apache Arrow es crear un nivel interoperativo eficiente de transporte de memoria de columna.Puede leer sobre la API de usuario de Parquet en la base de código PyArrow . Las bibliotecas están disponibles en conda-forge en:conda install pyarrow arrow-cpp parquet-cpp -c conda-forge
Puntos de referencia: PyArrow y fastparquet
Para tener una idea del rendimiento de PyArrow, generé un conjunto de datos de 512 megabytes con datos numéricos que demuestran varios usos para Parquet. Generé dos opciones para conjuntos de datos:- Entropía alta : todos los valores de datos en un archivo (excepto los valores cero) son diferentes. Este conjunto de datos pesa 469 MB.
- : . : 23 Snappy. , . , -, .
Creé estos archivos en tres estilos de compresión principales utilizados: sin comprimir, ágil y gzip. Luego calculo el tiempo físico que toma obtener el DataFrame de pandas del disco.fastparquet es una implementación más reciente del lector / escritor de archivos Parquet para usuarios de Python, creado para su uso en el proyecto Dask. Se implementa en Python y utiliza el compilador Numba Python-to-LLVM para acelerar los procedimientos de decodificación de Parquet. También lo instalé para compararlo con implementaciones alternativas.El código para leer el archivo como pandas.DataFrame es similar:
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()
Las columnas verdes corresponden al tiempo de PyArrow: las columnas más largas indican un mayor rendimiento / mayor rendimiento de datos. Hardware - Xeon E3-1505 Laptop.Actualicé estos puntos de referencia el 1 de febrero de 2017 de acuerdo con las últimas bases de código.
Estado del desarrollo
Necesitamos ayuda con el ensamblaje y empaque del tornillo. Además, mantener actualizados los paquetes de conda-forge lleva mucho tiempo. Y, por supuesto, estamos buscando desarrolladores en C ++ y Python, para contribuciones a la base de código en general.Hasta ahora, hemos prestado especial atención a la implementación de calidad del formato de archivo con un alto rendimiento de lectura y escritura de conjuntos de datos simples. Estamos comenzando a pasar a procesar datos anidados similares a JSON en parquet-cpp, usando Arrow como contenedor para datos de columnas anidadas. Uwe Korn implementórecientemente el soporte de Arrow para las conversiones de pandas:List
In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])
In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
[1,
2,
3],
NA,
[1,
2],
[],
[4]
]
In [11]: arr.type
Out[11]: DataType(list<item: int64>)
In [12]: t = pa.Table.from_arrays([arr], ['col'])
In [13]: t.to_pandas()
Out[13]:
col
0 [1, 2, 3]
1 None
2 [1, 2]
3 []
4 [4]
Código de referencia
import os
import time
import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp
def generate_floats(n, pct_null, repeats=1):
nunique = int(n / repeats)
unique_values = np.random.randn(nunique)
num_nulls = int(nunique * pct_null)
null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
unique_values[null_indices] = np.nan
return unique_values.repeat(repeats)
DATA_GENERATORS = {
'float64': generate_floats
}
def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
type_ = np.dtype('float64')
nrows = total_size / ncols / np.dtype(type_).itemsize
datagen_func = DATA_GENERATORS[dtype]
data = {
'c' + str(i): datagen_func(nrows, pct_null, repeats)
for i in range(ncols)
}
return pd.DataFrame(data)
def write_to_parquet(df, out_path, compression='SNAPPY'):
arrow_table = pa.Table.from_pandas(df)
if compression == 'UNCOMPRESSED':
compression = None
pq.write_table(arrow_table, out_path, use_dictionary=False,
compression=compression)
def read_fastparquet(path):
return fp.ParquetFile(path).to_pandas()
def read_pyarrow(path, nthreads=1):
return pq.read_table(path, nthreads=nthreads).to_pandas()
MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16
cases = {
'high_entropy': {
'pct_null': 0.1,
'repeats': 1
},
'low_entropy': {
'pct_null': 0.1,
'repeats': 1000
}
}
def get_timing(f, path, niter):
start = time.clock_gettime(time.CLOCK_MONOTONIC)
for i in range(niter):
f(path)
elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
return elapsed
NITER = 5
results = []
readers = [
('fastparquet', lambda path: read_fastparquet(path)),
('pyarrow', lambda path: read_pyarrow(path)),
]
case_files = {}
for case, params in cases.items():
for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
path = '{0}_{1}.parquet'.format(case, compression)
df = generate_data(DATA_SIZE, NCOLS, **params)
write_to_parquet(df, path, compression=compression)
df = None
case_files[case, compression] = path
for case, params in cases.items():
for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
path = case_files[case, compression]
read_pyarrow(path)
read_pyarrow(path)
for reader_name, f in readers:
elapsed = get_timing(f, path, NITER) / NITER
result = case, compression, reader_name, elapsed
print(result)
results.append(result)
Súbete al curso.