Initial support for multihost pipelines #5358
base: main
Conversation
Pull request overview
This PR introduces initial support for multihost pipelines, enabling Feldera to distribute pipeline execution across multiple hosts. The implementation adds coordination mechanisms for managing distributed pipeline processes, including step synchronization, checkpoint coordination, and transaction management across hosts.
Key changes:
- Added `MultihostConfig` and coordination API endpoints for multihost pipeline management (a hedged sketch follows this list)
- Implemented coordinator interfaces for controlling distributed pipeline execution
- Extended runtime status enums with a new `Coordination` state for pipeline initialization in multihost mode
- Added infrastructure for inter-host communication and layout configuration
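For orientation, here is a minimal sketch of how the multihost configuration could hang together. Only the names `MultihostConfig`, `PipelineConfig`, `multihost`, and `hosts` come from the PR; the derives, doc comments, and the `Option` wrapping are assumptions for illustration, not the actual definitions.

```rust
use serde::{Deserialize, Serialize};

/// Sketch only: the real struct in crates/feldera-types/src/config.rs
/// may carry additional fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MultihostConfig {
    /// Number of hosts the pipeline is distributed across.
    /// The worker threads are evenly divided among the hosts;
    /// for single-host deployments this is 1.
    pub hosts: usize,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PipelineConfig {
    // ... existing pipeline settings elided ...
    /// Present only when the pipeline is deployed across multiple hosts
    /// (assumed to be an `Option` here; the PR may model this differently).
    pub multihost: Option<MultihostConfig>,
}
```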
Reviewed changes
Copilot reviewed 39 out of 42 changed files in this pull request and generated 4 comments.
Summary per file:
| File | Description |
|---|---|
| crates/feldera-types/src/coordination.rs | New module defining coordinator-pipeline interface for multihost operations |
| crates/feldera-types/src/config.rs | Added MultihostConfig struct and multihost field to PipelineConfig |
| crates/feldera-types/src/runtime_status.rs | Added Coordination status variant and snake_case serialization support |
| crates/adapters/src/server.rs | Implemented coordination API endpoints and multihost initialization logic |
| crates/adapters/src/controller.rs | Extended controller with coordination request handling and checkpoint preparation |
| crates/dbsp/src/circuit/dbsp_handle.rs | Modified fingerprint handling and layout support for multihost circuits |
| crates/dbsp/src/operator/dynamic/communication/shard.rs | Added worker range-based sharding for multihost data distribution |
| crates/pipeline-manager/src/runner/local_runner.rs | Added validation to reject multihost deployments in local runner |
| crates/pipeline-manager/src/db/types/program.rs | Integrated multihost configuration generation based on runtime config |
| Cargo.toml | Updated dependencies including tarpc version bump for RPC communication |
Comments suppressed due to low confidence (3)
- crates/pipeline-manager/src/db/types/combined_status.rs:1 - The `todo!()` placeholder for `RuntimeStatus::Coordination` will panic at runtime. This needs to be implemented with the appropriate `CombinedStatus` variant or mapping logic before this code path is exercised.
- crates/pipeline-manager/src/db/types/combined_status.rs:1 - The `todo!()` placeholder for `RuntimeDesiredStatus::Coordination` will panic at runtime. This needs to be implemented with the appropriate `CombinedDesiredStatus` variant or mapping logic before this code path is exercised.
- crates/pipeline-manager/src/db/types/combined_status.rs:1 - The `todo!()` placeholder for `RuntimeDesiredStatus::Coordination` will panic at runtime. This needs to be implemented with the appropriate `CombinedDesiredStatus` variant or mapping logic before this code path is exercised.
Force-pushed from bba6ac5 to 2832f45.
Signed-off-by: Ben Pfaff <[email protected]>
The actix_web #[get] macro does something weird with function names that makes it hard to use the same name pretty much anywhere in the same scope, even as local variables. I kept trying to use `status` and it was not working out well. Signed-off-by: Ben Pfaff <[email protected]>
This allows the runner to report errors from OpenSSL. Signed-off-by: Ben Pfaff <[email protected]>
It's easier for me to think about atomics than about mutexes because there is no way for anything to happen while they are "held", so no possibility of contributing to a deadlock, etc. Signed-off-by: Ben Pfaff <[email protected]>
Signed-off-by: Ben Pfaff <[email protected]>
Signed-off-by: Ben Pfaff <[email protected]>
This ensures that the checkpoint can always be read from the checkpoint directory before trying to make it the default checkpoint. Signed-off-by: Ben Pfaff <[email protected]>
These two steps were muddled together, which was more or less OK for the existing users, but an upcoming user will want to separate them. Signed-off-by: Ben Pfaff <[email protected]>
This commit fixes up the breaking changes between these two releases, which are really quite desirable ones because tarpc benefits a lot from async functions in traits. The reason for this upgrade is to allow passing in a TCP socket instead of having tarpc listen, which is a feature added in tarpc 0.35: https://github.com/google/tarpc/blob/main/RELEASES.md#new-features Signed-off-by: Ben Pfaff <[email protected]>
This switches to a simpler way that will be easier to extend for multihost coordination. Signed-off-by: Ben Pfaff <[email protected]>
Fixes: #267 Signed-off-by: Ben Pfaff <[email protected]>
…rkers. Our output connectors run on a single host within the collection of hosts in a multihost pipeline. On such a pipeline, we want to shard to those workers. (Gathering to a single worker on the output connector's host might work just as well, though.) Signed-off-by: Ben Pfaff <[email protected]>
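As a rough illustration of the range-based sharding idea described in the commit above (a sketch of the pattern, not the code in crates/dbsp/src/operator/dynamic/communication/shard.rs; the function name and hashing scheme are assumptions):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::ops::Range;

/// Picks a destination worker for `key` within a contiguous range of
/// workers (for example, the workers on the host that runs the output
/// connectors) instead of across all workers in the pipeline.
fn shard_to_worker_range<K: Hash>(key: &K, workers: Range<usize>) -> usize {
    debug_assert!(!workers.is_empty());
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    workers.start + (hasher.finish() as usize) % workers.len()
}
```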
…rsion. With this change, it becomes possible to deserialize ExternalGlobalControllerMetrics as well as to serialize it. It wasn't possible before because InputEndpointConfig can't be deserialized from ShortEndpointConfig (because the latter doesn't contain the configuration, only the stream name). This change drops a lot from GlobalControllerMetrics, since it no longer needs to support serialization or deserialization. Signed-off-by: Ben Pfaff <[email protected]>
Signed-off-by: Ben Pfaff <[email protected]>
Signed-off-by: Ben Pfaff <[email protected]>
Force-pushed from 810e755 to 645cdd1.
Signed-off-by: Ben Pfaff <[email protected]>
ryzhyk left a comment:
I've only reviewed the preliminary commits so far. Starting on the main commit now.
```rust
self,
)
#[track_caller]
fn dyn_gather_multihost(&self, factories: &B::Factories, receiver_worker: usize) -> Stream<C, B>
```
Is there any reason dyn_gather_multihost wouldn't work in the single-host case?
It should work there too, it's just more expensive.
@ryzhyk pointed out that this avoids a potential pitfall. Signed-off-by: Ben Pfaff <[email protected]>
ryzhyk left a comment:
Haven't read coordinator code yet, but this looks extremely clean!
```rust
pub enum StepAction {
    /// Wait for instructions from the coordinator.
    Idle,
    /// Wait for a triggering event to occur, such as arrival of a sufficient
```
I thought it was the coordinator's job to decide when all conditions for a step are met.
The idea here is that it's a waste of a round trip for the pipeline to report that it's got data and then for the coordinator to tell it to run a step. Instead, the coordinator says to start a step if data shows up.
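To make that concrete, here is a hedged sketch of the shape such an instruction could take. Only `Idle` and its doc comment appear verbatim in the diff above; the second variant's name is hypothetical and is shown only to illustrate the "start a step if data shows up" idea.

```rust
/// Sketch of a coordinator-to-pipeline step instruction.
pub enum StepAction {
    /// Wait for instructions from the coordinator.
    Idle,
    /// Run the next step as soon as a triggering event occurs (such as
    /// the arrival of a sufficient amount of input data), without first
    /// reporting back to the coordinator and waiting to be told to step.
    /// (Hypothetical variant name; the real one may differ.)
    RunOnTrigger,
}
```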
```rust
///
/// The worker threads are evenly divided among the hosts. For single-host
/// deployments, this should be 1 (the default).
pub hosts: usize,
```
Do we need this here, given that it's already in `PipelineConfig`?
```rust
/// Used to notify watchers when a step has been completed.
///
/// This is updated to match `step` whenever it changes.
step_sender: tokio::sync::watch::Sender<StepStatus>,
```
Nit: import Sender instead of using a fully qualified path.
We've got `std::sync::mpsc::Sender` imported already. I could import this under a partly qualified path, as `watch::Sender`, if you like.
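For readers less familiar with the pattern, here is a minimal, self-contained example of using a `tokio::sync::watch` channel to notify watchers when a step completes. The `StepStatus` struct below is a stand-in, not the type from the PR.

```rust
use tokio::sync::watch;

// Stand-in for the PR's `StepStatus`; illustrative only.
#[derive(Clone, Copy, Debug, PartialEq)]
struct StepStatus {
    step: u64,
    complete: bool,
}

#[tokio::main]
async fn main() {
    let (step_sender, mut step_receiver) =
        watch::channel(StepStatus { step: 0, complete: false });

    // A watcher that wakes up whenever the step status changes.
    let watcher = tokio::spawn(async move {
        while step_receiver.changed().await.is_ok() {
            let status = *step_receiver.borrow_and_update();
            println!("step {} complete: {}", status.step, status.complete);
            if status.step == 1 && status.complete {
                break;
            }
        }
    });

    // The pipeline side publishes an update as the step completes.
    step_sender.send(StepStatus { step: 1, complete: true }).unwrap();
    watcher.await.unwrap();
}
```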
```rust
/// - Whether the pipeline is currently `running`.
/// - Whether a checkpoint has already been requested.
/// - The current [CoordinationRequest] and the current step.
///
```
Is the current step the next step the pipeline hasn't taken yet?
```rust
});
Ok(
    HttpResponseBuilder::new(StatusCode::OK).streaming(stream.map(|value| {
        Ok::<_, Infallible>(Bytes::from(serde_json::to_string(&value).unwrap() + "\n"))
```
I hope the HTTP connection doesn't time out if there is no status change for a few seconds.
It doesn't seem to. I think that reqwest shares connections across requests. At any rate, the coordinator reconnects if anything drops, so it's not a big deal if it does.
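For reference, a coordinator-side watcher that tolerates dropped connections might look roughly like this sketch. The endpoint URL is hypothetical, the payload is parsed as generic JSON rather than the PR's real status type, and it assumes `reqwest` with the `stream` feature plus `futures-util` and `tokio`.

```rust
use futures_util::StreamExt;
use std::time::Duration;

// Hypothetical endpoint, for illustration only.
const STATUS_URL: &str = "http://pipeline-host:8080/coordination/status";

async fn watch_status(client: &reqwest::Client) {
    loop {
        match client.get(STATUS_URL).send().await {
            Ok(response) => {
                let mut buf = Vec::new();
                let mut body = response.bytes_stream();
                while let Some(Ok(chunk)) = body.next().await {
                    buf.extend_from_slice(&chunk);
                    // Each status update is a single JSON value followed by '\n'.
                    while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
                        let line: Vec<u8> = buf.drain(..=pos).collect();
                        if let Ok(value) =
                            serde_json::from_slice::<serde_json::Value>(&line)
                        {
                            println!("status update: {value}");
                        }
                    }
                }
            }
            Err(err) => eprintln!("status connection failed: {err}"),
        }
        // Connection dropped or failed: back off briefly and reconnect.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```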
It was unused. Signed-off-by: Ben Pfaff <[email protected]>
The cast wasn't necessary in AdHocQueryExecution::children(). The clone written inline in AdHocQueryExecution::with_new_children() can be more easily written as a real clone. Signed-off-by: Ben Pfaff <[email protected]>
Until now, each table scan during an ad-hoc query separately locked the table of snapshots. The table of snapshots could change from one scan to the next. This meant that ad-hoc queries that involved multiple tables, or that scanned a single table multiple times, could work with inconsistent data. This fixes the problem. Signed-off-by: Ben Pfaff <[email protected]>
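The general shape of the fix described above, as a sketch of the pattern rather than the Feldera code (type names are stand-ins), is to clone the whole snapshot map once, under the lock, at the start of the query, and have every scan in that query read from the clone:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Illustrative stand-ins for the real types.
type TableName = String;
type Snapshot = Arc<Vec<u64>>;

struct SnapshotRegistry {
    snapshots: Mutex<HashMap<TableName, Snapshot>>,
}

impl SnapshotRegistry {
    /// Taken once per ad-hoc query: every scan in the query reads from
    /// this clone, so concurrent updates to the registry cannot make
    /// different scans of the same query see different data.
    fn snapshot_for_query(&self) -> HashMap<TableName, Snapshot> {
        self.snapshots.lock().unwrap().clone()
    }
}
```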
This enables a multihost coordinator to get a lease on a particular step across all of the hosts, scan the tables at that step, and finally drop the lease. Signed-off-by: Ben Pfaff <[email protected]>