
Conversation


@ryzhyk ryzhyk commented Dec 15, 2025

Fixes #4632.


@mihaibudiu mihaibudiu left a comment


I think I get the general picture.
So the repartitioning is blocking for the current step?

/// Read all incoming messages for `receiver`.
///
/// Values are passed to callback function `cb`.
/// Values are passed to callback function `cb` in the order of worker indexes.
Contributor


Is there a debug_assert for that?

//! # use csv::Reader;
//! # use dbsp::utils::{Tup2, Tup3};
//! # use dbsp::{OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight};
//! # use dbsp::{OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight, IndexedZSetReader};
Contributor


isn't it strange that this is not used anywhere?

Ok(Ok(resp)) => resp,
};

// Receive responses.
Contributor


This comment is not very informative

/// The storage directory supplied does not match the runtime circuit.
IncompatibleStorage,
/// Error deserializing checkpointed state.
CheckpointParseError(String),
Contributor


how come this wasn't needed before?

Contributor Author


I am as surprised as you are

}

/// Total weight of records across all shards.
pub fn total_records(&self) -> ZWeight {
Contributor


this could be maintained incrementally
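
A minimal sketch of the incremental maintenance being suggested here, using hypothetical types (ZWeight is assumed to be a signed integer alias; the actual struct in the PR may differ):

type ZWeight = i64; // assumption: ZWeight is a signed integer weight

struct ShardStats {
    per_shard: Vec<ZWeight>, // hypothetical per-shard record weights
    total: ZWeight,          // cached running total, updated on every change
}

impl ShardStats {
    /// Apply a weight delta to one shard and keep the cached total in sync,
    /// so `total_records` is O(1) instead of a scan over all shards.
    fn apply_delta(&mut self, shard: usize, delta: ZWeight) {
        self.per_shard[shard] += delta;
        self.total += delta;
    }

    fn total_records(&self) -> ZWeight {
        self.total
    }
}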


/// The policy that was used to accumulate the integral before the rebalancing.
/// This is the policy used during the previous transaction.
trace_policy: PartitioningPolicy,
Contributor


previous_trace_policy

/// Assumes that the cursor was partitioned using Policy::Shard or Policy::Balance, i.e., partitions
/// in different workers don't overlap.
///
/// Returns after exhausting the cursor or when one of the builders reaches chunk_size entries,
Contributor


What happens if the builder becomes full?
Who continues the work and when?

Contributor


where is the check for chunk_size?
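
A minimal sketch of the kind of chunked drain loop being discussed in these two comments, with stand-in types (an iterator instead of a real trace cursor, Vecs instead of batch builders); the point is only to show where the chunk_size check lives and how work resumes:

/// Drain `cursor` into one builder per worker, stopping as soon as any
/// builder reaches `chunk_size` entries. Returns true when the cursor is
/// exhausted; otherwise the caller ships the current chunks and calls this
/// again, and the cursor resumes from where it stopped.
fn drain_chunk<K, V>(
    cursor: &mut impl Iterator<Item = (K, V, i64)>, // stand-in for a trace cursor
    builders: &mut [Vec<(K, V, i64)>],              // one builder per worker
    route: impl Fn(&K) -> usize,                    // partitioning function
    chunk_size: usize,
) -> bool {
    for (k, v, w) in cursor {
        let worker = route(&k) % builders.len();
        builders[worker].push((k, v, w));
        if builders[worker].len() >= chunk_size {
            return false; // a builder is full; resume on the next call
        }
    }
    true
}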

/// Partition cursor based on the current policy.
///
/// Assumes that the cursor was partitioned using Policy::Broadcast, i.e., all partitions contain identical data.
/// The implementation takes advantage of this by having each worker only send data that belongs to the same worker
Contributor


If they have already the data, why is sending even needed? Don't they just have to delete some of the data they already have?

Contributor


Are you doing this because it's cheaper to retract everything and send what is needed because the data structures are immutable? Still, you don't need to "send" anything, the data is already there, every worker should be able to compute what to keep.


// Retract the contents of the integral by sending it with negated weights to the accumulator
// in the same worker.
while integral_cursor.key_valid() {
Contributor


Is this the loop which continues sending data after builders become full?

Contributor


so this does not clear the integral, it just sends the negation to the consumers?

Contributor Author


You said it yourself above: we use "regular" incremental computation to rebalance the state internally. This requires sending a stream of changes, not just modifying the integral. This stream can be consumed by other operators, not just the integral. This also answers the previous question.
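
A minimal sketch of the mechanism described in this reply, with hypothetical types (a slice standing in for the integral's cursor, a closure standing in for the downstream consumers):

/// Rebalancing is expressed as an ordinary change stream: the old contents of
/// the integral are retracted by re-emitting every record with its weight
/// negated; the repartitioned copy is then inserted with positive weights
/// under the new exchange policy, so every operator consuming the stream sees
/// a consistent sequence of deltas.
fn retract_all<K: Clone, V: Clone>(
    integral: &[(K, V, i64)],        // stand-in for the integral's cursor
    mut emit: impl FnMut(K, V, i64), // feeds the accumulator in this worker
) {
    for (k, v, w) in integral {
        emit(k.clone(), v.clone(), -*w); // negated weight = retraction
    }
}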

@mihaibudiu
Contributor

It seems that the job of the SQL compiler will be minimal: just create the right type of join (adaptive or non-adaptive), where adaptive joins are allowed only in the top-level circuit.

@mihaibudiu mihaibudiu marked this pull request as ready for review December 17, 2025 00:48
Copilot AI review requested due to automatic review settings December 17, 2025 00:48
@mihaibudiu mihaibudiu marked this pull request as draft December 17, 2025 00:48
@mihaibudiu
Contributor

Sorry, pressed the wrong button.


Copilot AI left a comment


Pull request overview

This PR implements adaptive joins in DBSP, introducing a balancing system that dynamically selects optimal partitioning policies for join operations based on runtime characteristics like data skew and size. The implementation includes a MaxSAT solver for policy selection, metadata exchange between workers, and comprehensive test coverage.
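
A toy illustration of what "dynamically selecting a partitioning policy" can look like, using the policy names that appear in the quoted doc comments (Shard, Balance, Broadcast); the thresholds and the selection rule below are purely illustrative and are not the PR's actual cost model:

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PartitioningPolicy {
    Shard,     // hash-partition by key; per-worker partitions don't overlap
    Balance,   // skew-aware repartitioning; partitions still don't overlap
    Broadcast, // every worker holds a full copy of the collection
}

// Toy selection rule standing in for the real statistics-driven decision:
// broadcast small inputs, rebalance heavily skewed ones, shard everything else.
fn choose_policy(total_records: u64, max_key_fraction: f64) -> PartitioningPolicy {
    if total_records < 10_000 {
        PartitioningPolicy::Broadcast
    } else if max_key_fraction > 0.1 {
        PartitioningPolicy::Balance
    } else {
        PartitioningPolicy::Shard
    }
}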

Key Changes:

  • Introduces a Balancer component that manages partitioning policies for joins across the circuit
  • Adds an accumulate_trace_balanced operation that combines exchange, accumulation, and integration with dynamic rebalancing
  • Implements a MaxSAT solver for selecting optimal balancing policies under hard constraints (see the toy sketch after this list)
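
A toy sketch of the MaxSAT formulation referenced in the last bullet (hypothetical encoding, brute force over a handful of boolean variables rather than a real solver): discard assignments that violate any hard constraint and keep the one that maximizes the total weight of satisfied soft constraints.

/// Brute-force weighted MaxSAT over `num_vars` boolean variables: every hard
/// constraint must hold; among the surviving assignments, pick the one with
/// the largest total weight of satisfied soft constraints.
fn solve_maxsat(
    num_vars: u32,
    hard: &[Box<dyn Fn(&[bool]) -> bool>],
    soft: &[(u64, Box<dyn Fn(&[bool]) -> bool>)],
) -> Option<Vec<bool>> {
    let mut best: Option<(u64, Vec<bool>)> = None;
    for bits in 0u64..(1u64 << num_vars) {
        let a: Vec<bool> = (0..num_vars).map(|i| bits & (1 << i) != 0).collect();
        if hard.iter().any(|c| !c(&a)) {
            continue; // violates a hard constraint
        }
        let score: u64 = soft.iter().filter(|(_, c)| c(&a)).map(|(w, _)| *w).sum();
        if best.as_ref().map_or(true, |(s, _)| score > *s) {
            best = Some((score, a));
        }
    }
    best.map(|(_, a)| a)
}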

Reviewed changes

Copilot reviewed 66 out of 69 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
crates/dbsp/src/utils/graph.rs New graph utilities for computing connected components
crates/dbsp/src/operator/dynamic/balance/*.rs Core balancing infrastructure including MaxSAT solver and balancer logic
crates/dbsp/src/operator/dynamic/join.rs Added balanced join variants
crates/dbsp/src/operator/dynamic/outer_join.rs Added balanced left join variants
crates/dbsp/src/circuit/circuit_builder.rs Circuit API extensions for balancer integration
crates/dbsp/src/circuit/runtime.rs Added metadata broadcast mechanism
crates/dbsp/src/typed_batch.rs Moved iter() method to trait for broader accessibility
Comments suppressed due to low confidence (3)

crates/dbsp/src/operator/dynamic/balance/test.rs:1
  • The first argument should be false to match the function name no_checkpoints. Currently passing true for the checkpoints parameter contradicts the test name.
crates/dbsp/src/operator/dynamic/balance/test.rs:1
  • The first argument should be false to match the function name no_checkpoints. Currently passing true for the checkpoints parameter contradicts the test name.
crates/dbsp/src/operator/dynamic/balance/test.rs:1
  • The second argument should be true to match the function calling a left join test, and the test expects checkpoints. The arguments appear to be incorrect.

}
}

// TODO: use a better type thank serde_json::Value.

Copilot AI Dec 17, 2025


Corrected spelling of 'thank' to 'than'

Suggested change
// TODO: use a better type thank serde_json::Value.
// TODO: use a better type than serde_json::Value.

@Karakatiza666
Contributor

  • Fixed an issue where the pinned "top nodes" tooltip would not update when choosing another metric
  • Fixed an issue where the tooltip took up the entire height of the diagram regardless of content
  • Made table headers and category headers in the tooltip table sticky in web-console
  • Made sure the fixes to "top nodes" button work in profiler-app

ryzhyk and others added 3 commits December 29, 2025 12:59
We had two functions that initialized logging for testing purposes in the tree.
We unify them under `utils`, so we can use logging in new tests as well.

Signed-off-by: Leonid Ryzhyk <[email protected]>
Fixes #4632.

We implement the design for adaptive joins outlined in #4632.

One big caveat is that since multiple join operators can share an input stream,
choosing an optimal partitioning policy becomes an optimization problem and
requires runtime coordination across many join operators.

This commit introduces two new building blocks:

* Balancer: collects statistics across all join operators and all workers in the circuit and
  determines an optimal partitioning policy at each step.
* RebalancingExchangeSender: a version of the ExchangeSender operator that
  can reconfigure its partitioning policy on demand.

Adaptive joins are currently disabled by default and can be enabled by a new dev tweak,
until we're confident they are stable and do not make things worse for the vast majority
of workloads while significantly improving skewed workloads.

Signed-off-by: Leonid Ryzhyk <[email protected]>
@ryzhyk ryzhyk marked this pull request as ready for review December 29, 2025 22:37
@ryzhyk ryzhyk enabled auto-merge December 29, 2025 22:38
@ryzhyk ryzhyk added this pull request to the merge queue Dec 29, 2025
Merged via the queue into main with commit 922417b Dec 29, 2025
1 check passed
@ryzhyk ryzhyk deleted the issue4632 branch December 29, 2025 23:47

Labels

DBSP core Related to the core DBSP library
