带有Apache Arrow的Python高速Apache Parquet

大家致敬。下周,课程将在数据工程师课程的新小组中开始,与此相关,我们将与您分享另一个有趣的翻译。



在过去的一年中,我一直与Apache Parquet社区合作创建parquet-cpp,这是用于读取/写入文件的一流的C ++ Parquet实现,适用于Python和其他数据应用程序。我和Uwe Korn开发了Python界面并与熊猫集成,这pyarrow是Apache Arrow中Python(代码库的一部分

本文是我对2017年战略计划的延续


设计:Python中的高性能列数据。


C ++库Apache Arrow和Parquet是辅助性技术,最初由我们设计用于协调协作。

  • C ++ Arrow库提供内存管理,高效的I / O(文件,内存映射,HDFS),内存中的列数组容器以及极其快速的消息传递(IPC / RPC)。在另一篇文章中,我将仔细研究Arrow消息传递层。
  • C ++ Parquet库负责对Parquet文件格式进行编码和解码。我们已经实现libparquet_arrow了一个库,用于处理Arrow存储器中的数据与底层Parquet读/写工具之间的传输。
  • PyArrow为此提供了一个Python接口,并处理了到pandas.DataFrame的快速转换

Apache Arrow的主要目标之一是创建有效的互操作级别的列内存传输。

您可以在PyArrow代码库中阅读有关Parquet用户API 的信息可在conda-forge上找到库

conda install pyarrow arrow-cpp parquet-cpp -c conda-forge

基准:PyArrow和fastparquet


为了了解PyArrow的性能,我生成了一个512 MB的数据集,其中包含数值数据,展示了Parquet的各种用途。我为数据集生成了两个选项:

  • 高熵:文件中的所有数据值(零值除外)不同。该数据集重469 MB。
  • : . : 23 Snappy. , . , -, .

我用三种主要的压缩方式创建了这些文件:未压缩,快照和gzip。然后,我计算从磁盘获取pandas DataFrame所需的物理时间。

fastparquet是为Python用户使用的Parquet文件读取器/写入器的更新版本,创建用于Dask项目。它是用Python实现的,并使用Numba Python到LLVM编译器来加速Parquet解码过程。我还安装了它以与替代实现进行比较。

将文件读取为pandas.DataFrame的代码类似:

# PyArrow
import pyarrow.parquet as pq
df1 = pq.read_table(path).to_pandas()

# fastparquet
import fastparquet
df2 = fastparquet.ParquetFile(path).to_pandas()

绿色列对应于PyArrow时间:较长的列表示更高的性能/更高的数据吞吐量。硬件-至强E3-1505笔记本电脑。

我根据最新的代码库于2017年2月1日更新了这些基准测试。



发展状况


我们需要螺丝组装和包装方面的帮助。此外,保持conda-forge软件包为最新状态需要花费大量时间。当然,我们正在寻找C ++和Python的开发人员,以为一般的代码库做出贡献。

到目前为止,我们一直特别关注具有简单数据集读写性能的文件格式的质量实现。我们开始使用箭头作为嵌套列数据的容器,在parquet-cpp中处理类似JSON的嵌套数据。Uwe Korn

最近为熊猫转换实现了Arrow 支持List

In [9]: arr = pa.from_pylist([[1,2,3], None, [1, 2], [], [4]])

In [10]: arr
Out[10]:
<pyarrow.array.ListArray object at 0x7f562d551818>
[
  [1,
   2,
   3],
  NA,
  [1,
   2],
  [],
  [4]
]

In [11]: arr.type
Out[11]: DataType(list<item: int64>)

In [12]: t = pa.Table.from_arrays([arr], ['col'])

In [13]: t.to_pandas()
Out[13]:
         col
0  [1, 2, 3]
1       None
2     [1, 2]
3         []
4        [4]

基准代码


import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize

    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression == 'UNCOMPRESSED':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=False,
                   compression=compression)

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

MEGABYTE = 1 << 20
DATA_SIZE = 512 * MEGABYTE
NCOLS = 16

cases = {
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000
    }
}

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

NITER = 5

results = []

readers = [
    ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
]

case_files = {}

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = '{0}_{1}.parquet'.format(case, compression)
        df = generate_data(DATA_SIZE, NCOLS, **params)
        write_to_parquet(df, path, compression=compression)
        df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in ['UNCOMPRESSED', 'SNAPPY', 'GZIP']:
        path = case_files[case, compression]

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)



上课程。



All Articles