使用Apache Arrow流化列数据

本文的翻译是专门为数据工程师课程的学生准备的




在过去的几周中,我和Nong LiApache Arrow添加了二进制流格式,补充现有的随机访问/ IPC文件格式。我们有Java,C ++和Python绑定的实现。在本文中,我将告诉您格式的工作原理,并说明如何为DataFrame熊猫实现非常高的数据吞吐量。

列数据流


我从Arrow用户那里得到的一个常见问题是将大型表格数据集从面向行或面向行的格式转换为列格式的高昂成本的问题。对于数千兆字节的数据集,在内存或磁盘中进行转置可能是一项艰巨的任务。

对于流数据,不管源数据是字符串还是列,一种选择是发送小的字符串包,其中每个字符串都包含列布局。

在Apache Arrow中,代表一个表块的内存中列数组的集合称为记录批处理。要表示逻辑表的单个数据结构,可以收集几个记录包。

在现有的“随机访问”文件格式中,我们记录元数据,该元数据包含表布局和文件末尾块的位置,这使您可以非常便宜地从数据集中选择任何记录包或任何列。以流格式,我们发送一系列消息:一个方案,然后一个或多个记录包。

不同的格式类似于此图所示:



PyArrow流:应用程序


为了向您展示这是如何工作的,我将创建一个表示单个流块的示例数据集:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

现在,假设我们要记录1 GB的数据,每个数据由1 MB的块组成,总共1024个块。首先,让我们创建具有16列的第一个1 MB数据帧:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

然后我将它们转换为pyarrow.RecordBatch

batch = pa.RecordBatch.from_pandas(df)

现在,我将创建一个输出流,该流将写入RAM并创建StreamWriter

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

然后我们写入1024个块,这些块最终将构成1GB的数据集:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

由于我们是在RAM中编写的,因此可以在一个缓冲区中获取整个流:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

由于此数据在内存中,因此读出Arrow记录数据包是零复制操作。我打开StreamReader,读取中的数据pyarrow.Table,然后将其转换为DataFrame pandas

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

当然,所有这些都很好,但是您可能会有疑问。这个过程进行得有多快?块大小如何影响获取DataFrame熊猫的性能?

流媒体性能


随着流式传输块的大小减小,由于效率低下的缓存访问方案,在熊猫中重建连续DataFrame列的成本增加。使用C ++数据结构和数组及其内存缓冲区也有一些开销。

如上所述,在我的笔记本电脑(四核Xeon E3-1505M)上,对于1 MB,结果是:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

事实证明,恢复1024个1MB块的1Gb DataFrame的有效带宽为7.75 Gb / s。如果我们使用更大或更小的块会发生什么?结果如下:



性能从256K块显着下降到64K。我感到惊讶的是1 MB的块处理速度快于16 MB。值得进行更深入的研究,并了解这是否是正态分布或是否有其他因素影响它。

在该格式的当前实现中,原则上不对数据进行压缩,因此内存和导线中的大小大致相同。将来,压缩可能会成为附加选项。


流式列数据是使用小块将大型数据集传输到诸如熊猫之类的列分析工具的有效方法。使用面向行存储的数据服务可以传输和转置较小的数据块,这些数据块对于处理器的L2和L3缓存更为方便。

完整代码

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

All Articles