Streaming data kolom menggunakan Apache Arrow

Terjemahan artikel disiapkan khusus untuk siswa kursus Data Engineer .




Selama beberapa minggu terakhir, Nong Li dan saya telah menambahkan format streaming biner ke Apache Arrow , melengkapi format file akses / IPC acak yang ada. Kami memiliki implementasi di Java dan C ++ dan Python bindings. Dalam artikel ini saya akan memberi tahu Anda bagaimana formatnya bekerja dan menunjukkan bagaimana Anda dapat mencapai throughput data yang sangat tinggi untuk panda DataFrame.

Streaming data kolom


Pertanyaan umum yang saya dapatkan dari pengguna Arrow adalah pertanyaan tentang mahalnya biaya memindahkan set besar data tabular dari format berorientasi baris atau berorientasi baris ke format kolom. Untuk dataset multi-gigabyte, mentransposisi dalam memori atau pada disk bisa menjadi tugas yang luar biasa.

Untuk streaming data, terlepas dari apakah data sumbernya berupa string atau kolom, satu opsi adalah mengirim paket kecil string, yang masing-masing di dalamnya berisi tata letak kolom.

Di Apache Arrow, kumpulan array kolom dalam memori yang mewakili potongan tabel disebut kumpulan rekaman. Untuk mewakili struktur data tunggal dari tabel logis, beberapa paket rekaman dapat dirakit.

Dalam format file "akses acak" yang ada, kami merekam metadata yang berisi tata letak tabel dan lokasi blok di akhir file, yang memungkinkan Anda untuk dengan sangat murah memilih paket rekaman atau kolom apa pun dari kumpulan data. Dalam format streaming, kami mengirim serangkaian pesan: skema, dan kemudian satu paket rekaman atau lebih.

Format berbeda terlihat seperti yang ditunjukkan pada gambar ini:



Streaming PyArrow: Aplikasi


Untuk menunjukkan kepada Anda bagaimana ini bekerja, saya akan membuat dataset contoh yang mewakili potongan stream tunggal:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

Sekarang, misalkan kita ingin merekam 1 GB data yang terdiri dari potongan masing-masing 1 MB, dengan total 1024 potongan. Pertama, mari kita buat frame data 1 MB pertama dengan 16 kolom:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Lalu saya mengubahnya menjadi pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Sekarang saya akan membuat aliran output yang akan menulis ke RAM dan membuat StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

Lalu kita menulis 1024 potongan, yang pada akhirnya akan membuat kumpulan data 1GB:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Karena kami menulis dalam RAM, kami bisa mendapatkan seluruh aliran dalam satu buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Karena data ini ada dalam memori, membacakan paket rekaman Arrow adalah operasi tanpa salinan. Saya membuka StreamReader, membaca data pyarrow.Table, dan kemudian mengubahnya menjadi DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

Semua ini, tentu saja, baik, tetapi Anda mungkin memiliki pertanyaan. Seberapa cepat ini terjadi? Bagaimana ukuran chunk memengaruhi kinerja mendapatkan panda DataFrame?

Kinerja Streaming


Ketika ukuran potongan streaming berkurang, biaya merekonstruksi kolom DataFrame kontinu dalam panda meningkat karena skema akses cache yang tidak efisien. Ada juga beberapa overhead dari bekerja dengan struktur dan array data C ++ dan buffer memori mereka.

Untuk 1 MB, seperti yang disebutkan di atas, pada laptop saya (Quad-core Xeon E3-1505M) ternyata:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

Ternyata bandwidth efektif adalah 7,75 Gb / s untuk memulihkan DataFrame 1Gb dari 1024 potongan masing-masing 1MB. Apa yang terjadi jika kita menggunakan bongkahan yang lebih besar atau lebih kecil? Berikut adalah hasilnya:



Kinerja turun secara signifikan dari 256 ribu ke 64 ribu potongan. Saya terkejut bahwa potongan 1 MB diproses lebih cepat dari 16 MB. Adalah bermanfaat untuk melakukan studi yang lebih menyeluruh dan memahami apakah ini adalah distribusi normal atau jika ada sesuatu yang memengaruhinya.

Dalam implementasi format saat ini, data pada prinsipnya tidak dikompres, oleh karena itu ukuran dalam memori dan kabel kira-kira sama. Di masa depan, kompresi dapat menjadi opsi tambahan.

Total


Streaming data kolom dapat menjadi cara yang efektif untuk mentransfer set data besar ke alat analisis kolom, seperti panda, menggunakan potongan kecil. Layanan data yang menggunakan penyimpanan berorientasi garis dapat mentransfer dan mengubah potongan data kecil yang lebih nyaman untuk cache L2 dan L3 prosesor Anda.

Kode lengkap

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

All Articles