
Conversation

@blp (Member) commented Dec 31, 2025

No description provided.

@blp blp self-assigned this Dec 31, 2025
Copilot AI review requested due to automatic review settings December 31, 2025 22:53
@blp blp added the DBSP core, ft, and connectors labels Dec 31, 2025
Copilot AI (Contributor) left a comment


Pull request overview

This PR introduces initial support for multihost pipelines, enabling Feldera to distribute pipeline execution across multiple hosts. The implementation adds coordination mechanisms for managing distributed pipeline processes, including step synchronization, checkpoint coordination, and transaction management across hosts.

Key changes:

  • Added MultihostConfig and coordination API endpoints for multihost pipeline management
  • Implemented coordinator interfaces for controlling distributed pipeline execution
  • Extended runtime status enums with new Coordination state for pipeline initialization in multihost mode
  • Added infrastructure for inter-host communication and layout configuration

Reviewed changes

Copilot reviewed 39 out of 42 changed files in this pull request and generated 4 comments.

File summary:

  • crates/feldera-types/src/coordination.rs: New module defining the coordinator-pipeline interface for multihost operations
  • crates/feldera-types/src/config.rs: Added MultihostConfig struct and multihost field to PipelineConfig
  • crates/feldera-types/src/runtime_status.rs: Added Coordination status variant and snake_case serialization support
  • crates/adapters/src/server.rs: Implemented coordination API endpoints and multihost initialization logic
  • crates/adapters/src/controller.rs: Extended controller with coordination request handling and checkpoint preparation
  • crates/dbsp/src/circuit/dbsp_handle.rs: Modified fingerprint handling and layout support for multihost circuits
  • crates/dbsp/src/operator/dynamic/communication/shard.rs: Added worker range-based sharding for multihost data distribution
  • crates/pipeline-manager/src/runner/local_runner.rs: Added validation to reject multihost deployments in the local runner
  • crates/pipeline-manager/src/db/types/program.rs: Integrated multihost configuration generation based on the runtime config
  • Cargo.toml: Updated dependencies, including a tarpc version bump for RPC communication
Comments suppressed due to low confidence (3)

crates/pipeline-manager/src/db/types/combined_status.rs:1
  • The todo!() placeholder for RuntimeStatus::Coordination will panic at runtime. This needs to be implemented with the appropriate CombinedStatus variant or mapping logic before this code path is exercised.

crates/pipeline-manager/src/db/types/combined_status.rs:1
  • The todo!() placeholder for RuntimeDesiredStatus::Coordination will panic at runtime. This needs to be implemented with the appropriate CombinedDesiredStatus variant or mapping logic before this code path is exercised.

crates/pipeline-manager/src/db/types/combined_status.rs:1
  • The todo!() placeholder for RuntimeDesiredStatus::Coordination will panic at runtime. This needs to be implemented with the appropriate CombinedDesiredStatus variant or mapping logic before this code path is exercised.

blp added 15 commits January 2, 2026 10:24
The actix_web #[get] macro does something weird with function names that
makes it hard to use the same name pretty much anywhere in the same scope,
even as local variables.  I kept trying to use `status` and it was not
working out well.

Signed-off-by: Ben Pfaff <[email protected]>
This allows the runner to report errors from OpenSSL.

Signed-off-by: Ben Pfaff <[email protected]>
It's easier for me to think about atomics than about mutexes because
there is no way for anything to happen while they are "held", so no
possibility of contributing to a deadlock, etc.

Signed-off-by: Ben Pfaff <[email protected]>
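
As a rough illustration of that reasoning (a sketch, not this PR's code; the flag name is made up), an atomic flag can be read and cleared in one indivisible operation, so there is no critical section during which other threads must wait:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical flag; the PR's actual state is more involved.
static CHECKPOINT_REQUESTED: AtomicBool = AtomicBool::new(false);

fn request_checkpoint() {
    CHECKPOINT_REQUESTED.store(true, Ordering::Release);
}

fn take_checkpoint_request() -> bool {
    // Read and clear in a single indivisible step: nothing else can
    // run "while the flag is held", unlike a mutex critical section.
    CHECKPOINT_REQUESTED.swap(false, Ordering::AcqRel)
}
```
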
This ensures that the checkpoint can always be read from the checkpoint
directory before trying to make it the default checkpoint.

Signed-off-by: Ben Pfaff <[email protected]>
These two steps were muddled together, which was more or less OK for the
existing users, but an upcoming user will want to separate them.

Signed-off-by: Ben Pfaff <[email protected]>
This commit fixes up the breaking changes between these two releases, which
are really quite desirable ones because tarpc benefits a lot from async
functions in traits.

The reason for this upgrade is to allow passing in a TCP socket instead of
having tarpc listen, which is a feature added in tarpc 0.35:
https://github.com/google/tarpc/blob/main/RELEASES.md#new-features

Signed-off-by: Ben Pfaff <[email protected]>
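
For context, this is roughly what the post-0.34 service style looks like, with async fns directly in the service trait (the trait and method here are illustrative, not this PR's actual interface):

```rust
// Sketch only: requires the tarpc crate (>= 0.34) as a dependency.
#[tarpc::service]
pub trait Coordination {
    /// Report to the coordinator that this host completed a step.
    async fn step_completed(host: usize, step: u64);
}
```
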
This switches to a simpler way that will be easier to extend for multihost
coordination.

Signed-off-by: Ben Pfaff <[email protected]>
…rkers.

Our output connectors run on a single host within the collection of hosts
in a multihost pipeline.  On such a pipeline, we want to shard to the
workers on that host.

(Gathering to a single worker on the output connector's host might work
just as well, though.)

Signed-off-by: Ben Pfaff <[email protected]>
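
A minimal sketch of the range-based idea, assuming records are routed by hash (names here are illustrative, not the crate's actual API):

```rust
use std::ops::Range;

/// Routes a record to one worker within `workers`, the contiguous range
/// of worker indexes on the output connector's host, rather than
/// spreading it across every worker in the circuit.
fn shard_to_range(key_hash: u64, workers: &Range<usize>) -> usize {
    workers.start + (key_hash as usize % workers.len())
}

fn main() {
    // Suppose workers 8..16 live on the output connector's host.
    assert_eq!(shard_to_range(42, &(8..16)), 8 + 42 % 8);
}
```
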
…rsion.

With this change, it becomes possible to deserialize
ExternalGlobalControllerMetrics as well as to serialize it.  It wasn't
possible before because InputEndpointConfig can't be deserialized from
ShortEndpointConfig (because the latter doesn't contain the configuration,
only the stream name).

This change drops a lot from GlobalControllerMetrics, since it no longer
needs to support serialization or deserialization.

Signed-off-by: Ben Pfaff <[email protected]>
@blp blp force-pushed the coordinator branch 4 times, most recently from 810e755 to 645cdd1 on January 2, 2026 20:23
@ryzhyk (Contributor) left a comment


I've only reviewed the preliminary commits so far. Starting on the main commit now.

self,
)
#[track_caller]
fn dyn_gather_multihost(&self, factories: &B::Factories, receiver_worker: usize) -> Stream<C, B>
Contributor

Is there any reason dyn_gather_multihost wouldn't work in the single-host case?

Member Author

It should work there too; it's just more expensive.

@ryzhyk pointed out that this avoids a potential pitfall.

Signed-off-by: Ben Pfaff <[email protected]>
@ryzhyk (Contributor) left a comment

Haven't read coordinator code yet, but this looks extremely clean!

pub enum StepAction {
/// Wait for instructions from the coordinator.
Idle,
/// Wait for a triggering event to occur, such as arrival of a sufficient
Contributor

I thought it was the coordinator's job to decide when all conditions for a step are met.

Member Author

The idea here is that it's a waste of a round trip for the pipeline to report that it's got data and then for the coordinator to tell it to run a step. Instead, the coordinator says to start a step if data shows up.
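
A sketch of the two interaction patterns (the RunOnTrigger name is an assumption; only Idle and a truncated trigger variant appear in the excerpt above):

```rust
// Hypothetical pipeline-side handling; not the PR's actual code.
enum StepAction {
    /// Wait for instructions from the coordinator.
    Idle,
    /// Pre-authorization: run a step as soon as a trigger fires,
    /// e.g. enough input arrives, with no extra round trip.
    RunOnTrigger,
}

fn on_input_arrived(action: &StepAction) -> bool {
    // With Idle, the pipeline would report the data and wait for the
    // coordinator's go-ahead; with RunOnTrigger it steps immediately.
    matches!(action, StepAction::RunOnTrigger)
}
```
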

///
/// The worker threads are evenly divided among the hosts. For single-host
/// deployments, this should be 1 (the default).
pub hosts: usize,
Contributor

Do we need this here, given that it's already in PipelineConfig?

/// Used to notify watchers when a step has been completed.
///
/// This is updated to match `step` whenever it changes.
step_sender: tokio::sync::watch::Sender<StepStatus>,
Contributor

Nit: import Sender instead of using a fully qualified path.

Member Author

We've got std::sync::mpsc::Sender imported already. I could import this as a partly qualified path as watch::Sender, if you like.
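
That partly qualified form would look like the following (StepStatus is stubbed in for illustration; this assumes tokio with the sync feature):

```rust
use tokio::sync::watch;

// Stand-in for the crate's actual step-status type.
struct StepStatus;

struct Coordinator {
    // `watch::Sender` stays unambiguous next to the already-imported
    // `std::sync::mpsc::Sender`, without the full path.
    step_sender: watch::Sender<StepStatus>,
}
```
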

/// - Whether the pipeline is currently `running`.
/// - Whether a checkpoint has already been requested.
/// - The current [CoordinationRequest] and the current step.
///
Contributor

Is the current step the next step the pipeline hasn't taken yet?

});
Ok(
HttpResponseBuilder::new(StatusCode::OK).streaming(stream.map(|value| {
Ok::<_, Infallible>(Bytes::from(serde_json::to_string(&value).unwrap() + "\n"))
Contributor

I hope the HTTP connection doesn't time out if there is no status change for a few seconds.

Member Author

It doesn't seem to. I think that reqwest shares connections across requests. At any rate, the coordinator reconnects if anything drops, so it's not a big deal if it does.

blp added 4 commits January 8, 2026 10:34
The cast wasn't necessary in AdHocQueryExecution::children().

The clone written inline in AdHocQueryExecution::with_new_children() can
be more easily written as a real clone.

Signed-off-by: Ben Pfaff <[email protected]>
Until now, each table scan during an ad-hoc query separately locked the
table of snapshots.  The table of snapshots could change from one scan to
the next.  This meant that ad-hoc queries that involved multiple tables,
or that scanned a single table multiple times, could work with
inconsistent data.  This fixes the problem.

Signed-off-by: Ben Pfaff <[email protected]>
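
The shape of the fix, sketched with stand-in types (the crate's real snapshot bookkeeping differs): lock the shared map once per query and clone it, so every scan works from one consistent view.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Snapshot; // stand-in for a table snapshot

// Lock once per query and clone the whole map, so every scan in the
// query sees the same set of snapshots; previously each scan locked
// and read the shared map separately.
fn snapshots_for_query(
    shared: &Mutex<HashMap<String, Arc<Snapshot>>>,
) -> HashMap<String, Arc<Snapshot>> {
    shared.lock().unwrap().clone()
}
```
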
This enables a multihost coordinator to get a lease on a particular step
across all of the hosts, then scan the tables at that step, and finally
drop the lease.

Signed-off-by: Ben Pfaff <[email protected]>
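
A sketched coordinator-side flow for that lease (all names here are hypothetical, not the PR's actual API):

```rust
// Hypothetical interface for pinning a step across hosts.
trait StepLeases {
    fn acquire_step_lease(&mut self, step: u64);
    fn scan_tables(&self) -> Vec<String>;
    fn drop_step_lease(&mut self, step: u64);
}

fn run_adhoc_query(c: &mut dyn StepLeases, step: u64) -> Vec<String> {
    c.acquire_step_lease(step); // pin `step` on every host
    let rows = c.scan_tables(); // consistent view as of `step`
    c.drop_step_lease(step);    // let the hosts advance again
    rows
}
```
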
