[dbsp] Split large outputs in chain_aggregate. #5376
Conversation
Pull request overview
This PR adds logic to the `chain_aggregate` operator to split large outputs into multiple steps, completing the rollout of output splitting across the operators in the codebase.
Key Changes:
- Converts `ChainAggregate` from a `BinaryOperator` to a `StreamingBinaryOperator` to enable chunked output processing
- Implements batching logic that yields results in configurable chunk sizes instead of processing the entire dataset at once (see the sketch after this list)
- Updates the test configuration to verify chunking behavior with a small chunk size
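
As a rough illustration of the batching idea (a minimal sketch, not the actual DBSP code): accumulate output tuples in a builder and flush a chunk whenever the builder reaches `chunk_size`. The names `Tuple`, `aggregate_key`, and `chunked_output` are hypothetical, invented for this example.

```rust
/// Placeholder record type: (key, aggregate value).
type Tuple = (u64, i64);

/// Hypothetical per-key aggregation. A key may produce up to two output
/// records (assumption: e.g. a retraction and an insertion), which is why
/// a chunk can exceed `chunk_size` by one.
fn aggregate_key(key: u64) -> Vec<Tuple> {
    vec![(key, -1), (key, 1)]
}

/// Splits the output for `keys` into chunks of roughly `chunk_size` tuples.
/// Assumes `chunk_size > 0`.
fn chunked_output(keys: &[u64], chunk_size: usize) -> Vec<Vec<Tuple>> {
    let mut chunks = Vec::new();
    let mut builder: Vec<Tuple> = Vec::new();
    for &key in keys {
        builder.extend(aggregate_key(key));
        // The builder can reach chunk_size + 1, since a single key may add
        // two records; once at or over the threshold, flush the whole
        // builder and start a fresh one for the remaining output.
        if builder.len() >= chunk_size {
            chunks.push(std::mem::take(&mut builder));
        }
    }
    if !builder.is_empty() {
        chunks.push(builder); // left-over records form the final chunk
    }
    chunks
}

fn main() {
    for (i, chunk) in chunked_output(&[1, 2, 3, 4, 5], 4).iter().enumerate() {
        println!("chunk {i}: {chunk:?}");
    }
}
```

Because a single key can contribute two records, a flushed chunk can hold up to `chunk_size + 1` tuples, which is the behavior discussed in the review thread below.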
Force-pushed from 51cab13 to dbc94d8.
```rust
    self: Rc<Self>,
    delta: &Option<Z>,
    output_trace: &Spine<OZ>,
) -> impl futures::Stream<Item = (OZ, bool, Option<Position>)> + 'static {
```
Why not use `AsyncStream` here, as in other places?
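
For context on the pattern under discussion, here is a minimal, self-contained sketch of returning an `impl Stream` of chunks, using the `async-stream` and `futures` crates. The item type `(Vec<u64>, bool)` and the meaning of the `bool` flag are placeholders for this example, not the real DBSP types.

```rust
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};

/// Yields the contents of `data` in chunks of up to `chunk_size` items.
/// Assumes `chunk_size > 0`.
fn chunks(data: Vec<u64>, chunk_size: usize) -> impl Stream<Item = (Vec<u64>, bool)> {
    stream! {
        let mut iter = data.into_iter().peekable();
        while iter.peek().is_some() {
            // Drain up to `chunk_size` items into the next chunk.
            let chunk: Vec<u64> = iter.by_ref().take(chunk_size).collect();
            // In this sketch the flag simply marks the final chunk; the
            // flag in the real signature may mean something else.
            let done = iter.peek().is_none();
            yield (chunk, done);
        }
    }
}

fn main() {
    futures::executor::block_on(async {
        let s = chunks((0..5).collect(), 2);
        pin_mut!(s);
        while let Some((chunk, done)) = s.next().await {
            println!("{chunk:?} done={done}");
        }
    });
}
```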
```rust
delta_cursor.step_key();

if builder.num_tuples() >= chunk_size {
```
I should have known this, but does this mean that the output can be larger than the chunk size? The current builder does not seem to be truncated to `chunk_size` with the left-over saved for later.
The builder can grow up to `chunk_size + 1`, since a single key can produce two output records. Once that happens, we output everything in the current builder and create a fresh builder for the remaining outputs.
Not sure if this answers the question.
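
A tiny illustration of that bound, with hypothetical numbers (the retract/insert pairing per key is an assumption for the example):

```rust
fn main() {
    let chunk_size = 4;
    // Builder holds 3 tuples, still below chunk_size, so the next key
    // is processed before any flush.
    let mut builder = vec!["t1", "t2", "t3"];
    // One key contributes two records, pushing the builder to 5 tuples,
    // i.e. chunk_size + 1, before the size check fires.
    builder.extend(["retract(k)", "insert(k)"]);
    assert_eq!(builder.len(), chunk_size + 1);
    println!("flush all {} tuples, then start a fresh builder", builder.len());
}
```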
I think this is the last operator that was missing the logic to split large outputs into multiple steps.
Signed-off-by: Leonid Ryzhyk <[email protected]>
Force-pushed from dbc94d8 to 8deb916.