Streaming von Spaltendaten mit Apache Arrow

Eine Übersetzung des Artikels wurde speziell für Studenten des Data Engineer- Kurses erstellt .




In den letzten Wochen haben Nong Li und ich Apache Arrow ein binäres Streaming-Format hinzugefügt , das das vorhandene Direktzugriffs- / IPC-Dateiformat ergänzt. Wir haben Implementierungen in Java- und C ++ - und Python-Bindungen. In diesem Artikel werde ich Ihnen erklären, wie das Format funktioniert, und zeigen, wie Sie einen sehr hohen Datendurchsatz für DataFrame-Pandas erzielen können.

Spalten-Daten-Streaming


Eine häufig gestellte Frage von Arrow-Benutzern ist die Frage nach den hohen Kosten für das Verschieben großer Mengen tabellarischer Daten von einem zeilenorientierten oder zeilenorientierten Format in ein Spaltenformat. Bei Datensätzen mit mehreren Gigabyte kann das Transponieren im Speicher oder auf der Festplatte eine überwältigende Aufgabe sein.

Für das Streaming von Daten besteht eine Option, unabhängig davon, ob es sich bei den Quelldaten um Zeichenfolgen oder Spalten handelt, darin, kleine Zeilenpakete zu senden, von denen jede ein Spaltenlayout enthält.

In Apache Arrow wird eine Sammlung von speicherinternen Spaltenarrays, die einen Tabellenblock darstellen, als Datensatzstapel bezeichnet. Um eine einzelne Datenstruktur einer logischen Tabelle darzustellen, können mehrere Datensatzpakete zusammengestellt werden.

Im vorhandenen Dateiformat mit wahlfreiem Zugriff zeichnen wir Metadaten auf, die das Tabellenlayout und die Position der Blöcke am Ende der Datei enthalten. Auf diese Weise können Sie jedes Datensatzpaket oder jede Spalte aus dem Datensatz äußerst kostengünstig auswählen. In einem Streaming-Format senden wir eine Reihe von Nachrichten: ein Schema und dann ein oder mehrere Pakete von Datensätzen.

Die verschiedenen Formate sehen ungefähr so ​​aus wie in dieser Abbildung:



PyArrow Streaming: Anwendung


Um Ihnen zu zeigen, wie dies funktioniert, werde ich ein Beispieldatensatz erstellen, das einen einzelnen Stream-Block darstellt:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Angenommen, wir möchten 1 GB Daten aufzeichnen, die aus Blöcken von jeweils 1 MB bestehen, was insgesamt 1024 Blöcken entspricht. Erstellen wir zunächst den ersten 1-MB-Datenrahmen mit 16 Spalten:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Dann konvertiere ich sie zu pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Jetzt werde ich einen Ausgabestream erstellen, der in den RAM schreibt und Folgendes erstellt StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Dann schreiben wir 1024 Chunks, die schließlich einen 1-GB-Datensatz bilden:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Da wir im RAM geschrieben haben, können wir den gesamten Stream in einem Puffer abrufen:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Da sich diese Daten im Speicher befinden, ist das Auslesen von Pfeilaufzeichnungspaketen eine Nullkopieoperation. Ich öffne StreamReader, lese die Daten ein pyarrow.Tableund konvertiere sie dann in DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Das alles ist natürlich gut, aber Sie können Fragen haben. Wie schnell geht das? Wie wirkt sich die Blockgröße auf die Leistung beim Abrufen von DataFrame-Pandas aus?

Streaming-Leistung


Wenn die Größe des Streaming-Blocks abnimmt, steigen die Kosten für die Rekonstruktion einer kontinuierlichen DataFrame-Spalte in Pandas aufgrund ineffizienter Cache-Zugriffsschemata. Die Arbeit mit C ++ - Datenstrukturen und -Arrays und ihren Speicherpuffern ist ebenfalls mit einem gewissen Aufwand verbunden.

Wie oben erwähnt, stellt sich für 1 MB auf meinem Laptop (Quad-Core Xeon E3-1505M) Folgendes heraus:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Es stellt sich heraus, dass die effektive Bandbreite 7,75 Gbit / s für die Wiederherstellung eines 1-Gbit-Datenrahmens mit 1024 Blöcken zu je 1 MB beträgt. Was passiert, wenn wir größere oder kleinere Stücke verwenden? Hier sind die Ergebnisse: Die



Leistung sinkt erheblich von 256 KB auf 64 KB. Ich war überrascht, dass 1-MB-Chunks schneller als 16 MB verarbeitet wurden. Es lohnt sich, eine gründlichere Studie durchzuführen und zu verstehen, ob dies eine Normalverteilung ist oder ob etwas anderes sie beeinflusst.

Bei der aktuellen Implementierung des Formats werden die Daten im Prinzip nicht komprimiert, daher ist die Größe im Speicher und in den Drähten ungefähr gleich. In Zukunft kann die Komprimierung eine zusätzliche Option sein.

Gesamt


Das Streaming von Spaltendaten kann eine effektive Möglichkeit sein, große Datenmengen mithilfe kleiner Blöcke an Spaltenanalysewerkzeuge wie Pandas zu übertragen. Datendienste, die zeilenorientierten Speicher verwenden, können kleine Datenblöcke übertragen und transponieren, die für den L2- und L3-Cache Ihres Prozessors bequemer sind.

Vollständiger Code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

All Articles