[dbsp] Adaptive joins #5287
Conversation
mihaibudiu left a comment:
I think I get a general picture.
So the repartitioning is blocking for the current step?
```diff
 /// Read all incoming messages for `receiver`.
 ///
-/// Values are passed to callback function `cb`.
+/// Values are passed to callback function `cb` in the order of worker indexes.
```
is there a debug_assert for that?
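A minimal sketch of what such an assertion could look like. The names here (`recv_in_order`, the `Vec<(usize, T)>` message representation) are illustrative only, not the actual dbsp API: the point is that the "ordered by worker index" invariant promised by the doc comment is cheap to check with a `debug_assert!` that compiles away in release builds.

```rust
// Hypothetical sketch, not the dbsp API: deliver messages to a callback
// while asserting (in debug builds only) that they arrive in
// non-decreasing worker-index order, as the updated doc comment promises.
fn recv_in_order<T>(messages: Vec<(usize, T)>, mut cb: impl FnMut(usize, T)) {
    let mut last_worker: Option<usize> = None;
    for (worker, msg) in messages {
        // The invariant from the doc comment: values arrive ordered by worker index.
        debug_assert!(
            last_worker.map_or(true, |w| worker >= w),
            "messages delivered out of worker-index order"
        );
        last_worker = Some(worker);
        cb(worker, msg);
    }
}

fn main() {
    let mut seen = Vec::new();
    recv_in_order(vec![(0, "a"), (1, "b"), (2, "c")], |w, _| seen.push(w));
    assert_eq!(seen, vec![0, 1, 2]);
}
```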
```diff
 //! # use csv::Reader;
 //! # use dbsp::utils::{Tup2, Tup3};
-//! # use dbsp::{OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight};
+//! # use dbsp::{OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight, IndexedZSetReader};
```
isn't it strange that this is not used anywhere?
```rust
    Ok(Ok(resp)) => resp,
};

// Receive responses.
```
This comment is not very informative
```rust
/// The storage directory supplied does not match the runtime circuit.
IncompatibleStorage,
/// Error deserializing checkpointed state.
CheckpointParseError(String),
```
how come this wasn't needed before?
I am as surprised as you are
```rust
}

/// Total weight of records across all shards.
pub fn total_records(&self) -> ZWeight {
```
this could be maintained incrementally
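A sketch of the incremental alternative suggested in the comment. All names (`ShardedCounts`, `apply_delta`) are hypothetical, not the dbsp API: the idea is simply that the grand total can be updated in O(1) as deltas arrive, instead of being recomputed by summing across all shards on each call.

```rust
// Hypothetical sketch: maintain the total record weight incrementally
// alongside per-shard totals, so total_records() is O(1) rather than
// a scan over all shards.
struct ShardedCounts {
    per_shard: Vec<i64>, // ZWeight-like totals per shard
    total: i64,          // incrementally maintained grand total
}

impl ShardedCounts {
    fn new(num_shards: usize) -> Self {
        Self { per_shard: vec![0; num_shards], total: 0 }
    }

    /// Apply a weight delta to one shard, updating the cached total in O(1).
    fn apply_delta(&mut self, shard: usize, delta: i64) {
        self.per_shard[shard] += delta;
        self.total += delta;
    }

    /// O(1) instead of summing over all shards.
    fn total_records(&self) -> i64 {
        self.total
    }
}

fn main() {
    let mut c = ShardedCounts::new(4);
    c.apply_delta(0, 10);
    c.apply_delta(2, 5);
    c.apply_delta(0, -3);
    assert_eq!(c.total_records(), 12);
}
```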
```rust
/// The policy that was used to accumulate the integral before the rebalancing.
/// This is the policy used during the previous transaction.
trace_policy: PartitioningPolicy,
```
Suggestion: rename to `previous_trace_policy`.
```rust
/// Assumes that the cursor was partitioned using Policy::Shard or Policy::Balance, i.e., partitions
/// in different workers don't overlap.
///
/// Returns after exhausting the cursor or when one of the builders reaches chunk_size entries,
```
What happens if the builder becomes full?
Who continues the work and when?
where is the check for chunk_size?
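To make the contract under discussion concrete, here is a toy sketch of the "return when a builder reaches chunk_size" behavior. Everything here (`fill_chunks`, routing by a closure, resuming from a consumed-count) is an illustrative assumption, not the code under review: the function drains items into per-worker builders and returns early, reporting how many items were consumed so the caller can resume where it left off.

```rust
// Hypothetical sketch of chunked draining: route items to per-worker
// builders and stop as soon as any builder reaches chunk_size, returning
// the builders plus the number of items consumed so far.
fn fill_chunks<T: Clone>(
    items: &[T],
    num_workers: usize,
    chunk_size: usize,
    route: impl Fn(&T) -> usize,
) -> (Vec<Vec<T>>, usize) {
    let mut builders: Vec<Vec<T>> = vec![Vec::new(); num_workers];
    for (consumed, item) in items.iter().enumerate() {
        let w = route(item);
        builders[w].push(item.clone());
        // Stop early once one builder is full; the caller resumes from
        // `consumed + 1` on the next invocation.
        if builders[w].len() >= chunk_size {
            return (builders, consumed + 1);
        }
    }
    (builders, items.len())
}

fn main() {
    // Odd keys go to worker 1; its builder fills up after seeing 1 and 3.
    let (chunks, consumed) = fill_chunks(&[1, 2, 3, 4, 5], 2, 2, |x| (x % 2) as usize);
    assert_eq!(consumed, 3);
    assert_eq!(chunks[1], vec![1, 3]);
}
```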
```rust
/// Partition cursor based on the current policy.
///
/// Assumes that the cursor was partitioned using Policy::Broadcast, i.e., all partitions contain identical data.
/// The implementation takes advantage of this by having each worker only send data that belongs to the same worker
```
If they already have the data, why is sending even needed? Don't they just have to delete some of the data they already have?
Are you doing this because it's cheaper to retract everything and send what is needed, given that the data structures are immutable? Still, you don't need to "send" anything: the data is already there, so every worker should be able to compute what to keep.
```rust
// Retract the contents of the integral by sending it with negated weights to the accumulator
// in the same worker.
while integral_cursor.key_valid() {
```
Is this the loop which continues sending data after builders become full?
so this does not clear the integral, it just sends the negation to the consumers?
You said it yourself above: we use "regular" incremental computation to rebalance the state internally. This requires sending a stream of changes, not just modifying the integral. This stream can be consumed by other operators, not just the integral. This also answers the previous question.
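The rebalancing-as-a-change-stream idea in this reply can be sketched on a toy Z-set model. Everything below (`ZSet` as a `BTreeMap`, `rebalance`, the routing closure) is an illustrative assumption, not the dbsp implementation: each worker retracts its current contents with negated weights and re-emits them routed by the new policy, so the union of all workers' state is unchanged while the per-worker partitioning follows the new routing, and downstream consumers see only an ordinary stream of changes.

```rust
use std::collections::BTreeMap;

// Toy Z-set view of one worker's integral: key -> weight.
type ZSet = BTreeMap<u64, i64>;

// Hypothetical sketch, not the dbsp API: express rebalancing as a delta
// stream. Each worker retracts its state locally (negative weights) and
// re-inserts it at the worker chosen by the new routing policy (positive
// weights); applying the deltas re-partitions without losing any records.
fn rebalance(workers: &mut [ZSet], new_route: impl Fn(u64) -> usize) {
    let mut deltas: Vec<(usize, u64, i64)> = Vec::new();
    for (w, state) in workers.iter().enumerate() {
        for (&k, &weight) in state {
            deltas.push((w, k, -weight));           // retract under the old policy
            deltas.push((new_route(k), k, weight)); // insert at the new owner
        }
    }
    for (w, k, dw) in deltas {
        let e = workers[w].entry(k).or_insert(0);
        *e += dw;
        let zero = *e == 0;
        if zero {
            workers[w].remove(&k);
        }
    }
}

fn main() {
    // All keys start on worker 0; rebalance to route by key parity.
    let mut ws = vec![ZSet::from([(1, 1), (2, 1), (3, 1), (4, 1)]), ZSet::new()];
    rebalance(&mut ws, |k| (k % 2) as usize);
    assert_eq!(ws[0], ZSet::from([(2, 1), (4, 1)]));
    assert_eq!(ws[1], ZSet::from([(1, 1), (3, 1)]));
}
```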
It seems that the job of the SQL compiler will be minimal: just create the right type of join (adaptive or non-adaptive), where adaptive joins are allowed only in the top-level circuit.

Sorry, pressed the wrong button.
Pull request overview
This PR implements adaptive joins in DBSP, introducing a balancing system that dynamically selects optimal partitioning policies for join operations based on runtime characteristics like data skew and size. The implementation includes a MaxSAT solver for policy selection, metadata exchange between workers, and comprehensive test coverage.
Key Changes:
- Introduces a `Balancer` component that manages partitioning policies for joins across the circuit
- Adds an `accumulate_trace_balanced` operation that combines exchange, accumulation, and integration with dynamic rebalancing
- Implements a MaxSAT solver for selecting optimal balancing policies under hard constraints
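To make the "optimal policies under hard constraints" idea concrete, here is a deliberately tiny toy, not the PR's actual MaxSAT solver. Assumptions are labeled throughout: joins that share an input stream must agree on its partitioning policy (the hard constraint), each join has an estimated benefit per policy (the soft weights), and with one shared stream and two candidate policies the search degenerates to comparing two feasible assignments.

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Policy {
    Shard,
    Broadcast,
}

// Hypothetical toy, not the PR's solver: benefit[j][p] estimates the gain
// of running join j with policy p. The hard constraint forces every join
// on the shared stream to use the same policy, so we simply pick the
// policy maximizing total soft-clause weight, MaxSAT-style.
fn best_shared_policy(benefit: &[[i64; 2]]) -> (Policy, i64) {
    let score = |p: usize| benefit.iter().map(|b| b[p]).sum::<i64>();
    if score(0) >= score(1) {
        (Policy::Shard, score(0))
    } else {
        (Policy::Broadcast, score(1))
    }
}

fn main() {
    // Join 0 slightly prefers Shard, join 1 strongly prefers Broadcast;
    // the shared-stream constraint forces one policy for both.
    let (p, s) = best_shared_policy(&[[3, 1], [0, 5]]);
    assert_eq!(p, Policy::Broadcast);
    assert_eq!(s, 6);
}
```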
Reviewed changes
Copilot reviewed 66 out of 69 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| crates/dbsp/src/utils/graph.rs | New graph utilities for computing connected components |
| crates/dbsp/src/operator/dynamic/balance/*.rs | Core balancing infrastructure including MaxSAT solver and balancer logic |
| crates/dbsp/src/operator/dynamic/join.rs | Added balanced join variants |
| crates/dbsp/src/operator/dynamic/outer_join.rs | Added balanced left join variants |
| crates/dbsp/src/circuit/circuit_builder.rs | Circuit API extensions for balancer integration |
| crates/dbsp/src/circuit/runtime.rs | Added metadata broadcast mechanism |
| crates/dbsp/src/typed_batch.rs | Moved iter() method to trait for broader accessibility |
Comments suppressed due to low confidence (3)

crates/dbsp/src/operator/dynamic/balance/test.rs:1
- The first argument should be `false` to match the function name 'no_checkpoints'. Currently passing `true` for the checkpoints parameter contradicts the test name.

crates/dbsp/src/operator/dynamic/balance/test.rs:1
- The first argument should be `false` to match the function name 'no_checkpoints'. Currently passing `true` for the checkpoints parameter contradicts the test name.

crates/dbsp/src/operator/dynamic/balance/test.rs:1
- The second argument should be `true` to match the function calling a left join test, and the test expects checkpoints. The arguments appear to be incorrect.
```rust
    }
}

// TODO: use a better type thank serde_json::Value.
```
Copilot AI (Dec 17, 2025):
Corrected spelling of 'thank' to 'than'
```diff
-// TODO: use a better type thank serde_json::Value.
+// TODO: use a better type than serde_json::Value.
```
We had two functions that initialized logging for testing purposes in the tree. We unify them under `utils`, so we can use logging in new tests as well.

Signed-off-by: Leonid Ryzhyk <[email protected]>

Fixes #4632. We implement the design for adaptive joins outlined in #4632. One big caveat is that, since multiple join operators can share an input stream, choosing an optimal partitioning policy becomes an optimization problem and requires runtime coordination across many join operators. This commit introduces two new building blocks:

* Balancer: collects statistics across all join operators and all workers in the circuit and determines an optimal partitioning policy at each step.
* RebalancingExchangeSender: a version of the ExchangeSender operator that can reconfigure its partitioning policy on demand.

Adaptive joins are currently disabled by default and can be enabled by a new dev tweak, until we're confident they are stable and do not make things worse for the vast majority of workloads while significantly improving skewed workloads.

Signed-off-by: Leonid Ryzhyk <[email protected]>
Signed-off-by: Mihai Budiu <[email protected]>
Fixes #4632.