
Use interleave_record_batch to avoid tiny batches in sort-based shuffle #1432

@andygrove

Description

Problem

The current sort-based shuffle writer uses DataFusion's BatchPartitioner::partition(), which calls take_arrays() to split each input batch into per-partition sub-batches. With N output partitions, a single input batch of 8192 rows produces up to N sub-batches averaging 8192/N rows each. With 200 output partitions, that is roughly 41 rows per sub-batch.

These tiny batches are:

  • Appended individually to PartitionBuffer
  • Spilled individually to IPC files
  • Written individually to the final output file

Each batch carries per-batch Arrow IPC overhead (metadata, alignment padding), and the large number of small batches hurts compression ratios and read performance.
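
To make the sub-batch sizes concrete, here is a minimal std-only sketch of the split described above. It does not use Arrow or DataFusion: `split_row_indices` and `sub_batch_size_range` are hypothetical helper names, and the round-robin partition assignment is an assumption standing in for a real hash partitioner.

```rust
/// Groups row indices by target partition, mirroring the shape of the
/// per-partition index lists that a take-based split would consume.
/// `partition_ids[i]` is the (hypothetical) output partition of row `i`
/// and must be smaller than `num_partitions`.
fn split_row_indices(partition_ids: &[usize], num_partitions: usize) -> Vec<Vec<usize>> {
    let mut per_partition = vec![Vec::new(); num_partitions];
    for (row, &partition) in partition_ids.iter().enumerate() {
        per_partition[partition].push(row);
    }
    per_partition
}

/// Returns the (min, max) sub-batch row counts when `rows` rows are spread
/// round-robin over `num_partitions` partitions (`num_partitions` > 0).
/// Round-robin is an assumption; a hash partitioner would be less even.
fn sub_batch_size_range(rows: usize, num_partitions: usize) -> (usize, usize) {
    let ids: Vec<usize> = (0..rows).map(|r| r % num_partitions).collect();
    let sizes: Vec<usize> = split_row_indices(&ids, num_partitions)
        .iter()
        .map(Vec::len)
        .collect();
    (
        sizes.iter().copied().min().unwrap_or(0),
        sizes.iter().copied().max().unwrap_or(0),
    )
}
```

With 8192 rows and 200 partitions, `sub_batch_size_range(8192, 200)` yields `(40, 41)`: every sub-batch is far below the input batch size, even in the best case of a perfectly even distribution.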

Proposed Solution

Adopt the approach used by Apache DataFusion Comet, which avoids this problem by using arrow::compute::interleave_record_batch:

  1. Buffer input batches whole — store unmodified input batches in a Vec<RecordBatch> instead of immediately splitting them per partition
  2. Track indices, not data — for each row, record (batch_index, row_index) into per-partition index vectors (Vec<Vec<(u32, u32)>>)
  3. Deferred materialization — when it's time to write (spill or final output), call interleave_record_batch(&buffered_batches, &indices) to gather rows into well-sized output batches (up to the configured batch_size)

This produces properly sized output batches regardless of the number of output partitions, with one gather per output batch instead of many small take_arrays calls.
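
The three steps can be sketched without any Arrow dependency. In this simplified model a `Batch` is a single `Vec<i64>` column standing in for a `RecordBatch`, and the local `interleave` function only imitates the gather semantics; it is not Arrow's `interleave_record_batch`. The names `IndexedShuffleBuffer`, `insert_batch`, and `materialize` are hypothetical and chosen for illustration.

```rust
/// Stand-in for an Arrow RecordBatch: one column of i64 values.
type Batch = Vec<i64>;

/// Buffers whole input batches and tracks, per output partition,
/// which (batch_index, row_index) pairs belong to it.
struct IndexedShuffleBuffer {
    batches: Vec<Batch>,
    partition_indices: Vec<Vec<(u32, u32)>>,
    batch_size: usize,
}

impl IndexedShuffleBuffer {
    fn new(num_partitions: usize, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self {
            batches: Vec::new(),
            partition_indices: vec![Vec::new(); num_partitions],
            batch_size,
        }
    }

    /// Steps 1 and 2: keep the batch unmodified and record only indices.
    /// `partition_of` must return a value below the partition count.
    fn insert_batch(&mut self, batch: Batch, partition_of: impl Fn(i64) -> usize) {
        let batch_index = self.batches.len() as u32;
        for (row_index, value) in batch.iter().enumerate() {
            let partition = partition_of(*value);
            self.partition_indices[partition].push((batch_index, row_index as u32));
        }
        self.batches.push(batch);
    }

    /// Step 3: gather rows for one partition into output batches of at
    /// most `batch_size` rows, one gather per output batch.
    fn materialize(&self, partition: usize) -> Vec<Batch> {
        self.partition_indices[partition]
            .chunks(self.batch_size)
            .map(|chunk| interleave(&self.batches, chunk))
            .collect()
    }
}

/// Toy gather: picks row `r` of batch `b` for every (b, r) pair, in order.
fn interleave(batches: &[Batch], indices: &[(u32, u32)]) -> Batch {
    indices
        .iter()
        .map(|&(b, r)| batches[b as usize][r as usize])
        .collect()
}
```

The output batch size here depends only on `batch_size` and the number of rows routed to a partition, not on how many input batches those rows came from. A real implementation would also need to clear the buffered batches and index vectors after each spill, which this sketch leaves out.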

Comparison

| Aspect | Current | Proposed |
| --- | --- | --- |
| When rows are partitioned | Immediately per input batch | Deferred; only indices tracked |
| Output batch sizes | input_rows / N (tiny) | Up to batch_size (well-sized) |
| Data copies | take_arrays per partition per batch | One interleave call at write time |

Additional Consideration

Comet also integrates with DataFusion's MemoryReservation / MemoryConsumer for memory-aware spilling instead of a fixed byte threshold. This could be adopted as a follow-up improvement.
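
As a rough illustration of reservation-driven spilling, the following std-only sketch uses a toy type loosely modelled on the idea of a memory reservation. `ToyReservation` and `buffer_with_spilling` are hypothetical names; this is not DataFusion's MemoryReservation API, and the "spill" is reduced to releasing the reserved bytes.

```rust
/// Toy stand-in for a memory reservation with a fixed limit in bytes.
struct ToyReservation {
    limit: usize,
    used: usize,
}

impl ToyReservation {
    /// Tries to reserve `bytes` more; fails if the limit would be exceeded.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        match self.used.checked_add(bytes) {
            Some(total) if total <= self.limit => {
                self.used = total;
                Ok(())
            }
            _ => Err(format!("cannot reserve {bytes} bytes")),
        }
    }

    /// Releases everything reserved so far and returns the freed amount.
    fn free(&mut self) -> usize {
        std::mem::take(&mut self.used)
    }
}

/// Buffers batches of the given sizes, spilling whenever the reservation
/// cannot grow. Returns how many spills were triggered, or an error if a
/// single batch does not fit even into an empty reservation.
fn buffer_with_spilling(
    batch_sizes: &[usize],
    reservation: &mut ToyReservation,
) -> Result<usize, String> {
    let mut spills = 0;
    for &bytes in batch_sizes {
        if reservation.try_grow(bytes).is_err() {
            // A real writer would flush buffered data to disk here.
            reservation.free();
            spills += 1;
            reservation.try_grow(bytes)?;
        }
    }
    Ok(spills)
}
```

Compared with a fixed byte threshold, the spill decision here is driven by whether the reservation can grow, so in a real system it could react to memory pressure from other operators sharing the same pool.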
