High-Speed Apache Parquet in Python with Apache Arrow

Hello, everyone. Classes for a new group of the Data Engineer course begin next week, so we are sharing another interesting translation with you.



Over the past year, I have been working with the Apache Parquet community to build parquet-cpp, a first-class C++ implementation of Parquet file reading and writing, suitable for use in Python and other data applications. Uwe Korn and I built the Python interface and the pandas integration within the Python codebase (pyarrow) in Apache Arrow.

This post is a follow-up to my strategic plan for 2017.


Design: high-performance columnar data in Python


The Apache Arrow and Parquet C++ libraries are complementary technologies that we designed from the start to work well together.

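  • The Arrow C++ libraries handle memory management, I/O (files, memory maps, HDFS), and messaging (IPC/RPC).
  • The Parquet C++ libraries read and write the Parquet file format. libparquet_arrow is the library that moves data between Arrow and Parquet.
  • PyArrow provides the Python interface and handles conversion to pandas.DataFrame.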

One of the main goals of Apache Arrow is to provide an efficient, interoperable transport layer for columnar data in memory.

You can read about the user-facing Parquet API in the PyArrow codebase. The libraries are available from conda-forge:

conda install pyarrow arrow-cpp parquet-cpp -c conda-forge

Benchmarks: PyArrow and fastparquet


To get an idea of PyArrow's performance, I generated a 512 MB dataset of numeric data that exercises several Parquet use cases. I generated two variants of the dataset:

  • High entropy: all data values in the file (except for null values) are distinct. This dataset occupies 469 MB on disk.
  • Low entropy: the data values in the file are highly repetitive. This dataset occupies only 23 MB when compressed with Snappy.
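To make the difference between the two variants concrete, here is a minimal sketch that uses only the Python standard library. It is not the method used for the benchmark: it compresses raw 8-byte doubles with gzip instead of writing Parquet files, and the helper name `to_bytes` and the sizes are illustrative assumptions. Only the repeat factor of 1000 is taken from the benchmark configuration at the end of the article.

```python
import array
import gzip
import random


def to_bytes(values):
    """Serialize floats as raw 8-byte doubles (hypothetical helper)."""
    return array.array('d', values).tobytes()


random.seed(0)
n = 100_000      # illustrative size, far smaller than the real dataset
repeats = 1000   # repeat factor of the low-entropy benchmark case

# High entropy: n independent random draws, so values are effectively all distinct
high = [random.gauss(0.0, 1.0) for _ in range(n)]

# Low entropy: n / repeats distinct values, each repeated `repeats` times in a row
uniques = [random.gauss(0.0, 1.0) for _ in range(n // repeats)]
low = [v for v in uniques for _ in range(repeats)]

high_size = len(gzip.compress(to_bytes(high)))
low_size = len(gzip.compress(to_bytes(low)))

print('raw bytes:        ', 8 * n)
print('high entropy gzip:', high_size)
print('low entropy gzip: ', low_size)
```

Both inputs have the same raw size, but the repetitive one compresses to a small fraction of the other. That is the same effect that makes the low-entropy Parquet file so much smaller on disk.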

I wrote these files with the three most common compression settings: uncompressed, Snappy, and gzip. I then measured the wall-clock time needed to load a pandas DataFrame from disk.

fastparquet is a newer Parquet file reader/writer implementation for Python users, created for use in the Dask project. It is implemented in Python and uses the Numba Python-to-LLVM compiler to speed up the Parquet decoding routines. I installed it as well, to compare the two implementations.

The code to read a file as a pandas.DataFrame is similar in both libraries:

# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()

The green bars are the PyArrow timings: longer bars indicate higher performance, that is, higher data throughput. The hardware is a Xeon E3-1505 laptop.

I updated these benchmarks on February 1, 2017, against the latest codebases.
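As a rough illustration of how a wall-clock timing turns into a throughput figure, the sketch below averages several calls and converts the result to MB/s. The function names `average_seconds` and `throughput_mb_per_s` and the stand-in workload `fake_read` are assumptions made for this example. A real measurement would call a Parquet reader instead of copying bytes in memory.

```python
import time


def average_seconds(func, niter=5):
    """Return the mean wall-clock seconds per call of func over niter calls."""
    start = time.perf_counter()
    for _ in range(niter):
        func()
    return (time.perf_counter() - start) / niter


def throughput_mb_per_s(nbytes, seconds):
    """Convert a payload size and elapsed time to MB/s (1 MB = 2**20 bytes)."""
    return nbytes / (1 << 20) / seconds


# Hypothetical stand-in for a reader call: copies 8 MB in memory
payload = bytes(8 * (1 << 20))


def fake_read():
    return bytearray(payload)


elapsed = average_seconds(fake_read)
print('%.1f MB/s' % throughput_mb_per_s(len(payload), elapsed))
```

`time.perf_counter` is used here because it is available on every platform. The benchmark code at the end of the article reads the monotonic clock directly instead.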



Development status


We could use help with Windows builds and packaging. In addition, keeping the conda-forge packages up to date takes a lot of time. And of course, we are looking for both C++ and Python developers to contribute to the codebases in general.

So far, we have focused on a quality implementation of the file format with high read and write performance for simple (flat) datasets. We are now starting to handle nested JSON-like data in parquet-cpp, using Arrow as the container for nested columnar data.

Uwe Korn recently implemented Arrow List support for conversions to pandas:

In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]
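To show what a container for nested columnar data means in practice, here is a simplified pure-Python model of how a list column like the one above can be stored as flat buffers: one array of offsets, one validity flag per row, and one flat array of child values. It follows the general idea of Arrow's variable-size list layout, but it is only a sketch. Real Arrow uses typed buffers and a packed validity bitmap, and the function names here are hypothetical.

```python
def encode_list_column(rows):
    """Flatten a column of lists into (offsets, validity, values)."""
    offsets = [0]
    validity = []
    values = []
    for row in rows:
        if row is None:
            validity.append(False)   # null row: contributes no child values
        else:
            validity.append(True)
            values.extend(row)
        offsets.append(len(values))  # end position of this row in `values`
    return offsets, validity, values


def decode_list_column(offsets, validity, values):
    """Rebuild the column of lists from the flat buffers."""
    return [
        values[offsets[i]:offsets[i + 1]] if validity[i] else None
        for i in range(len(validity))
    ]


rows = [[1, 2, 3], None, [1, 2], [], [4]]
offsets, validity, values = encode_list_column(rows)

print(offsets)   # [0, 3, 3, 5, 5, 6]
print(validity)  # [True, False, True, True, True]
print(values)    # [1, 2, 3, 1, 2, 4]
```

Note that the null row and the empty list both take up zero child values, and only the validity flag tells them apart. This is why `None` and `[]` survive the round trip to pandas as different values.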

Benchmark code


import os
import time

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype(dtype)
    # Rows per column so that ncols columns total roughly total_size bytes
    nrows = total_size // ncols // type_.itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

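def read_pyarrow(path, nthreads=1):
    # `nthreads` is the keyword of the pyarrow release benchmarked here;
    # later releases replaced it with `use_threads`.
    return pq.read_table(path, nthreads=nthreads).to_pandas()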

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)





