باركيه أباتشي عالي السرعة في Python مع Apache Arrow

كل التحية. في الأسبوع المقبل ، ستبدأ الدروس في المجموعة الجديدة من دورة مهندس البيانات ، فيما يتعلق بهذا ، سنشارك معك ترجمة أخرى مثيرة للاهتمام.



طوال العام الماضي ، كنت أعمل مع مجتمع Apache Parquet لإنشاء باركيه- cpp ، وهو تطبيق C ++ Parquet من الدرجة الأولى لقراءة / كتابة الملفات ، ومناسب للاستخدام في Python وتطبيقات البيانات الأخرى. طورت أنا وأوي كورن واجهة Python والتكامل مع الباندا كجزء من قاعدة كود Python ( pyarrow) في Apache Arrow.

هذه المقالة هي استمرار لخطتي الإستراتيجية لعام 2017 .


التصميم: بيانات العمود عالية الأداء في Python.


مكتبات C ++ Apache Arrow و Parquet هي تقنيات مساعدة تم تصميمها في الأصل من أجل التعاون المنسق.

  • توفر مكتبات C ++ Arrow إدارة للذاكرة ، وإدخال / إخراج فعال (الملفات ، وخريطة الذاكرة ، و HDFS) ، وحاويات صفيف الأعمدة في الذاكرة ، والمراسلة السريعة للغاية (IPC / RPC). سألقي نظرة فاحصة على طبقة رسائل Arrow في مقالة أخرى.
  • مكتبات C ++ الباركيه هي المسؤولة عن ترميز وفك تنسيق ملف الباركيه. لقد نفذنا libparquet_arrow- مكتبة تتعامل مع النقل بين البيانات في ذاكرة السهم وأدوات قراءة وكتابة الباركيه منخفضة المستوى.
  • يوفر PyArrow واجهة Python لكل هذا ويعالج التحويلات السريعة إلى pandas.DataFrame .

أحد الأهداف الرئيسية لـ Apache Arrow هو إنشاء مستوى فعال بين التشغيل لنقل ذاكرة العمود.

يمكنك أن تقرأ عن واجهة مستخدم Parquet في قاعدة كود PyArrow . المكتبات متاحة في كوندا - فورج على:

conda install pyarrow arrow-cpp parquet-cpp -c conda-forge

المقاييس: PyArrow و fastparquet


للحصول على فكرة عن أداء PyArrow ، قمت بإنشاء مجموعة بيانات 512 ميجابايت مع البيانات الرقمية التي توضح الاستخدامات المختلفة للباركيه. أنشأت خيارين لمجموعات البيانات:

  • إنتروبيا عالية : جميع قيم البيانات في ملف (باستثناء القيم الصفرية) مختلفة. تزن مجموعة البيانات هذه 469 ميجابايت.
  • : . : 23 Snappy. , . , -, .

لقد أنشأت هذه الملفات في ثلاثة أنماط ضغط رئيسية مستخدمة: غير مضغوطة ، سريعة ، و gzip. ثم أحسب الوقت الفعلي الذي يستغرقه الحصول على الباندا DataFrame من القرص.

fastparquet هو تطبيق أحدث لقارئ / كاتب ملفات الباركيه لمستخدمي Python ، تم إنشاؤه للاستخدام في مشروع Dask. يتم تنفيذه في Python ويستخدم مترجم Numba Python-to-LLVM لتسريع إجراءات فك تشفير الباركيه. لقد قمت أيضًا بتثبيته للمقارنة مع التطبيقات البديلة. يشبه

رمز قراءة الملف باسم pandas.DataFrame :

# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()

تتوافق الأشرطة الخضراء مع وقت PyArrow: تشير الأشرطة الطويلة إلى أداء أعلى / معدل نقل أعلى للبيانات. الأجهزة - الكمبيوتر المحمول Xeon E3-1505.

لقد قمت بتحديث هذه المعايير في 1 فبراير 2017 وفقًا لأحدث قواعد البرمجة.



حالة التطوير


نحن بحاجة إلى مساعدة في تجميع المسمار وتعبئته. بالإضافة إلى ذلك ، فإن تحديث حزم Conda-forge محدثة يستغرق الكثير من الوقت. وبالطبع ، نحن نبحث عن مطورين في كل من C ++ و Python ، للمساهمة في أساس الشفرة بشكل عام.

حتى الآن ، نولي اهتمامًا خاصًا بتنفيذ جودة تنسيق الملف مع أداء عالي للقراءة والكتابة لمجموعات البيانات البسيطة. بدأنا في الانتقال إلى معالجة البيانات المتشابكة مثل JSON في الباركيه- cpp ، باستخدام Arrow كحاوية لبيانات العمود المتداخلة. نفذ Uwe Korn

مؤخرًا دعم Arrow لتحويلات الباندا:List

In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]

رمز المعيار


import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)



احصل على الدورة.



All Articles