
Conversation

@ryzhyk (Contributor) commented Jan 5, 2026

I think this is the last operator that was missing the logic to split large outputs into multiple steps.

@ryzhyk ryzhyk requested review from Copilot and mihaibudiu January 5, 2026 18:53
@ryzhyk ryzhyk added the DBSP core Related to the core DBSP library label Jan 5, 2026
Copilot AI (Contributor) left a comment

Pull request overview

This PR adds logic to split large outputs into multiple steps for the chain_aggregate operator, completing the implementation of output splitting across operators in the codebase.

Key Changes:

  • Converts ChainAggregate from a BinaryOperator to a StreamingBinaryOperator to enable chunked output processing
  • Implements batching logic that yields results in configurable chunk sizes instead of emitting the entire result in a single step (a rough sketch of this pattern follows below)
  • Updates test configuration to verify chunking behavior with a small chunk size
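
As a rough, self-contained illustration of the chunked-output idea only (this is not the PR's code; ChainAggregate operates over trace cursors and batch builders, and every name below is invented for the example), a large result can be exposed as a stream of bounded-size batches rather than one large batch:

    use futures::{executor::block_on, stream, Stream, StreamExt};

    // Illustrative only: expose a large result as a stream of batches of at most
    // `chunk_size` records, so a consumer can drain it across several steps.
    fn output_in_chunks(records: Vec<u64>, chunk_size: usize) -> impl Stream<Item = Vec<u64>> {
        stream::iter(records).chunks(chunk_size)
    }

    fn main() {
        // Seven records with chunk_size = 3 come out as steps of 3, 3, and 1 records.
        let batches: Vec<Vec<u64>> = block_on(output_in_chunks((0..7).collect(), 3).collect());
        assert_eq!(batches, vec![vec![0, 1, 2], vec![3, 4, 5], vec![6]]);
    }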

@ryzhyk ryzhyk force-pushed the chain-aggregate-splitter branch from 51cab13 to dbc94d8 Compare January 5, 2026 18:58
self: Rc<Self>,
delta: &Option<Z>,
output_trace: &Spine<OZ>,
) -> impl futures::Stream<Item = (OZ, bool, Option<Position>)> + 'static {
Contributor left a comment
why not use AsyncStream here as in other places?

delta_cursor.step_key();

if builder.num_tuples() >= chunk_size {
Contributor left a comment

I should have known this, but this means that the output can be larger than the chunk size?
The current builder does not seem to be reduced to chunk_size, with the left-over saved for later.

ryzhyk (Author) replied

The builder can grow up to chunk_size + 1, since it can produce two output records per key.
Once that happens, we output everything in the current builder and create a fresh builder for the remaining outputs.

Not sure if this answers the question.
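
A hypothetical sketch of the behavior described above (illustrative names only, not the actual ChainAggregate code): each key can contribute up to two output records, and the chunk is flushed only after a key has been fully processed, which is why the builder can briefly hold chunk_size + 1 records.

    // Hypothetical sketch: each entry of `updates` stands in for the up-to-two
    // output records (a retraction and an insertion) that a single key can produce.
    fn chunk_per_key(updates: &[(u64, i64, i64)], chunk_size: usize) -> Vec<Vec<(u64, i64)>> {
        let mut chunks = Vec::new();
        let mut builder: Vec<(u64, i64)> = Vec::new();
        for &(key, retract, insert) in updates {
            // Both records for a key always land in the same chunk...
            builder.push((key, retract));
            builder.push((key, insert));
            // ...and the size check happens only afterwards, so the builder can
            // reach chunk_size + 1 before it is flushed.
            if builder.len() >= chunk_size {
                chunks.push(std::mem::take(&mut builder));
            }
        }
        if !builder.is_empty() {
            chunks.push(builder);
        }
        chunks
    }

    fn main() {
        // Three keys, two records each, chunk_size = 3: chunks of 4 and 2 records.
        let chunks = chunk_per_key(&[(1, -10, 11), (2, -20, 21), (3, -30, 31)], 3);
        assert_eq!(chunks, vec![
            vec![(1, -10), (1, 11), (2, -20), (2, 21)], // chunk_size + 1 records
            vec![(3, -30), (3, 31)],
        ]);
    }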

@ryzhyk ryzhyk added this pull request to the merge queue Jan 5, 2026
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to a conflict with the base branch Jan 5, 2026
I think this is the last operator that was missing the logic to split large outputs
into multiple steps.

Signed-off-by: Leonid Ryzhyk <[email protected]>
@ryzhyk ryzhyk force-pushed the chain-aggregate-splitter branch from dbc94d8 to 8deb916 Compare January 5, 2026 20:37
@ryzhyk ryzhyk enabled auto-merge January 5, 2026 20:38
@ryzhyk ryzhyk added this pull request to the merge queue Jan 5, 2026
Merged via the queue into main with commit 4d4c908 Jan 5, 2026
1 check passed
@ryzhyk ryzhyk deleted the chain-aggregate-splitter branch January 5, 2026 21:51