دفق بيانات العمود باستخدام Apache Arrow

تم إعداد ترجمة للمقال خصيصًا لطلاب دورة مهندس البيانات .




على مدار الأسابيع القليلة الماضية ، أضفت أنا و Nong Li تنسيق بث ثنائي إلى Apache Arrow ، مكملين تنسيق ملف الوصول / IPC العشوائي الحالي. لدينا تطبيقات في Java و C ++ و Python bindings. في هذه المقالة ، سأخبرك بكيفية عمل التنسيق وأظهر كيف يمكنك تحقيق إنتاجية بيانات عالية جدًا لباندا DataFrame.

تدفق بيانات العمود


السؤال الشائع الذي أحصل عليه من مستخدمي Arrow هو مسألة التكلفة العالية لنقل مجموعات كبيرة من البيانات الجدولية من تنسيق موجه نحو الصف أو موجه نحو الصف إلى تنسيق العمود. بالنسبة لمجموعات البيانات متعددة الجيجابايت ، يمكن أن يكون التبديل في الذاكرة أو على القرص مهمة شاقة.

لدفق البيانات ، بغض النظر عما إذا كانت البيانات المصدر هي سلسلة أو عمود ، فإن أحد الخيارات هو إرسال حزم صغيرة من السلاسل ، كل منها يحتوي على تخطيط عمود.

في Apache Arrow ، تسمى مجموعة من صفائف الأعمدة في الذاكرة التي تمثل قطعة الجدول دفعة قياسية. لتمثيل هيكل بيانات واحد لجدول منطقي ، يمكن تجميع العديد من حزم السجلات.

في تنسيق ملف "الوصول العشوائي" الحالي ، نسجل البيانات الوصفية التي تحتوي على تخطيط الجدول وموقع الكتل في نهاية الملف ، مما يسمح لك بتحديد أي حزمة سجل أو أي عمود من مجموعة البيانات بتكلفة زهيدة. في تنسيق دفق ، نرسل سلسلة من الرسائل: مخطط ، ثم حزمة أو أكثر من السجلات.

تبدو التنسيقات المختلفة شيئًا مثل الشكل الموضح في هذا الشكل:



PyArrow Streaming: التطبيق


لتوضيح كيفية عمل ذلك ، سأقوم بإنشاء مجموعة بيانات نموذجية تمثل مجموعة دفق واحدة:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

الآن ، لنفترض أننا نريد تسجيل 1 غيغابايت من البيانات تتكون من قطع بسعة 1 ميغا بايت لكل منها ، أي ما مجموعه 1024 قطعة. أولاً ، دعنا ننشئ أول إطار بيانات 1 ميجا بايت من 16 عمودًا:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

ثم أقوم بتحويلها إلى pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

الآن سأقوم بإنشاء دفق إخراج سيكتب على ذاكرة الوصول العشوائي وإنشاء StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

ثم نكتب 1024 قطعة ، والتي ستشكل في النهاية مجموعة بيانات 1 جيجابايت:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

نظرًا لأننا كتبنا في ذاكرة الوصول العشوائي ، يمكننا الحصول على الدفق بالكامل في مخزن مؤقت واحد:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

نظرًا لأن هذه البيانات في الذاكرة ، فإن قراءة حزم سجلات Arrow هي عملية نسخ صفري. أقوم بفتح StreamReader وقراءة البيانات pyarrow.Tableثم تحويلها إلى DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

كل هذا جيد بالطبع ، ولكن قد يكون لديك أسئلة. ما مدى سرعة هذا؟ كيف يؤثر حجم القطعة على أداء الحصول على الباندا DataFrame؟

تدفق الأداء


مع انخفاض حجم مجموعة البث ، ترتفع تكلفة إعادة بناء عمود DataFrame المستمر في الباندا بسبب عدم كفاءة مخططات الوصول إلى ذاكرة التخزين المؤقت. هناك أيضًا بعض النفقات العامة من العمل مع هياكل ومصفوفات بيانات C ++ ومخازن الذاكرة المؤقتة الخاصة بهم.

بالنسبة إلى 1 ميغابايت ، كما هو مذكور أعلاه ، على جهاز الكمبيوتر المحمول الخاص بي (Quad-core Xeon E3-1505M) اتضح:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

وتبين أن النطاق الترددي الفعال هو 7.75 جيجابت / ثانية لاستعادة إطار بيانات 1 جيجا بايت من 1024 قطعة بحجم 1 ميجا بايت لكل منهما. ماذا يحدث إذا استخدمنا قطعًا أكبر أو أصغر؟ فيما يلي النتائج:



ينخفض ​​الأداء بشكل كبير من 256 كيلوبايت إلى 64 كيلوبايت. لقد فوجئت بأن 1 ميغا بايت معالجة أسرع من 16 ميغا بايت. من المفيد إجراء دراسة أكثر شمولاً وفهم ما إذا كان هذا توزيعًا طبيعيًا أو إذا كان هناك شيء آخر يؤثر عليه.

في التطبيق الحالي للصيغة ، لا يتم ضغط البيانات من حيث المبدأ ، وبالتالي فإن حجم الذاكرة والأسلاك هو نفسه تقريبًا. في المستقبل ، قد يصبح الضغط خيارًا إضافيًا.

مجموع


يمكن أن يكون دفق بيانات العمود وسيلة فعالة لنقل مجموعات البيانات الكبيرة إلى أدوات تحليل الأعمدة ، مثل الباندا ، باستخدام قطع صغيرة. يمكن لخدمات البيانات التي تستخدم التخزين الموجه للخط نقل أجزاء صغيرة من البيانات ونقلها وأكثر ملاءمة لذاكرة التخزين المؤقت L2 و L3 الخاصة بالمعالج.

كود كامل

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

All Articles