Transmisión de datos de columna con Apache Arrow

Se preparó una traducción del artículo específicamente para estudiantes del curso de Ingeniero de datos .




En las últimas semanas, Nong Li y yo hemos agregado un formato de transmisión binaria a Apache Arrow , complementando el formato de archivo de acceso aleatorio / IPC existente. Tenemos implementaciones en Java y C ++ y enlaces de Python. En este artículo, le diré cómo funciona el formato y le mostraré cómo puede lograr un rendimiento de datos muy alto para los pandas DataFrame.

Transmisión de datos de columna


Una pregunta común que recibo de los usuarios de Arrow es la cuestión del alto costo de mover grandes conjuntos de datos tabulares de un formato orientado a filas u orientado a filas a un formato de columna. Para conjuntos de datos de varios gigabytes, la transposición en memoria o en disco puede ser una tarea abrumadora.

Para la transmisión de datos, independientemente de si los datos de origen son cadenas o columnas, una opción es enviar pequeños paquetes de filas, cada una de las cuales contiene un diseño de columna.

En Apache Arrow, una colección de matrices de columnas en memoria que representan un fragmento de tabla se denomina lote de registros. Para representar una estructura de datos única de una tabla lógica, puede recopilar varios paquetes de registros.

En el formato de archivo de "acceso aleatorio" existente, registramos metadatos que contienen el diseño de la tabla y la ubicación de los bloques al final del archivo, lo que le permite seleccionar de forma extremadamente económica cualquier paquete de registros o cualquier columna del conjunto de datos. En un formato de transmisión, enviamos una serie de mensajes: un esquema y luego uno o más registros de paquetes.

Los diferentes formatos se parecen a los que se muestran en esta figura:



PyArrow Streaming: Aplicación


Para mostrarle cómo funciona esto, crearé un conjunto de datos de ejemplo que representa un fragmento de flujo único:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Ahora, supongamos que queremos grabar 1 GB de datos, que consta de fragmentos de 1 MB cada uno, para un total de 1024 fragmentos. Primero, creemos el primer marco de datos de 1 MB con 16 columnas:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Luego los convierto a pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Ahora crearé una secuencia de salida que escribirá en la RAM y creará StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Luego escribimos 1024 fragmentos, que eventualmente formarán un conjunto de datos de 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Como escribimos en RAM, podemos obtener toda la transmisión en un búfer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Dado que estos datos están en la memoria, leer los paquetes de registro de Arrow es una operación de copia cero. Abro StreamReader, leo los datos pyarrow.Tabley luego los convierto a DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Todo esto, por supuesto, es bueno, pero puede tener preguntas. ¿Qué tan rápido está pasando esto? ¿Cómo afecta el tamaño del fragmento al rendimiento de obtener pandas DataFrame?

Rendimiento de transmisión


A medida que disminuye el tamaño del fragmento de transmisión, el costo de reconstruir una columna continua de DataFrame en pandas aumenta debido a esquemas ineficientes de acceso a caché. También hay algo de sobrecarga por trabajar con estructuras y matrices de datos C ++ y sus memorias intermedias.

Para 1 MB, como se mencionó anteriormente, en mi computadora portátil (Quad-core Xeon E3-1505M) resulta:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Resulta que el ancho de banda efectivo es de 7.75 Gb / s para recuperar un DataFrame de 1Gb de 1024 fragmentos de 1MB cada uno. ¿Qué sucede si usamos trozos más grandes o más pequeños? Estos son los resultados: el



rendimiento cae significativamente de 256K a 64K fragmentos. Me sorprendió que los fragmentos de 1 MB se procesaran más rápido que los 16 MB. Vale la pena realizar un estudio más exhaustivo y comprender si esta es una distribución normal o si algo más la afecta.

En la implementación actual del formato, los datos no están comprimidos en principio, por lo tanto, el tamaño en la memoria y en los cables es aproximadamente el mismo. En el futuro, la compresión puede convertirse en una opción adicional.

Total


La transmisión de datos de columnas puede ser una forma efectiva de transferir grandes conjuntos de datos a herramientas de análisis de columnas, como pandas, utilizando pequeños fragmentos. Los servicios de datos que utilizan almacenamiento orientado a líneas pueden transferir y transponer pequeños fragmentos de datos que son más convenientes para la caché L2 y L3 de su procesador.

Código completo

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

All Articles