RecordBatchReader functionality

Table.select() returns a pyarrow.RecordBatchReader:

Table.select(
    columns: List[str] | None = None,
    predicate: ibis.expr.types.BooleanColumn | ibis.common.deferred.Deferred = None,
    config: QueryConfig | None = None,
    *,
    internal_row_id: bool = False
) -> pyarrow.RecordBatchReader

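For example, the returned reader can be consumed incrementally, one RecordBatch at a time. A minimal sketch, reusing the session and the bucket/schema/table placeholders from the DuckDB example below (note that a reader can only be consumed once):

with session.transaction() as tx:
    table = tx.bucket("bucket-name").schema("schema-name").table("table-name")
    reader = table.select(columns=['c1'])

    # Stream the result one RecordBatch at a time, keeping memory usage bounded.
    for batch in reader:
        print(batch.num_rows)

    # Alternatively, materialize the whole result at once:
    # result = reader.read_all()  # returns a pyarrow.Table
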
Here is a deeper dive:

  • The record batches are constructed by the client from Arrow buffers read from the server (these buffers contain the actual data). The server sends the response column by column, and the client collects the columns and merges them into a single RecordBatch by adjusting the metadata, without copying the actual data.

  • RecordBatch creation isn’t deterministic, since in general the data is read in parallel, using multiple splits (each possibly using a separate HTTP connection) and subsplits (each running on an internal CNode fiber).

  • The RecordBatches are pushed into a client-side, size-limited queue and then yielded to the caller, allowing the client (working with a RecordBatchReader) to iterate over the response in chunks while holding only a few RecordBatches in memory (see the sketch after the DuckDB example below).

  • DuckDB has special support for RecordBatchReader objects in its SQL syntax: it dynamically accesses the Python namespace and resolves the RecordBatchReader as a table. For example:

import duckdb
from ibis import _

conn = duckdb.connect()

with session.transaction() as tx:
    table = tx.bucket("bucket-name").schema("schema-name").table("table-name")
    batches = table.select(columns=['c1'], predicate=(_.c2 > 2))

    # batches is a RecordBatchReader
    print(conn.execute("SELECT sum(c1) FROM batches").arrow())
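
The batching and queuing described above can be approximated with plain pyarrow. This is only an illustrative sketch, not the SDK's implementation: the schema, column data, and queue size are made up, a single producer thread stands in for the parallel split/subsplit readers, and RecordBatch.from_arrays wraps the existing column arrays without copying their buffers.

import queue
import threading

import pyarrow as pa

schema = pa.schema([("c1", pa.int64()), ("c2", pa.int64())])

# Size-limited queue: the producer blocks once `maxsize` batches are pending,
# so only a few RecordBatches are held in memory at any point in time.
batch_queue = queue.Queue(maxsize=4)
_DONE = object()  # sentinel marking the end of the stream

def producer():
    # Stand-in for the per-column Arrow buffers read from the server.
    for start in range(0, 100, 10):
        c1 = pa.array(range(start, start + 10), type=pa.int64())
        c2 = pa.array(range(start, start + 10), type=pa.int64())
        # from_arrays wraps the existing column arrays (metadata only, no copy).
        batch_queue.put(pa.RecordBatch.from_arrays([c1, c2], schema=schema))
    batch_queue.put(_DONE)

def drain():
    # Generator consumed by the RecordBatchReader, one batch at a time.
    while (item := batch_queue.get()) is not _DONE:
        yield item

threading.Thread(target=producer, daemon=True).start()
reader = pa.RecordBatchReader.from_batches(schema, drain())

for batch in reader:
    print(batch.num_rows)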