Streaming des données de colonne à l'aide d'Apache Arrow

Une traduction de l'article a été préparée spécialement pour les étudiants du cours Data Engineer .




Au cours des dernières semaines, Nong Li et moi avons ajouté un format de streaming binaire à Apache Arrow , complétant le format de fichier d'accès aléatoire / IPC existant. Nous avons des implémentations dans les liaisons Java et C ++ et Python. Dans cet article, je vais vous expliquer comment fonctionne le format et montrer comment vous pouvez atteindre un débit de données très élevé pour les pandas DataFrame.

Streaming de données de colonne


Une question fréquente des utilisateurs d'Arrow est la question du coût élevé du déplacement de grands ensembles de données tabulaires d'un format orienté ligne ou orienté ligne vers un format colonne. Pour les jeux de données de plusieurs gigaoctets, la transposition en mémoire ou sur disque peut être une tâche écrasante.

Pour la diffusion de données en continu, que les données sources soient des chaînes ou des colonnes, une option consiste à envoyer de petits paquets de lignes, chacune contenant à l'intérieur une disposition de colonnes.

Dans Apache Arrow, une collection de tableaux de colonnes en mémoire représentant un bloc de table est appelée un lot d'enregistrement. Pour représenter une structure de données unique d'une table logique, plusieurs packages d'enregistrement peuvent être assemblés.

Dans le format de fichier «à accès aléatoire» existant, nous enregistrons des métadonnées contenant la disposition du tableau et l'emplacement des blocs à la fin du fichier, ce qui vous permet de sélectionner à moindre coût n'importe quel package d'enregistrement ou n'importe quelle colonne de l'ensemble de données. Dans un format de streaming, nous envoyons une série de messages: un schéma, puis un ou plusieurs enregistrements de paquets.

Les différents formats ressemblent à celui montré dans cette figure:



PyArrow Streaming: Application


Pour vous montrer comment cela fonctionne, je vais créer un exemple de jeu de données représentant un morceau de flux unique:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Supposons maintenant que nous voulions enregistrer 1 Go de données constituées de morceaux de 1 Mo chacun, pour un total de 1024 morceaux. Commençons par créer le premier bloc de données de 1 Mo avec 16 colonnes:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Ensuite, je les convertis en pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Maintenant, je vais créer un flux de sortie qui va écrire dans la RAM et créer StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Ensuite, nous écrivons 1024 morceaux, ce qui finira par constituer un ensemble de données de 1 Go:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Depuis que nous avons écrit en RAM, nous pouvons obtenir le flux entier dans un seul tampon:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Étant donné que ces données sont en mémoire, la lecture des paquets d'enregistrement Arrow est une opération de copie nulle. J'ouvre StreamReader, je lis les données pyarrow.Table, puis je les convertis en DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Tout cela, bien sûr, est bon, mais vous pouvez avoir des questions. À quelle vitesse cela se passe-t-il? Comment la taille des morceaux affecte-t-elle les performances d'obtention des pandas DataFrame?

Performances de streaming


À mesure que la taille du morceau de streaming diminue, le coût de la reconstruction d'une colonne DataFrame continue dans les pandas augmente en raison de schémas d'accès au cache inefficaces. Il y a également des frais généraux liés au travail avec les structures et les tableaux de données C ++ et leurs tampons de mémoire.

Pour 1 Mo, comme mentionné ci-dessus, sur mon ordinateur portable (Quad-core Xeon E3-1505M), il s'avère:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Il s'avère que la bande passante effective est de 7,75 Gb / s pour récupérer un DataFrame 1 Go à partir de 1024 morceaux de 1 Mo chacun. Que se passe-t-il si nous utilisons des morceaux plus gros ou plus petits? Voici les résultats: Les



performances chutent considérablement de 256K à 64K morceaux. J'ai été surpris que des morceaux de 1 Mo soient traités plus rapidement que 16 Mo. Il vaut la peine de mener une étude plus approfondie et de comprendre s'il s'agit d'une distribution normale ou si quelque chose d'autre l'affecte.

Dans l'implémentation actuelle du format, les données ne sont pas compressées en principe, donc la taille dans la mémoire et dans les fils est approximativement la même. À l'avenir, la compression pourrait devenir une option supplémentaire.

Total


La diffusion en continu de données de colonne peut être un moyen efficace de transférer de grands ensembles de données vers des outils d'analyse de colonne, tels que des pandas, à l'aide de petits morceaux. Les services de données utilisant le stockage orienté ligne peuvent transférer et transposer de petits blocs de données plus pratiques pour le cache L2 et L3 de votre processeur.

Code complet

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

All Articles