Problem
The current sort-based shuffle writer uses DataFusion's `BatchPartitioner::partition()`, which calls `take_arrays()` to split each input batch into per-partition sub-batches. With N output partitions, a single input batch of 8192 rows produces up to N sub-batches averaging 8192/N rows each. With 200 output partitions, that is about 41 rows per batch.
These tiny batches are:
- Appended individually to `PartitionBuffer`
- Spilled individually to IPC files
- Written individually to the final output file
Each batch carries per-batch Arrow IPC overhead (metadata, alignment padding), and the large number of small batches hurts compression ratios and read performance.
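To make the problem concrete, here is a toy model of the current behavior. It is a sketch under simplifying assumptions, not the writer's code: a batch is modelled as a single `Vec<i32>` column, and the hypothetical `partition_of` uses modulo in place of the real hash. Splitting one 8192-row batch across 200 partitions this way yields 200 sub-batches of 40 or 41 rows.

```rust
/// Toy stand-in for an Arrow RecordBatch: one Int32 column.
/// (Hypothetical simplification; the real writer uses arrow's RecordBatch.)
type Batch = Vec<i32>;

/// Hypothetical partitioning function: modulo in place of a real hash.
fn partition_of(value: i32, num_partitions: usize) -> usize {
    value.rem_euclid(num_partitions as i32) as usize
}

/// Models the current behavior: each input batch is split immediately,
/// yielding one sub-batch per output partition (the same result as
/// calling take_arrays once per partition).
fn split_immediately(input: &Batch, num_partitions: usize) -> Vec<Batch> {
    let mut parts: Vec<Batch> = vec![Vec::new(); num_partitions];
    for &v in input {
        parts[partition_of(v, num_partitions)].push(v);
    }
    parts
}

fn main() {
    let input: Batch = (0..8192).collect();
    let parts = split_immediately(&input, 200);
    // Count only partitions that actually received rows.
    let non_empty = parts.iter().filter(|p| !p.is_empty()).count();
    let avg = input.len() as f64 / non_empty as f64;
    println!("{} sub-batches, ~{:.1} rows each", non_empty, avg);
}
```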
Proposed Solution
Adopt the approach used by Apache DataFusion Comet, which avoids this problem by using `arrow::compute::interleave_record_batch`:
- Buffer input batches whole: store unmodified input batches in a `Vec<RecordBatch>` instead of immediately splitting them per partition.
- Track indices, not data: for each row, record `(batch_index, row_index)` into per-partition index vectors (`Vec<Vec<(u32, u32)>>`).
- Deferred materialization: when it is time to write (spill or final output), call `interleave_record_batch(&buffered_batches, &indices)` to gather rows into well-sized output batches (up to the configured `batch_size`).
This produces properly sized output batches regardless of the number of output partitions, with a single efficient gather operation instead of many small `take_arrays` calls.
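A minimal, dependency-free sketch of the three steps above follows. `BufferedPartitioner`, `insert_batch`, and `materialize` are hypothetical names, batches are again modelled as a single `Vec<i32>` column with a modulo partitioner, and the gather loop in `materialize` stands in for the `interleave_record_batch` call; the real version would gather from `RecordBatch`es.

```rust
/// Toy single-column batch (hypothetical stand-in for RecordBatch).
type Batch = Vec<i32>;

/// Hypothetical buffered partitioner modelled on the proposal: whole input
/// batches are kept, and only (batch_index, row_index) pairs are recorded
/// per output partition.
struct BufferedPartitioner {
    num_partitions: usize,
    batch_size: usize,
    buffered_batches: Vec<Batch>,
    indices: Vec<Vec<(u32, u32)>>,
}

impl BufferedPartitioner {
    fn new(num_partitions: usize, batch_size: usize) -> Self {
        Self {
            num_partitions,
            batch_size,
            buffered_batches: Vec::new(),
            indices: vec![Vec::new(); num_partitions],
        }
    }

    /// Step 1 and 2: buffer the batch unmodified and record where each row goes.
    fn insert_batch(&mut self, batch: Batch) {
        let batch_idx = self.buffered_batches.len() as u32;
        for (row_idx, &v) in batch.iter().enumerate() {
            let p = v.rem_euclid(self.num_partitions as i32) as usize;
            self.indices[p].push((batch_idx, row_idx as u32));
        }
        self.buffered_batches.push(batch);
    }

    /// Step 3: deferred materialization. Gathers one partition's rows, in
    /// insertion order, into output batches of at most `batch_size` rows.
    /// The inner gather plays the role of interleave_record_batch.
    fn materialize(&self, partition: usize) -> Vec<Batch> {
        self.indices[partition]
            .chunks(self.batch_size)
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|&(b, r)| self.buffered_batches[b as usize][r as usize])
                    .collect::<Batch>()
            })
            .collect()
    }
}

fn main() {
    let mut p = BufferedPartitioner::new(200, 1024);
    for i in 0..100 {
        let start = i * 8192;
        p.insert_batch((start..start + 8192).collect());
    }
    let sizes: Vec<usize> = p.materialize(0).iter().map(|b| b.len()).collect();
    println!("partition 0 output batch sizes: {:?}", sizes);
}
```

Because index pairs accumulate across all buffered batches, each partition's output is chunked by `batch_size` rather than by input batch boundaries: here, partition 0 collects 4,096 rows from 100 inputs and emits four 1,024-row batches instead of 100 batches of about 41 rows. In arrow-rs, `interleave_record_batch` takes `&[&RecordBatch]` and `&[(usize, usize)]`, so `(u32, u32)` indices stored to save memory would be widened to `usize` at write time.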
Comparison
| Aspect | Current | Proposed |
|---|---|---|
| When rows are partitioned | Immediately per input batch | Deferred (only indices tracked) |
| Output batch sizes | `input_rows / N` (tiny) | Up to `batch_size` (well-sized) |
| Data copies | `take_arrays` per partition per batch | One `interleave` call at write time |
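The "Output batch sizes" row can be checked with a back-of-the-envelope model. This sketch assumes rows spread evenly across partitions (real hash partitioning is only approximately even), and the function names are illustrative only.

```rust
/// Ceiling division for usize, written out so it works on any Rust version.
fn ceil_div(a: usize, b: usize) -> usize {
    (a + b - 1) / b
}

/// Current approach: each input batch yields one sub-batch per non-empty
/// partition. Under the even-spread assumption, min(n, rows) are non-empty.
fn current_batches(num_inputs: usize, rows_per_input: usize, n: usize) -> usize {
    num_inputs * n.min(rows_per_input)
}

/// Proposed approach: each partition's rows are gathered into chunks of at
/// most `batch_size`, independent of how many input batches there were.
fn proposed_batches(num_inputs: usize, rows_per_input: usize, n: usize, batch_size: usize) -> usize {
    let rows_per_partition = ceil_div(num_inputs * rows_per_input, n);
    n * ceil_div(rows_per_partition, batch_size)
}

fn main() {
    let (inputs, rows, n, batch_size) = (100, 8192, 200, 8192);
    let total = (inputs * rows) as f64;
    let cur = current_batches(inputs, rows, n);
    let prop = proposed_batches(inputs, rows, n, batch_size);
    println!("current:  {} batches of ~{:.0} rows", cur, total / cur as f64);
    println!("proposed: {} batches of ~{:.0} rows", prop, total / prop as f64);
}
```

For 100 input batches of 8192 rows, 200 partitions, and a `batch_size` of 8192, the model gives 20,000 batches of about 41 rows for the current approach versus 200 batches of 4,096 rows for the proposed one.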
Additional Consideration
Comet also integrates with DataFusion's `MemoryReservation` / `MemoryConsumer` for memory-aware spilling instead of a fixed byte threshold. This could be adopted as a follow-up improvement.
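A rough sketch of what memory-aware spilling could look like. `Pool`, `Reservation`, and `Writer` are simplified, hypothetical stand-ins, not DataFusion's types; the names `try_grow` and `free` mirror methods on DataFusion's `MemoryReservation`, but the real API (error types, pool implementations, consumer registration) differs. The point is that the spill trigger becomes a failed reservation against a shared pool rather than a fixed byte threshold.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy memory pool shared by several consumers (hypothetical).
struct Pool {
    limit: usize,
    used: Cell<usize>,
}

/// Toy reservation: tracks how much one consumer holds in the pool.
struct Reservation {
    pool: Rc<Pool>,
    size: usize,
}

impl Reservation {
    /// Grow the reservation, failing if the shared pool would exceed its limit.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let used = self.pool.used.get();
        if used + bytes > self.pool.limit {
            return Err(format!("cannot grow by {}: {}/{} used", bytes, used, self.pool.limit));
        }
        self.pool.used.set(used + bytes);
        self.size += bytes;
        Ok(())
    }

    /// Release everything this reservation holds; returns the freed size.
    fn free(&mut self) -> usize {
        let freed = self.size;
        self.pool.used.set(self.pool.used.get() - freed);
        self.size = 0;
        freed
    }
}

/// Hypothetical shuffle writer that spills when the pool refuses more memory.
struct Writer {
    reservation: Reservation,
    buffered_bytes: usize,
    spills: usize,
}

impl Writer {
    fn buffer(&mut self, batch_bytes: usize) -> Result<(), String> {
        if self.reservation.try_grow(batch_bytes).is_err() {
            // Pool is full: spill what is buffered, release it, and retry once.
            self.spill();
            // If this still fails, the batch alone does not fit in the pool.
            self.reservation.try_grow(batch_bytes)?;
        }
        self.buffered_bytes += batch_bytes;
        Ok(())
    }

    fn spill(&mut self) {
        // A real writer would write the buffered batches to an IPC file here.
        self.buffered_bytes = 0;
        self.reservation.free();
        self.spills += 1;
    }
}

fn main() {
    let pool = Rc::new(Pool { limit: 100, used: Cell::new(0) });
    let mut w = Writer {
        reservation: Reservation { pool: Rc::clone(&pool), size: 0 },
        buffered_bytes: 0,
        spills: 0,
    };
    for _ in 0..10 {
        w.buffer(30).unwrap();
    }
    println!("spills: {}, buffered: {}, pool used: {}", w.spills, w.buffered_bytes, pool.used.get());
}
```

Because the pool is shared, memory held by other operators makes this writer spill sooner, which a fixed per-writer byte threshold cannot account for.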