Apache Arrow के साथ अजगर में हाई स्पीड Apache Parquet

सभी सलाम करते हैं। अगले सप्ताह, डेटा इंजीनियर कोर्स के नए समूह में कक्षाएं शुरू होंगी , इस संबंध में हम आपके साथ एक और दिलचस्प अनुवाद साझा करेंगे।



पिछले एक साल के दौरान, मैं Apache Parquet समुदाय के साथ काम कर रहा हूँ ताकि parquet-cpp , एक प्रथम श्रेणी C ++ Parquet कार्यान्वयन पठन / लेखन फ़ाइलों के लिए, पायथन और अन्य डेटा अनुप्रयोगों में उपयोग के लिए उपयुक्त हो सके। Uwe Korn और मैंने pyarrowApache Arrow में Python ( ) कोड बेस के हिस्से के रूप में पायथन इंटरफेस और इंटीग्रेशन को पांडा के साथ विकसित किया

यह लेख 2017 के लिए मेरी रणनीतिक योजना का एक निरंतरता है


डिज़ाइन: पायथन में उच्च-प्रदर्शन कॉलम डेटा।


C ++ लाइब्रेरीज़ Apache Arrow और Parquet सहायक तकनीकें हैं जिन्हें मूल रूप से समन्वित सहयोग के लिए हमारे द्वारा डिज़ाइन किया गया था।

  • C++ Arrow , / (, memory map, HDFS), (IPC/RPC). Arrow .
  • C++ Parquet Parquet. libparquet_arrow — , Arrow / Parquet.
  • PyArrow Python pandas.DataFrame.

अपाचे एरो का एक मुख्य लक्ष्य कॉलम मेमोरी ट्रांसपोर्ट का एक कुशल इंटर-ऑपरेशनल स्तर बनाना है।

आप PyArrow कोड बेस में Parquet उपयोगकर्ता API के बारे में पढ़ सकते हैं लाइब्रेरी कोंडा-फोर्ज में उपलब्ध हैं :

conda install pyarrow arrow-cpp parquet-cpp -c conda-forge

बेंचमार्क: PyArrow और fastparquet


PyArrow के प्रदर्शन का विचार प्राप्त करने के लिए, मैंने संख्यात्मक डेटा के साथ 512 मेगाबाइट डेटासेट उत्पन्न किया जो Parquet के लिए विभिन्न उपयोगों को प्रदर्शित करता है। मैंने डेटासेट के लिए दो विकल्प तैयार किए:

  • उच्च एन्ट्रापी : एक फ़ाइल में सभी डेटा मान (शून्य मानों को छोड़कर) भिन्न होते हैं। इस डेटासेट का वजन 469 एमबी है।
  • : . : 23 Snappy. , . , -, .

मैं तीन मुख्य संपीड़न शैलियों में इन फ़ाइलों को बनाया: असम्पीडित, तेज़ और gzip। फिर मैं डिस्क से पांडा डेटाफ़्रेम प्राप्त करने में लगने वाले भौतिक समय की गणना करता हूं।

fastparquet Python उपयोगकर्ताओं के लिए Parquet फ़ाइल रीडर / लेखक का एक नया कार्यान्वयन है, जो Dask प्रोजेक्ट में उपयोग के लिए बनाया गया है। यह पाइथन में लागू किया गया है और परक डिकोडिंग प्रक्रियाओं को गति देने के लिए नुम्बा पायथन-टू-एलएलवीएम कंपाइलर का उपयोग करता है। वैकल्पिक कार्यान्वयन के साथ तुलना करने के लिए मैंने इसे स्थापित किया।

फ़ाइल को pandas.DataFrame के रूप में पढ़ने के लिए कोड समान है:

# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()

हरे स्तंभ PyArrow समय के अनुरूप हैं: लंबे कॉलम उच्च प्रदर्शन / उच्च डेटा थ्रूपुट का संकेत देते हैं। हार्डवेयर - Xeon E3-1505 लैपटॉप।

मैंने 1 फरवरी, 2017 को नवीनतम कोडबेस के अनुसार इन बेंचमार्क को अपडेट किया।



विकास की स्थिति


हमें स्क्रू असेंबली और पैकेजिंग की मदद चाहिए। इसके अलावा, कोंडा-फोर्ज पैकेज को अद्यतित रखने में बहुत समय लगता है। और निश्चित रूप से, हम C ++ और पायथन दोनों में डेवलपर्स की तलाश कर रहे हैं, सामान्य तौर पर कोड बेस में योगदान के लिए।

अब तक, हमने सरल डेटा सेटों के उच्च पढ़ने और लिखने के साथ फ़ाइल प्रारूप की गुणवत्ता के कार्यान्वयन पर विशेष ध्यान दिया है। हम एनस्टोन कॉलम के लिए एक कंटेनर के रूप में एरो का उपयोग करते हुए, लकड़ी की छत में नेस्टेड JSON जैसे डेटा को संसाधित करने के लिए आगे बढ़ना शुरू कर रहे हैं। Uwe Korn ने

हाल ही में पांडा रूपांतरण के लिए एरो समर्थन को लागू किया :List

In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]

बेंचमार्क कोड


import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)



पाठ्यक्रम पर जाओ।



All Articles