-
Notifications
You must be signed in to change notification settings - Fork 92
[manager] Switch from awc to reqwest. #5021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This patch replaces `awc` with `reqwest` as the HTTP client for talking to the pipeline. The main reason for this are various hard-to-explain HTTP errors we've observed in CI. On top of this, awc doesn't support a clean way to disable connection caching. It can only be achieved by setting the `connection: Close` header, which we suspect confuses the server. In contrast, reqwest supports setting connection pool size to 0. This commit was mostly generated by cursor. There are a couple of subtle points here (which cursor missed) - In awc, ClientRequest::timeout() controls timeout from request start till HTTP response header is returned. reqwest doesn't have a similar method, so we use tokio::timeout instead. - reqwest seems to handle disconnects differently from awc: in awc when the pipeline drops its end of a streaming response, the byte stream simply ends (without an error); in reqwest, the stream contains an error, which gets propagated to the API client. This may be the right thing to do, but currently the Python SDK doesn't expect that. We simulate the old behavior to avoid a breaking change. Signed-off-by: Leonid Ryzhyk <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR replaces the awc HTTP client with reqwest for pipeline communication to address HTTP errors observed in CI and improve connection management. The main change is replacing awc::Client with reqwest::Client throughout the codebase, with special handling for connection pooling and streaming response behavior.
Key Changes
- Replaced
awc::Clientwithreqwest::Clientfor all HTTP requests to pipelines - Disabled connection pooling by setting
pool_max_idle_per_host(0)to prevent stale connection errors - Modified streaming response error handling to gracefully terminate on disconnects instead of propagating errors to maintain backward compatibility with the Python SDK
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
crates/pipeline-manager/src/runner/interaction.rs |
Core HTTP client replacement with new request/response handling and streaming error suppression |
crates/pipeline-manager/src/config.rs |
Added connection pool configuration to disable idle connection caching |
crates/pipeline-manager/src/api/support_data_collector.rs |
Updated client type references and streaming response handling |
crates/pipeline-manager/src/api/main.rs |
Modified server initialization to use both reqwest and awc clients (awc for WebSocket) |
crates/pipeline-manager/src/api/endpoints/pipeline_management.rs |
Updated function signatures to accept reqwest::Client |
crates/pipeline-manager/src/api/endpoints/pipeline_interaction/support_bundle.rs |
Updated function signature to accept reqwest::Client |
crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs |
Updated function signatures and added both client types for WebSocket vs HTTP handling |
crates/pipeline-manager/src/api/endpoints/metrics.rs |
Updated function signature to accept reqwest::Client |
crates/pipeline-manager/src/api/endpoints/config.rs |
Updated function signature to accept reqwest::Client |
Comments suppressed due to low confidence (1)
crates/pipeline-manager/src/runner/interaction.rs:1
- The comment is incomplete. It appears to be cut off mid-sentence ('Other').
use crate::api::error::ApiError;
| Method::PATCH => reqwest::Method::PATCH, | ||
| Method::OPTIONS => reqwest::Method::OPTIONS, | ||
| Method::HEAD => reqwest::Method::HEAD, | ||
| _ => reqwest::Method::GET, |
Copilot
AI
Nov 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default fallback to GET for unrecognized HTTP methods could mask errors. Consider returning an error or using a more explicit match pattern to handle unsupported methods explicitly.
| _ => reqwest::Method::GET, | |
| _ => { | |
| return Err(RunnerError::PipelineInteractionInvalidRequest { | |
| pipeline_name: pipeline_name.to_string(), | |
| error: format!("unsupported HTTP method: {}", method), | |
| }.into()); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which other methods cause this to not be exhaustive?
| Method::PATCH => reqwest::Method::PATCH, | ||
| Method::OPTIONS => reqwest::Method::OPTIONS, | ||
| Method::HEAD => reqwest::Method::HEAD, | ||
| _ => reqwest::Method::GET, |
Copilot
AI
Nov 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default fallback to GET for unrecognized HTTP methods could mask errors. Consider returning an error or using a more explicit match pattern to handle unsupported methods explicitly.
| _ => reqwest::Method::GET, | |
| _ => { | |
| return Err(ManagerError::UnsupportedMethod(request.method().to_string())); | |
| } |
| // let request_str_clone = request_str.clone(); | ||
|
|
||
| // Convert reqwest streaming response to actix streaming response | ||
| // Both reqwest and actix use the same Bytes type from the bytes crate, so no conversion needed | ||
| // Handle errors gracefully to avoid "response ended prematurely" on client disconnects | ||
| let stream = response | ||
| .bytes_stream() | ||
| // .inspect(move |result| { | ||
| // match result { | ||
| // Ok(bytes) => println!("Stream {request_str_clone} chunk: {} bytes - {:?}", bytes.len(), bytes), | ||
| // Err(e) => println!("Stream {request_str_clone} error: {:?}", e), | ||
| // } | ||
| // }) | ||
|
|
Copilot
AI
Nov 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove commented-out debugging code before merging. The commented inspection logic and unused request_str_clone should be cleaned up.
| // let request_str_clone = request_str.clone(); | |
| // Convert reqwest streaming response to actix streaming response | |
| // Both reqwest and actix use the same Bytes type from the bytes crate, so no conversion needed | |
| // Handle errors gracefully to avoid "response ended prematurely" on client disconnects | |
| let stream = response | |
| .bytes_stream() | |
| // .inspect(move |result| { | |
| // match result { | |
| // Ok(bytes) => println!("Stream {request_str_clone} chunk: {} bytes - {:?}", bytes.len(), bytes), | |
| // Err(e) => println!("Stream {request_str_clone} error: {:?}", e), | |
| // } | |
| // }) | |
| // Convert reqwest streaming response to actix streaming response | |
| // Both reqwest and actix use the same Bytes type from the bytes crate, so no conversion needed | |
| // Handle errors gracefully to avoid "response ended prematurely" on client disconnects | |
| let stream = response | |
| .bytes_stream() |
|
In general, I'm for this change, because the fact that the |
This patch replaces
awcwithreqwestas the HTTP client for talking to the pipeline. The main reason for this are various hard-to-explain HTTP errors we've observed in CI. On top of this, awc doesn't support a clean way to disable connection caching. It can only be achieved by setting theconnection: Closeheader, which we suspect confuses the server. In contrast, reqwest supports setting connection pool size to 0.This commit was mostly generated by cursor.
There are a couple of subtle points here (which cursor missed)
In awc, ClientRequest::timeout() controls timeout from request start till HTTP response header is returned. reqwest doesn't have a similar method, so we use tokio::timeout instead.
reqwest seems to handle disconnects differently from awc: in awc when the pipeline drops its end of a streaming response, the byte stream simply ends (without an error); in reqwest, the stream contains an error, which gets propagated to the API client. This may be the right thing to do, but currently the Python SDK doesn't expect that. We simulate the old behavior to avoid a breaking change.