Apache Parket Berkecepatan Tinggi dalam Python dengan Apache Arrow

Semua salut Minggu depan, kelas akan dimulai pada grup baru kursus Data Engineer , sehubungan dengan ini kami akan membagikan terjemahan menarik lainnya dengan Anda.



Sepanjang tahun lalu, saya telah bekerja dengan komunitas Apache Parket untuk membuat Parket-cpp , implementasi C ++ Parket kelas satu untuk membaca / menulis file, cocok untuk digunakan dalam Python dan aplikasi data lainnya. Uwe Korn dan saya mengembangkan antarmuka Python dan integrasi dengan panda sebagai bagian dari basis kode Python ( pyarrow) di Apache Arrow.

Artikel ini merupakan kelanjutan dari rencana strategis saya untuk 2017 .


Desain: data kolom berkinerja tinggi dalam Python.


Perpustakaan C ++ Apache Arrow and Parquet adalah teknologi tambahan yang awalnya dirancang oleh kami untuk kolaborasi terkoordinasi.

  • C++ Arrow , / (, memory map, HDFS), (IPC/RPC). Arrow .
  • C++ Parquet Parquet. libparquet_arrow — , Arrow / Parquet.
  • PyArrow Python pandas.DataFrame.

Salah satu tujuan utama Apache Arrow adalah untuk menciptakan tingkat transportasi kolom memori antar-operasional yang efisien.

Anda dapat membaca tentang API pengguna Parket di basis kode PyArrow . Perpustakaan tersedia di conda-forge di:

conda install pyarrow arrow-cpp parquet-cpp -c conda-forge

Benchmark: PyArrow dan fastparquet


Untuk mendapatkan gambaran tentang kinerja PyArrow, saya menghasilkan dataset 512 megabyte dengan data numerik yang menunjukkan berbagai kegunaan untuk Parket. Saya menghasilkan dua opsi untuk dataset:

  • Entropi tinggi : semua nilai data dalam file (kecuali untuk nilai nol) berbeda. Dataset ini memiliki berat 469 MB.
  • : . : 23 Snappy. , . , -, .

Saya membuat file-file ini dalam tiga gaya kompresi utama yang digunakan: tidak terkompresi, tajam dan gzip. Lalu saya menghitung waktu fisik yang dibutuhkan untuk mendapatkan panda DataFrame dari disk.

fastparquet adalah implementasi yang lebih baru dari pembaca / penulis file Parket untuk pengguna Python, dibuat untuk digunakan dalam proyek Dask. Ini diimplementasikan dalam Python dan menggunakan kompiler Numba Python-ke-LLVM untuk mempercepat prosedur decoding Parket. Saya juga menginstalnya untuk membandingkan dengan implementasi alternatif.

Kode untuk membaca file sebagai pandas.DataFrame serupa:

# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()

Bilah hijau sesuai dengan waktu PyArrow: bilah lagi menunjukkan kinerja yang lebih tinggi / throughput data yang lebih tinggi. Perangkat Keras - Laptop Xeon E3-1505.

Saya memperbarui tolok ukur ini pada 1 Februari 2017 sesuai dengan basis kode terbaru.



Status perkembangan


Kami membutuhkan bantuan dengan perakitan sekrup dan pengemasan. Selain itu, menjaga paket conda-forge tetap terbaru membutuhkan banyak waktu. Dan tentu saja, kami mencari pengembang di C ++ dan Python, untuk kontribusi pada basis kode secara umum.

Hingga saat ini, kami telah memberikan perhatian khusus pada kualitas implementasi format file dengan kinerja baca dan tulis set data sederhana yang tinggi. Kami mulai beralih ke pemrosesan data bersarang JSON di parket-cpp, menggunakan Arrow sebagai wadah untuk data kolom bersarang. Uwe Korn

baru-baru ini mengimplementasikan dukungan Arrow untuk konversi panda:List

In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]

Kode benchmark


import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)



Ikuti saja.



All Articles