Conversation
Signed-off-by: Asish Kumar <[email protected]>
…noise configuration handling. Signed-off-by: Asish Kumar <[email protected]>
…ervice for SSE support. Signed-off-by: Asish Kumar <[email protected]>
…update replay service logic. Signed-off-by: Asish Kumar <[email protected]>
…ing the mock window and refine outgoing message recording logs. Signed-off-by: Asish Kumar <[email protected]>
…le replay Signed-off-by: Asish Kumar <[email protected]>
- Added a new function to merge consecutive SSE data fields, allowing for proper handling of multiline data.
- Updated the `compareSSEFields` function to utilize the new merging logic.
- Introduced tests to validate the normalization of multiline SSE data in both parsing and serialization contexts.

Signed-off-by: Asish Kumar <[email protected]>
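Per the SSE specification, consecutive `data:` fields in one event are concatenated with a newline, which is what the merging described above must preserve. A minimal sketch of that merge logic — the type and function names here are hypothetical, not the PR's actual ones:

```go
package main

import "fmt"

// sseField is a hypothetical key/value pair for one parsed SSE line;
// the actual type in this PR may be named differently.
type sseField struct {
	Key   string
	Value string
}

// mergeConsecutiveData joins runs of consecutive "data" fields into a single
// field whose value is the individual lines joined with "\n", as the SSE spec
// prescribes for multiline data. Other fields pass through unchanged.
func mergeConsecutiveData(fields []sseField) []sseField {
	var out []sseField
	for _, f := range fields {
		if f.Key == "data" && len(out) > 0 && out[len(out)-1].Key == "data" {
			out[len(out)-1].Value += "\n" + f.Value
			continue
		}
		out = append(out, f)
	}
	return out
}

func main() {
	fields := []sseField{
		{"event", "update"},
		{"data", "line one"},
		{"data", "line two"},
		{"id", "7"},
	}
	for _, f := range mergeConsecutiveData(fields) {
		fmt.Printf("%s: %q\n", f.Key, f.Value)
	}
}
```

With this normalization in place, a comparator like `compareSSEFields` sees one canonical `data` value per event regardless of how the payload was split across lines.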
Co-authored-by: kiloconnect[bot] <240665456+kiloconnect[bot]@users.noreply.github.com> Signed-off-by: Asish Kumar <[email protected]>
…son logic for HTTP. Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Anju <[email protected]>
Signed-off-by: gouravkrosx <[email protected]>
- Introduced new types for HTTP streaming modes and configurations for better clarity and type safety.
- Created a new function `SimulateHTTPStreaming` to handle streaming HTTP requests separately from standard HTTP requests.
- Updated `SimulateHTTP` to utilize a shared request preparation function, reducing code duplication.
- Enhanced streaming response comparison functions to return detailed mismatch information, improving error reporting.
- Updated unit tests to use the new streaming simulation function and validate streaming responses more effectively.
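The commit above mentions new types for HTTP streaming modes but doesn't show them. As a rough sketch of how such a classification might look — all names here are hypothetical, and the real detection in the PR inspects recorded response headers:

```go
package main

import (
	"fmt"
	"strings"
)

// StreamMode is a hypothetical enum standing in for the "new types for HTTP
// streaming modes" mentioned in the commit message.
type StreamMode int

const (
	StreamNone StreamMode = iota
	StreamSSE
	StreamNDJSON
	StreamChunked
)

// detectStreamMode classifies a response by its Content-Type and
// Transfer-Encoding headers, the same signals the record path can use to
// decide between SimulateHTTP and SimulateHTTPStreaming.
func detectStreamMode(contentType, transferEncoding string) StreamMode {
	ct := strings.ToLower(contentType)
	switch {
	case strings.HasPrefix(ct, "text/event-stream"):
		return StreamSSE
	case strings.HasPrefix(ct, "application/x-ndjson"):
		return StreamNDJSON
	case strings.EqualFold(transferEncoding, "chunked"):
		return StreamChunked
	}
	return StreamNone
}

func main() {
	fmt.Println(detectStreamMode("text/event-stream; charset=utf-8", ""))
	fmt.Println(detectStreamMode("application/json", "chunked"))
	fmt.Println(detectStreamMode("application/json", ""))
}
```

Keeping the mode an explicit type (rather than re-parsing headers at each call site) is what gives the "clarity and type safety" the commit message refers to.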
# Conflicts:
#	pkg/service/replay/replay.go
#	pkg/service/replay/utils_test.go

# Conflicts:
#	pkg/service/replay/replay.go
Pull request overview
Adds first-class support for HTTP streaming responses (SSE/NDJSON/etc.) across Keploy’s record → mock storage → replay pipeline, including new structured stream representations and replay-time stream comparison.
Changes:
- Record: detect streaming/chunked responses and offload streamed chunks to `streams/*.ndjson`, storing a `StreamRef` in the mock.
- Replay: rehydrate and replay streamed responses by reading the recorded stream file and emitting chunks with recorded timing; add streaming comparators for test-time validation.
- Housekeeping: noise config deep-copy fixes, CLI flag validation tweak, and ownership restoration helpers for sudo-based runs.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| utils/permissions_windows.go | Adds Windows no-op for ownership restoration helper. |
| utils/permissions_unix.go | Adds RestoreFileOwnership helper for sudo scenarios. |
| pkg/util.go | Introduces streaming simulation + stream comparators and related helpers. |
| pkg/util_test.go | Adds unit tests for streaming simulation/comparison helpers. |
| pkg/service/replay/utils.go | Avoids mutating global noise config; adds streaming mock window helper and mapping upsert helper. |
| pkg/service/replay/utils_test.go | Adds tests for noise cloning/merge behavior and stream mock window. |
| pkg/service/replay/replay.go | Defers streaming tests to a sequential phase; wires streaming comparison into replay reporting. |
| pkg/service/replay/hooks.go | Routes HTTP streaming testcases through SimulateHTTPStreaming; adds noise map cloning helper. |
| pkg/service/record/record.go | Passes test-set config path through to agent options (for stream file resolution). |
| pkg/models/instrument.go | Extends OutgoingOptions with ConfigPath for resolving relative stream/assets. |
| pkg/models/http_stream_body.go | Adds structured stream chunk model and YAML marshal/unmarshal support for streaming bodies. |
| pkg/models/http_stream_body_test.go | Tests YAML marshal/unmarshal behavior for structured streaming bodies. |
| pkg/models/http.go | Adds StreamRef and internal StreamBody field to HTTP response model. |
| pkg/matcher/http/match.go | Suppresses success log when emitFailureLogs is disabled (streaming caller handles logging). |
| pkg/agent/proxy/proxy.go | Tweaks outgoing record error logging for network-closed cases; adds debug success log. |
| pkg/agent/proxy/integrations/http/http.go | Propagates StreamRef through HTTP mock parsing; avoids reading body when stream is offloaded. |
| pkg/agent/proxy/integrations/http/encode.go | Threads stream capture state through chunk handling and final mock creation. |
| pkg/agent/proxy/integrations/http/decode.go | Implements streaming mock replay by reading StreamRef ndjson and writing chunks to client. |
| pkg/agent/proxy/integrations/http/chunk.go | Adds stream file offload in chunked response handler and writes per-chunk ndjson records. |
| pkg/agent/proxy/integrations/http/chunk_test.go | Updates chunked response tests for new signature/streamRef plumbing. |
| cli/provider/cmd.go | Adjusts disable-mapping flag handling to avoid unintended overrides. |
```go
if err := os.MkdirAll(streamsDir, 0777); err != nil {
	utils.LogError(h.Logger, err, "failed to create streams directory")
```
Creating the streams directory with mode 0777 is overly permissive (world-writable) and can be a security issue on multi-user systems. Consider using a tighter default (e.g., 0755) and relying on ownership restoration/umask as needed.
Suggested change:

```diff
-if err := os.MkdirAll(streamsDir, 0777); err != nil {
-	utils.LogError(h.Logger, err, "failed to create streams directory")
+if err := os.MkdirAll(streamsDir, 0755); err != nil {
+	utils.LogError(h.Logger, err, "failed to create streams directory; verify permissions for configPath and parent directories")
```
```go
// RestoreFileOwnership changes ownership of a file or directory to the original user
// when running under sudo. This prevents the agent process (which runs as root via sudo)
// from creating files that the CLI process (running as the normal user) cannot access.
// Returns nil if not running under sudo or if ownership was successfully restored.
```
The doc comment for RestoreFileOwnership says "Returns nil" but the function doesn’t return anything. Please update the comment to match the signature/behavior (e.g., describe that it’s best-effort and logs debug on failures).
Suggested change:

```diff
-// RestoreFileOwnership changes ownership of a file or directory to the original user
-// when running under sudo. This prevents the agent process (which runs as root via sudo)
-// from creating files that the CLI process (running as the normal user) cannot access.
-// Returns nil if not running under sudo or if ownership was successfully restored.
+// RestoreFileOwnership changes ownership of files or directories to the original user
+// when running under sudo. This prevents the agent process (which runs as root via sudo)
+// from creating files that the CLI process (running as the normal user) cannot access.
+// This is a best-effort helper: it does nothing when not running under sudo and logs debug
+// messages if ownership restoration fails for any path.
```
```go
if !streamMatched {
	r.logger.Error("streaming response mismatch detected", zap.String("testcase", tc.Name))
	hadStreamingMismatch = true
```
This new error log doesn’t provide an actionable next step for the user (e.g., how to inspect the mismatch details or re-record the testcase). Please include a brief next action (like enabling debug logs / re-recording / inspecting streams) so failures are easier to remediate.
```go
aType := detectScalarType(strings.TrimSpace(a.value))
if eType != aType {
	return false, "field-type mismatch"
}
```
compareSSEFields does not validate equality of non-"data" field values (e.g., id/event/retry). It only checks that the scalar types match, so "id:1" and "id:2" would compare equal. This undermines deterministic streaming comparisons; it should compare values (optionally with noise rules) rather than only types.
Suggested change:

```diff
 }
 }
+// For non-"data" fields, after confirming scalar type equality,
+// ensure the actual values also match to maintain deterministic comparisons.
+if strings.TrimSpace(e.value) != strings.TrimSpace(a.value) {
+	return false, "field-value mismatch"
+}
```
```go
if len(eVal) != len(aVal) {
	return false, "data-size mismatch"
```
For SSE "data" fields that are not JSON, the comparator only checks scalar type and string length (data-type mismatch / data-size mismatch) but not the actual value. This can allow mismatched payloads of the same length to pass. Consider comparing the actual normalized value (or a hash) rather than only size/type.
Suggested change:

```diff
-if len(eVal) != len(aVal) {
-	return false, "data-size mismatch"
+if eVal != aVal {
+	return false, "data-value mismatch"
```
```go
utils.LogError(h.Logger, err, "failed to decode stream chunk")
break
```
In streaming replay, decoder.Decode errors (other than EOF) are logged but then the loop just breaks without reporting the error back via errCh. This can yield partial/corrupt responses while the caller still treats the mock as successful. Consider sending the error on errCh (and returning) so the test run can fail fast and surface the root cause.
Suggested change:

```diff
-utils.LogError(h.Logger, err, "failed to decode stream chunk")
-break
+utils.LogError(h.Logger, err, "failed to decode stream chunk; verify the recorded stream file is not truncated or corrupted and matches the expected format", zap.String("path", filePath))
+errCh <- err
+return
```
```go
	time.Sleep(diff)
}
prevTS = chunk.TS

for _, field := range chunk.Data {
	if field.Key == "raw" {
		if _, err := clientConn.Write([]byte(field.Value)); err != nil {
			utils.LogError(h.Logger, err, "failed to write chunk to client")
```
Streaming replay uses time.Sleep(diff) to preserve inter-chunk timing, but it doesn’t respect ctx cancellation/timeouts. If the recorded diff is large (or the test is canceled), this goroutine will keep sleeping and can delay shutdown. Consider replacing sleep with a timer + select on ctx.Done().
Suggested change:

```diff
-	time.Sleep(diff)
-}
-prevTS = chunk.TS
-for _, field := range chunk.Data {
-	if field.Key == "raw" {
-		if _, err := clientConn.Write([]byte(field.Value)); err != nil {
-			utils.LogError(h.Logger, err, "failed to write chunk to client")
+	timer := time.NewTimer(diff)
+	select {
+	case <-ctx.Done():
+		// Ensure the timer is cleaned up to avoid leaks.
+		if !timer.Stop() {
+			select {
+			case <-timer.C:
+			default:
+			}
+		}
+		// Log with a clear next step for the user.
+		utils.LogError(h.Logger, ctx.Err(), "streaming replay canceled while waiting between chunks; verify test timeouts or context cancellation configuration")
+		errCh <- ctx.Err()
+		return
+	case <-timer.C:
+		// proceed with sending the next chunk
+	}
+}
+prevTS = chunk.TS
+for _, field := range chunk.Data {
+	if field.Key == "raw" {
+		if _, err := clientConn.Write([]byte(field.Value)); err != nil {
+			utils.LogError(h.Logger, err, "failed to write chunk to client; ensure the client is still connected and retry the test if needed")
```
```go
if len(expected.body) != len(actual.body) {
	return false, "body-size mismatch"
}
```
compareMultipartPart falls back to comparing only body size for non-JSON parts. That means different text parts with the same length will be considered equal, which can hide real mismatches in multipart streams. Consider comparing the normalized body bytes/strings for text/* (and possibly XML) parts, not just length.
Suggested change:

```diff
 }
 }
+if !bytes.Equal(expected.body, actual.body) {
+	return false, "body mismatch"
+}
```
```go
if isStream {
	configPath := opts.ConfigPath
	if configPath == "" {
		configPath = "."
	}

	streamsDir := filepath.Join(configPath, "streams")
	if err := os.MkdirAll(streamsDir, 0777); err != nil {
		utils.LogError(h.Logger, err, "failed to create streams directory")
		return err
	}
	// Restore ownership of dirs created by the agent (root) so the CLI process
	// (running as the normal user) can write mocks/tests into the same test-set dir.
	utils.RestoreFileOwnership(h.Logger, configPath, streamsDir)

	streamPath = filepath.Join("streams", fmt.Sprintf("stream_%d_%d.ndjson", time.Now().UnixNano(), pUtil.GetNextID()))
	fullPath := filepath.Join(configPath, streamPath)

	f, err := os.Create(fullPath)
	if err != nil {
		utils.LogError(h.Logger, err, "failed to create stream file")
		return err
	}
	defer f.Close()
	file = f
	encoder = json.NewEncoder(file)
	utils.RestoreFileOwnership(h.Logger, fullPath)
	h.Logger.Debug("Created stream file", zap.String("path", fullPath))
}
```
When isStream=true, chunkedResponse writes only the subsequent reads from destConn into the stream file. Any chunk/body bytes that were already read into resp/finalResp before calling handleChunkedResponses (pUtil.ReadBytes can include body data) are never persisted, so replay can miss the beginning of the stream. Consider extracting any bytes after the header terminator from finalResp and writing them to the stream file before entering the read loop (or passing the initial body bytes explicitly).
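The fix suggested in the comment above amounts to splitting the already-read buffer at the HTTP header terminator and persisting whatever body bytes follow it before the read loop starts. A self-contained sketch of that split — the helper name is illustrative, not the PR's actual code:

```go
package main

import (
	"bytes"
	"fmt"
)

// splitAfterHeaders returns the header portion of an already-read response
// buffer and any bytes that follow the "\r\n\r\n" header terminator. Those
// trailing bytes are the start of the body and, per the review comment,
// should be written to the stream file before entering the read loop so the
// recorded stream does not miss the beginning of the response.
func splitAfterHeaders(buf []byte) (headers, initialBody []byte) {
	const terminator = "\r\n\r\n"
	if i := bytes.Index(buf, []byte(terminator)); i >= 0 {
		return buf[:i+len(terminator)], buf[i+len(terminator):]
	}
	// Terminator not found yet: everything read so far is still headers.
	return buf, nil
}

func main() {
	resp := []byte("HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\n\r\ndata: first chunk\n\n")
	_, body := splitAfterHeaders(resp)
	fmt.Printf("initial body to persist: %q\n", body)
}
```

In the PR's terms, the `initialBody` slice extracted from `finalResp` would be written through the same `encoder` as later chunks (or passed explicitly into `handleChunkedResponses`).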
```go
if len(line) != len(expected) {
	logger.Debug("plain-text stream mismatch",
		zap.Int("frame_index", nextExpected),
		zap.Int("expected_size", len(expected)),
		zap.Int("actual_size", len(line)))
	mismatchInfo := &StreamMismatchInfo{
		FrameIndex:    nextExpected,
		ExpectedFrame: expected,
		ActualFrame:   line,
		Reason:        fmt.Sprintf("size mismatch: expected %d bytes, got %d bytes", len(expected), len(line)),
	}
```
comparePlainTextStream currently treats frames as matching as long as their byte lengths are equal; it never compares the actual content. This can produce false positives (different text with same length will pass). Consider comparing normalized line contents (or JSON-compare where applicable) and only using size checks as an optimization/fallback.
Describe the changes that are made
This pull request introduces support for handling HTTP streaming responses (such as Server-Sent Events and NDJSON) in the proxy layer. It implements streaming recording by offloading streamed response chunks to a file, and enables accurate replay of these streams during testing by reading and replaying the recorded chunks with original timing. The changes also generalize the handling of chunked responses and update the models to include references to external stream files.
HTTP Streaming Support and Recording:
Added logic to detect streaming responses (by inspecting `Content-Type` headers) and offload streamed response chunks to a `.ndjson` file during recording. Each chunk is timestamped and written as a JSON object, and a `StreamRef` is stored in the HTTP response model to reference the file. (`pkg/agent/proxy/integrations/http/chunk.go`, `pkg/models/http.go`)

Updated the chunked response handling pipeline and related test cases to support the new streaming mechanism and pass the appropriate references and options. (`pkg/agent/proxy/integrations/http/chunk.go`, `pkg/agent/proxy/integrations/http/chunk_test.go`)

Streaming Replay and Mocking:

Implemented replay logic for streaming responses in test mode: reads the recorded stream file, replays each chunk to the client with the original timing, and restores the `Transfer-Encoding: chunked` header for correct HTTP parsing. (`pkg/agent/proxy/integrations/http/decode.go`)

Ensured that streamed responses are not redundantly parsed or read into memory, and that the `StreamRef` is propagated through the proxy pipeline. (`pkg/agent/proxy/integrations/http/http.go`)

General Improvements and Maintenance:

- Tweaked outgoing record error logging for network-closed cases and added a debug success log. (`pkg/agent/proxy/proxy.go`)
- Suppressed the matcher's success log when `emitFailureLogs` is disabled, since the streaming caller handles logging. (`pkg/matcher/http/match.go`)
- Adjusted handling of the `disable-mapping` flag in the CLI to avoid unintended overrides. (`cli/provider/cmd.go`)

These changes provide robust support for capturing and replaying streaming HTTP responses, making the proxy more capable of handling modern web APIs that use streaming protocols.
Links & References
Closes: #[issue number that will be closed through this PR]
🔗 Related PRs
🐞 Related Issues
📄 Related Documents
What type of PR is this? (check all applicable)
Added e2e test pipeline?
Added comments for hard-to-understand areas?
Added to documentation?
Are there any sample code or steps to test the changes?
Self Review done?
Any relevant screenshots, recordings or logs?
🧠 Semantics for PR Title & Branch Name
Please ensure your PR title and branch name follow the Keploy semantics:
📌 PR Semantics Guide
📌 Branch Semantics Guide
Examples:
- `fix: patch MongoDB document update bug`
- `feat/#1-login-flow`

(You may skip mentioning the issue number in the branch name if the change is small and the PR description clearly explains it.)

Additional checklist: