Hochgeschwindigkeits-Apache-Parkett in Python mit Apache-Pfeil

Alle grüßen. Nächste Woche beginnen die Kurse in der neuen Gruppe des Data Engineer- Kurses . In diesem Zusammenhang werden wir eine weitere interessante Übersetzung mit Ihnen teilen.



Während des letzten Jahres habe ich mit der Apache Parquet-Community zusammengearbeitet, um Parkett-CPP zu erstellen , eine erstklassige C ++ - Parkett-Implementierung zum Lesen / Schreiben von Dateien, die für die Verwendung in Python und anderen Datenanwendungen geeignet ist. Uwe Korn und ich haben die Python-Oberfläche und die Integration mit Pandas als Teil der Python ( pyarrow) - Codebasis in Apache Arrow entwickelt.

Dieser Artikel ist eine Fortsetzung meines Strategieplans für 2017 .


Design: Hochleistungsspaltendaten in Python.


C ++ - Bibliotheken Apache Arrow und Parquet sind Hilfstechnologien, die ursprünglich von uns für eine koordinierte Zusammenarbeit entwickelt wurden.

  • C++ Arrow , / (, memory map, HDFS), (IPC/RPC). Arrow .
  • C++ Parquet Parquet. libparquet_arrow — , Arrow / Parquet.
  • PyArrow Python pandas.DataFrame.

Eines der Hauptziele von Apache Arrow ist die Schaffung einer effizienten interoperativen Ebene des Spaltenspeichertransports. Informationen

zur Parkett-Benutzer-API finden Sie in der PyArrow-Codebasis . Bibliotheken sind bei conda-forge erhältlich unter:

conda install pyarrow arrow-cpp parquet-cpp -c conda-forge

Benchmarks: PyArrow und Fastparquet


Um einen Eindruck von der Leistung von PyArrow zu bekommen, habe ich einen 512-Megabyte-Datensatz mit numerischen Daten generiert, der verschiedene Verwendungszwecke für Parkett demonstriert. Ich habe zwei Optionen für Datensätze generiert:

  • Hohe Entropie : Alle Datenwerte in einer Datei (außer Nullwerten) sind unterschiedlich. Dieser Datensatz wiegt 469 MB.
  • : . : 23 Snappy. , . , -, .

Ich habe diese Dateien in drei Hauptkomprimierungsstilen erstellt: unkomprimiert, bissig und gzip. Dann berechne ich die physische Zeit, die benötigt wird, um den Pandas DataFrame von der Festplatte zu erhalten.

fastparquet ist eine neuere Implementierung des Parkett-Dateireaders / -schreibers für Python-Benutzer, der für die Verwendung im Dask-Projekt erstellt wurde. Es ist in Python implementiert und verwendet den Numba Python-zu-LLVM-Compiler, um die Parkettdecodierungsverfahren zu beschleunigen. Ich habe es auch installiert, um es mit alternativen Implementierungen zu vergleichen.

Der Code zum Lesen der Datei als pandas.DataFrame ist ähnlich:

# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()

Die grünen Spalten entsprechen der PyArrow-Zeit: Längere Spalten zeigen eine höhere Leistung / einen höheren Datendurchsatz an. Hardware - Xeon E3-1505 Laptop.

Ich habe diese Benchmarks am 1. Februar 2017 gemäß den neuesten Codebasen aktualisiert.



Entwicklungsstatus


Wir brauchen Hilfe bei der Montage und Verpackung der Schrauben. Darüber hinaus nimmt es viel Zeit in Anspruch, Conda-Forge-Pakete auf dem neuesten Stand zu halten. Und natürlich suchen wir Entwickler sowohl in C ++ als auch in Python, um Beiträge zur Codebasis im Allgemeinen zu erhalten.

Bisher haben wir besonderes Augenmerk auf die hochwertige Implementierung des Dateiformats mit hoher Lese- und Schreibleistung einfacher Datensätze gelegt. Wir beginnen mit der Verarbeitung verschachtelter JSON-ähnlicher Daten in Parkett-CPP, wobei Arrow als Container für verschachtelte Spaltendaten verwendet wird. Uwe Korn hat

kürzlich die Arrow- Unterstützung für Pandas-Konvertierungen implementiert :List

In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]

Benchmark-Code


import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)



Steig auf den Kurs.



All Articles