Streaming column data using Apache Arrow





Over the past few weeks, Nong Li and I have added a binary streaming format to Apache Arrow, complementing the existing random access / IPC file format. We have implementations in Java and C++, along with Python bindings. In this article I will explain how the format works and show how to achieve very high data throughput to pandas DataFrames.

Columnar data streaming


A common question I get from Arrow users concerns the high cost of moving large tabular datasets from a row-oriented (or record-oriented) format to a columnar format. For multi-gigabyte datasets, transposing in memory or on disk can be prohibitively expensive.

For streaming data, whether the source is row-oriented or column-oriented, one option is to send small batches of rows, each of which internally uses a columnar layout.

In Apache Arrow, a collection of in-memory columnar arrays representing a chunk of a table is called a record batch. Several record batches can be assembled to represent a single logical table as one data structure.
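
As a minimal sketch of that relationship (assuming a recent pyarrow release; the exact method names may differ slightly from the 0.x API used later in this article), several record batches that share a schema can be combined into a single Table:

import pyarrow as pa

# One record batch: a chunk of columnar arrays with a common schema.
batch = pa.RecordBatch.from_arrays(
    [pa.array([1.0, 2.0, 3.0]), pa.array(['a', 'b', 'c'])],
    names=['x', 'y'])

# Record batches with the same schema assemble into one logical table.
table = pa.Table.from_batches([batch, batch, batch])
print(table.num_rows)  # 9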

In the existing "random access" file format, we write metadata containing the table schema and the block locations at the end of the file, which makes it extremely cheap to select any record batch or any column from the dataset. In the streaming format, we send a series of messages: a schema, followed by one or more record batches.
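
To make the contrast concrete, here is a sketch of writing one batch in both formats, assuming a recent pyarrow release where the writers are exposed as RecordBatchFileWriter and RecordBatchStreamWriter (the walkthrough below uses the older StreamWriter API):

import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array([1.0, 2.0, 3.0])], names=['x'])

# Random access file format: a footer with the schema and block locations is
# written at the end, so any record batch or column can be located cheaply.
with pa.OSFile('example.arrow', 'wb') as f:
    with pa.RecordBatchFileWriter(f, batch.schema) as writer:
        writer.write_batch(batch)

# Streaming format: a schema message followed by record batch messages, with
# no footer, so it can only be consumed front to back.
with pa.OSFile('example.arrows', 'wb') as f:
    with pa.RecordBatchStreamWriter(f, batch.schema) as writer:
        writer.write_batch(batch)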

The two formats look something like what is shown in this figure:



PyArrow streaming in practice


To show you how this works, I will create an example dataset representing a single stream chunk:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    # Build a DataFrame of float64 columns totalling roughly total_size bytes.
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

Now, suppose we want to write 1 GB of data consisting of 1 MB chunks, 1024 chunks in total. First, let's create the first 1 MB DataFrame with 16 columns:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

Then I convert it to a pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

Now I will create an output stream that writes to RAM, and a StreamWriter on top of it:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
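
(InMemoryOutputStream and StreamWriter are names from the pyarrow 0.x releases this article was written against; with a recent pyarrow release the equivalent would presumably look roughly like this:)

# Rough modern equivalent, assuming a recent pyarrow release:
sink = pa.BufferOutputStream()
stream_writer = pa.ipc.new_stream(sink, batch.schema)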

Then we write 1024 chunks, which will eventually make up a 1GB data set:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

Since we wrote in RAM, we can get the entire stream in one buffer:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

Since this data is in memory, reading Arrow record batches back is a zero-copy operation. I open a StreamReader, read the data into a pyarrow.Table, and then convert it to a pandas DataFrame:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904
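
(Again, with a recent pyarrow release the same round trip would presumably look roughly like this:)

# Rough modern equivalent of the read path, assuming a recent pyarrow release:
source = sink.getvalue()                       # a pyarrow.Buffer
table = pa.ipc.open_stream(source).read_all()  # read all record batches
df = table.to_pandas()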

All this is nice, but you may have questions: how fast is it, and how does the streaming chunk size affect the performance of reconstructing the pandas DataFrame?

Streaming Performance


As the size of the streaming chunk decreases, the cost of reconstructing a contiguous columnar pandas DataFrame increases because of less cache-efficient memory access patterns. There is also some overhead from working with the C++ data structures, the arrays, and their memory buffers.
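
One way to see where the cost comes from: each column of the Table read from the stream is a chunked array with one chunk per record batch, and to_pandas() has to copy all of those chunks into contiguous memory. A quick sketch (using the newer open_stream API, as an assumption on my part):

# Each column carries one chunk per record batch written to the stream, so
# smaller streaming chunks mean many more pieces to stitch back together.
table = pa.ipc.open_stream(source).read_all()
print(table.column(0).num_chunks)  # 1024 for a 1 GB stream of 1 MB chunks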

For 1 MB chunks, as above, on my laptop (quad-core Xeon E3-1505M) I get:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

This works out to an effective throughput of about 7.75 GB/s (1 GB / 0.129 s) for reconstructing a 1 GB DataFrame from 1024 chunks of 1 MB each. What happens if we use larger or smaller chunks? Here are the results:



Performance drops significantly from 256 KB to 64 KB chunks. I was surprised that 1 MB chunks were processed faster than 16 MB ones. It is worth a more thorough investigation to understand whether this is expected behavior or whether something else is going on.

In the current implementation of the format, the data is not compressed at all, so the size in memory and on the wire is approximately the same. Compression may be added as an option in the future.
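
For what it is worth, newer pyarrow releases do expose optional IPC-level buffer compression (LZ4 or ZSTD) through write options; a sketch, assuming pyarrow 1.0 or later:

# Optional buffer compression for the IPC stream in recent pyarrow releases.
options = pa.ipc.IpcWriteOptions(compression='zstd')
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema, options=options) as writer:
    writer.write_batch(batch)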

Summary


Streaming columnar data can be an efficient way to feed large datasets into columnar analysis tools such as pandas using small chunks. Data services backed by row-oriented storage can transfer and transpose small chunks of data that fit better in your processor's L2 and L3 caches.

Full code

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    # Average wall-clock time of niter calls to f.
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()

NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)
