Feat/sse outgoing #3961

Open
anjupathak03 wants to merge 67 commits into main from feat/sse-outgoing

@anjupathak03
Contributor

Describe the changes that are made

This pull request introduces support for handling HTTP streaming responses (such as Server-Sent Events and NDJSON) in the proxy layer. It implements streaming recording by offloading streamed response chunks to a file, and enables accurate replay of these streams during testing by reading and replaying the recorded chunks with original timing. The changes also generalize the handling of chunked responses and update the models to include references to external stream files.

HTTP Streaming Support and Recording:

  • Added logic to detect streaming responses (by inspecting Content-Type headers) and offload streamed response chunks to a .ndjson file during recording. Each chunk is timestamped and written as a JSON object, and a StreamRef is stored in the HTTP response model to reference the file. (pkg/agent/proxy/integrations/http/chunk.go, pkg/models/http.go)

  • Updated the chunked response handling pipeline and related test cases to support the new streaming mechanism and pass the appropriate references and options. (pkg/agent/proxy/integrations/http/chunk.go, pkg/agent/proxy/integrations/http/chunk_test.go)

Streaming Replay and Mocking:

  • Implemented replay logic for streaming responses in test mode: reads the recorded stream file, replays each chunk to the client with the original timing, and restores the Transfer-Encoding: chunked header for correct HTTP parsing. (pkg/agent/proxy/integrations/http/decode.go, R161-L161)

  • Ensured that streamed responses are not redundantly parsed or read into memory, and that the StreamRef is propagated through the proxy pipeline. (pkg/agent/proxy/integrations/http/http.go, L149-R150)

General Improvements and Maintenance:

  • Improved error handling and logging for outgoing message recording, especially for network-closed errors. (pkg/agent/proxy/proxy.go, R826-R833)
  • Updated matcher logic to suppress success messages when failure logs are not emitted, supporting custom logging for streaming comparisons. (pkg/matcher/http/match.go, L444-R446)
  • Minor fix to flag validation logic for disable-mapping in the CLI. (cli/provider/cmd.go, R1058-R1066)

These changes provide robust support for capturing and replaying streaming HTTP responses, making the proxy more capable of handling modern web APIs that use streaming protocols.

Links & References

Closes: #[issue number that will be closed through this PR]

  • NA (if very small change like typo, linting, etc.)

🔗 Related PRs

  • NA

🐞 Related Issues

  • NA

📄 Related Documents

  • NA

What type of PR is this? (check all applicable)

  • 📦 Chore
  • 🍕 Feature
  • 🐞 Bug Fix
  • 📝 Documentation Update
  • 🎨 Style
  • 🧑‍💻 Code Refactor
  • 🔥 Performance Improvements
  • ✅ Test
  • 🔁 CI
  • ⏩ Revert

Added e2e test pipeline?

  • 👍 yes
  • 🙅 no, because they aren't needed
  • 🙋 no, because I need help

Added comments for hard-to-understand areas?

  • 👍 yes
  • 🙅 no, because the code is self-explanatory

Added to documentation?

  • 📜 README.md
  • 📓 Wiki
  • 🙅 no documentation needed

Are there any sample code or steps to test the changes?

  • 👍 yes, mentioned below
  • 🙅 no, because it is not needed

Self Review done?

  • ✅ yes
  • ❌ no, because I need help

Any relevant screenshots, recordings or logs?

  • NA

🧠 Semantics for PR Title & Branch Name

Please ensure your PR title and branch name follow the Keploy semantics:

📌 PR Semantics Guide
📌 Branch Semantics Guide

Examples:

  • PR Title: fix: patch MongoDB document update bug
  • Branch Name: feat/#1-login-flow (You may skip mentioning the issue number in the branch name if the change is small and the PR description clearly explains it.)

Additional checklist:

officialasishkumar and others added 30 commits February 20, 2026 01:51
…noise configuration handling.

Signed-off-by: Asish Kumar <[email protected]>
…update replay service logic.

Signed-off-by: Asish Kumar <[email protected]>
…ing the mock window and refine outgoing message recording logs.

Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
- Added a new function to merge consecutive SSE data fields, allowing for proper handling of multiline data.
- Updated the `compareSSEFields` function to utilize the new merging logic.
- Introduced tests to validate the normalization of multiline SSE data in both parsing and serialization contexts.
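
A minimal sketch of the merge this commit describes, assuming parsed fields are held as name/value pairs. `sseField` and `mergeDataFields` are hypothetical names, not necessarily the ones used in the PR. In the SSE format, several `data:` lines in one event form a single payload joined by newlines, so a run of consecutive data fields is collapsed before comparison.

```go
package main

// sseField is an assumed representation of one parsed SSE line.
type sseField struct {
	name  string
	value string
}

// mergeDataFields collapses each run of consecutive "data" fields into one
// field whose value joins the originals with "\n". Other fields, and data
// fields separated by another field, are left untouched.
func mergeDataFields(fields []sseField) []sseField {
	merged := make([]sseField, 0, len(fields))
	for _, f := range fields {
		last := len(merged) - 1
		if f.name == "data" && last >= 0 && merged[last].name == "data" {
			merged[last].value += "\n" + f.value
			continue
		}
		merged = append(merged, f)
	}
	return merged
}
```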

Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Co-authored-by: kiloconnect[bot] <240665456+kiloconnect[bot]@users.noreply.github.com>
Signed-off-by: Asish Kumar <[email protected]>
Co-authored-by: kiloconnect[bot] <240665456+kiloconnect[bot]@users.noreply.github.com>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
officialasishkumar and others added 25 commits February 26, 2026 10:28
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: Asish Kumar <[email protected]>
Signed-off-by: gouravkrosx <[email protected]>
- Introduced new types for HTTP streaming modes and configurations for better clarity and type safety.
- Created a new function `SimulateHTTPStreaming` to handle streaming HTTP requests separately from standard HTTP requests.
- Updated `SimulateHTTP` to utilize a shared request preparation function, reducing code duplication.
- Enhanced streaming response comparison functions to return detailed mismatch information, improving error reporting.
- Updated unit tests to use the new streaming simulation function and validate streaming responses more effectively.
Signed-off-by: gouravkrosx <[email protected]>
Signed-off-by: gouravkrosx <[email protected]>
Signed-off-by: gouravkrosx <[email protected]>
Signed-off-by: gouravkrosx <[email protected]>
Signed-off-by: gouravkrosx <[email protected]>
# Conflicts:
#	pkg/service/replay/replay.go
#	pkg/service/replay/utils_test.go
# Conflicts:
#	pkg/service/replay/replay.go
Copilot AI review requested due to automatic review settings March 23, 2026 05:20
Contributor

Copilot AI left a comment

Pull request overview

Adds first-class support for HTTP streaming responses (SSE/NDJSON/etc.) across Keploy’s record → mock storage → replay pipeline, including new structured stream representations and replay-time stream comparison.

Changes:

  • Record: detect streaming/chunked responses and offload streamed chunks to streams/*.ndjson, storing a StreamRef in the mock.
  • Replay: rehydrate and replay streamed responses by reading the recorded stream file and emitting chunks with recorded timing; add streaming comparators for test-time validation.
  • Housekeeping: noise config deep-copy fixes, CLI flag validation tweak, and ownership restoration helpers for sudo-based runs.

Reviewed changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 12 comments.

| File | Description |
| --- | --- |
| utils/permissions_windows.go | Adds Windows no-op for ownership restoration helper. |
| utils/permissions_unix.go | Adds RestoreFileOwnership helper for sudo scenarios. |
| pkg/util.go | Introduces streaming simulation + stream comparators and related helpers. |
| pkg/util_test.go | Adds unit tests for streaming simulation/comparison helpers. |
| pkg/service/replay/utils.go | Avoids mutating global noise config; adds streaming mock window helper and mapping upsert helper. |
| pkg/service/replay/utils_test.go | Adds tests for noise cloning/merge behavior and stream mock window. |
| pkg/service/replay/replay.go | Defers streaming tests to a sequential phase; wires streaming comparison into replay reporting. |
| pkg/service/replay/hooks.go | Routes HTTP streaming testcases through SimulateHTTPStreaming; adds noise map cloning helper. |
| pkg/service/record/record.go | Passes test-set config path through to agent options (for stream file resolution). |
| pkg/models/instrument.go | Extends OutgoingOptions with ConfigPath for resolving relative stream/assets. |
| pkg/models/http_stream_body.go | Adds structured stream chunk model and YAML marshal/unmarshal support for streaming bodies. |
| pkg/models/http_stream_body_test.go | Tests YAML marshal/unmarshal behavior for structured streaming bodies. |
| pkg/models/http.go | Adds StreamRef and internal StreamBody field to HTTP response model. |
| pkg/matcher/http/match.go | Suppresses success log when emitFailureLogs is disabled (streaming caller handles logging). |
| pkg/agent/proxy/proxy.go | Tweaks outgoing record error logging for network-closed cases; adds debug success log. |
| pkg/agent/proxy/integrations/http/http.go | Propagates StreamRef through HTTP mock parsing; avoids reading body when stream is offloaded. |
| pkg/agent/proxy/integrations/http/encode.go | Threads stream capture state through chunk handling and final mock creation. |
| pkg/agent/proxy/integrations/http/decode.go | Implements streaming mock replay by reading StreamRef ndjson and writing chunks to client. |
| pkg/agent/proxy/integrations/http/chunk.go | Adds stream file offload in chunked response handler and writes per-chunk ndjson records. |
| pkg/agent/proxy/integrations/http/chunk_test.go | Updates chunked response tests for new signature/streamRef plumbing. |
| cli/provider/cmd.go | Adjusts disable-mapping flag handling to avoid unintended overrides. |

Comment on lines +338 to +339
if err := os.MkdirAll(streamsDir, 0777); err != nil {
	utils.LogError(h.Logger, err, "failed to create streams directory")
Copilot AI Mar 23, 2026

Creating the streams directory with mode 0777 is overly permissive (world-writable) and can be a security issue on multi-user systems. Consider using a tighter default (e.g., 0755) and relying on ownership restoration/umask as needed.

Suggested change
- if err := os.MkdirAll(streamsDir, 0777); err != nil {
- 	utils.LogError(h.Logger, err, "failed to create streams directory")
+ if err := os.MkdirAll(streamsDir, 0755); err != nil {
+ 	utils.LogError(h.Logger, err, "failed to create streams directory; verify permissions for configPath and parent directories")

Comment on lines +356 to +359
// RestoreFileOwnership changes ownership of a file or directory to the original user
// when running under sudo. This prevents the agent process (which runs as root via sudo)
// from creating files that the CLI process (running as the normal user) cannot access.
// Returns nil if not running under sudo or if ownership was successfully restored.
Copilot AI Mar 23, 2026

The doc comment for RestoreFileOwnership says "Returns nil" but the function doesn’t return anything. Please update the comment to match the signature/behavior (e.g., describe that it’s best-effort and logs debug on failures).

Suggested change
- // RestoreFileOwnership changes ownership of a file or directory to the original user
- // when running under sudo. This prevents the agent process (which runs as root via sudo)
- // from creating files that the CLI process (running as the normal user) cannot access.
- // Returns nil if not running under sudo or if ownership was successfully restored.
+ // RestoreFileOwnership changes ownership of files or directories to the original user
+ // when running under sudo. This prevents the agent process (which runs as root via sudo)
+ // from creating files that the CLI process (running as the normal user) cannot access.
+ // This is a best-effort helper: it does nothing when not running under sudo and logs debug
+ // messages if ownership restoration fails for any path.

Comment on lines +1655 to +1657
if !streamMatched {
	r.logger.Error("streaming response mismatch detected", zap.String("testcase", tc.Name))
	hadStreamingMismatch = true
Copilot AI Mar 23, 2026

This new error log doesn’t provide an actionable next step for the user (e.g., how to inspect the mismatch details or re-record the testcase). Please include a brief next action (like enabling debug logs / re-recording / inspecting streams) so failures are easier to remediate.

Copilot generated this review using guidance from organization custom instructions.
aType := detectScalarType(strings.TrimSpace(a.value))
if eType != aType {
	return false, "field-type mismatch"
}
Copilot AI Mar 23, 2026

compareSSEFields does not validate equality of non-"data" field values (e.g., id/event/retry). It only checks that the scalar types match, so "id:1" and "id:2" would compare equal. This undermines deterministic streaming comparisons; it should compare values (optionally with noise rules) rather than only types.

Suggested change
- }
+ }
+ // For non-"data" fields, after confirming scalar type equality,
+ // ensure the actual values also match to maintain deterministic comparisons.
+ if strings.TrimSpace(e.value) != strings.TrimSpace(a.value) {
+ 	return false, "field-value mismatch"
+ }

Comment on lines +1356 to +1357
if len(eVal) != len(aVal) {
	return false, "data-size mismatch"
Copilot AI Mar 23, 2026

For SSE "data" fields that are not JSON, the comparator only checks scalar type and string length (data-type mismatch / data-size mismatch) but not the actual value. This can allow mismatched payloads of the same length to pass. Consider comparing the actual normalized value (or a hash) rather than only size/type.

Suggested change
- if len(eVal) != len(aVal) {
- 	return false, "data-size mismatch"
+ if eVal != aVal {
+ 	return false, "data-value mismatch"

Comment on lines +214 to +215
utils.LogError(h.Logger, err, "failed to decode stream chunk")
break
Copilot AI Mar 23, 2026

In streaming replay, decoder.Decode errors (other than EOF) are logged but then the loop just breaks without reporting the error back via errCh. This can yield partial/corrupt responses while the caller still treats the mock as successful. Consider sending the error on errCh (and returning) so the test run can fail fast and surface the root cause.

Suggested change
- utils.LogError(h.Logger, err, "failed to decode stream chunk")
- break
+ utils.LogError(h.Logger, err, "failed to decode stream chunk; verify the recorded stream file is not truncated or corrupted and matches the expected format", zap.String("path", filePath))
+ errCh <- err
+ return

Comment on lines +225 to +232
	time.Sleep(diff)
}
prevTS = chunk.TS

for _, field := range chunk.Data {
	if field.Key == "raw" {
		if _, err := clientConn.Write([]byte(field.Value)); err != nil {
			utils.LogError(h.Logger, err, "failed to write chunk to client")
Copilot AI Mar 23, 2026

Streaming replay uses time.Sleep(diff) to preserve inter-chunk timing, but it doesn’t respect ctx cancellation/timeouts. If the recorded diff is large (or the test is canceled), this goroutine will keep sleeping and can delay shutdown. Consider replacing sleep with a timer + select on ctx.Done().

Suggested change
- 	time.Sleep(diff)
- }
- prevTS = chunk.TS
- for _, field := range chunk.Data {
- 	if field.Key == "raw" {
- 		if _, err := clientConn.Write([]byte(field.Value)); err != nil {
- 			utils.LogError(h.Logger, err, "failed to write chunk to client")
+ 	timer := time.NewTimer(diff)
+ 	select {
+ 	case <-ctx.Done():
+ 		// Ensure the timer is cleaned up to avoid leaks.
+ 		if !timer.Stop() {
+ 			select {
+ 			case <-timer.C:
+ 			default:
+ 			}
+ 		}
+ 		// Log with a clear next step for the user.
+ 		utils.LogError(h.Logger, ctx.Err(), "streaming replay canceled while waiting between chunks; verify test timeouts or context cancellation configuration")
+ 		errCh <- ctx.Err()
+ 		return
+ 	case <-timer.C:
+ 		// proceed with sending the next chunk
+ 	}
+ }
+ prevTS = chunk.TS
+ for _, field := range chunk.Data {
+ 	if field.Key == "raw" {
+ 		if _, err := clientConn.Write([]byte(field.Value)); err != nil {
+ 			utils.LogError(h.Logger, err, "failed to write chunk to client; ensure the client is still connected and retry the test if needed")


if len(expected.body) != len(actual.body) {
	return false, "body-size mismatch"
}
Copilot AI Mar 23, 2026

compareMultipartPart falls back to comparing only body size for non-JSON parts. That means different text parts with the same length will be considered equal, which can hide real mismatches in multipart streams. Consider comparing the normalized body bytes/strings for text/* (and possibly XML) parts, not just length.

Suggested change
- }
+ }
+ if !bytes.Equal(expected.body, actual.body) {
+ 	return false, "body mismatch"
+ }

Comment on lines +331 to +359
if isStream {
	configPath := opts.ConfigPath
	if configPath == "" {
		configPath = "."
	}

	streamsDir := filepath.Join(configPath, "streams")
	if err := os.MkdirAll(streamsDir, 0777); err != nil {
		utils.LogError(h.Logger, err, "failed to create streams directory")
		return err
	}
	// Restore ownership of dirs created by the agent (root) so the CLI process
	// (running as the normal user) can write mocks/tests into the same test-set dir.
	utils.RestoreFileOwnership(h.Logger, configPath, streamsDir)

	streamPath = filepath.Join("streams", fmt.Sprintf("stream_%d_%d.ndjson", time.Now().UnixNano(), pUtil.GetNextID()))
	fullPath := filepath.Join(configPath, streamPath)

	f, err := os.Create(fullPath)
	if err != nil {
		utils.LogError(h.Logger, err, "failed to create stream file")
		return err
	}
	defer f.Close()
	file = f
	encoder = json.NewEncoder(file)
	utils.RestoreFileOwnership(h.Logger, fullPath)
	h.Logger.Debug("Created stream file", zap.String("path", fullPath))
}
Copilot AI Mar 23, 2026

When isStream=true, chunkedResponse writes only the subsequent reads from destConn into the stream file. Any chunk/body bytes that were already read into resp/finalResp before calling handleChunkedResponses (pUtil.ReadBytes can include body data) are never persisted, so replay can miss the beginning of the stream. Consider extracting any bytes after the header terminator from finalResp and writing them to the stream file before entering the read loop (or passing the initial body bytes explicitly).
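
One possible shape for the extraction suggested here, as a sketch only. `splitHeaderAndBody` is a hypothetical helper, not a function in the PR; it assumes the initial read is available as a byte slice and that the header block ends at the first CRLF CRLF, as in HTTP/1.1.

```go
package main

import "bytes"

// splitHeaderAndBody returns the header block (including its terminating
// blank line) and any body bytes that arrived in the same read. ok is false
// when the header terminator has not been seen yet, in which case body is nil.
func splitHeaderAndBody(buf []byte) (header, body []byte, ok bool) {
	sep := []byte("\r\n\r\n")
	i := bytes.Index(buf, sep)
	if i < 0 {
		return buf, nil, false
	}
	end := i + len(sep)
	return buf[:end], buf[end:], true
}
```

If `body` is non-empty, it could be written to the stream file as the first record before the read loop starts, so that replay begins at the same byte the original client saw.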

Comment on lines +1058 to +1068
if len(line) != len(expected) {
	logger.Debug("plain-text stream mismatch",
		zap.Int("frame_index", nextExpected),
		zap.Int("expected_size", len(expected)),
		zap.Int("actual_size", len(line)))
	mismatchInfo := &StreamMismatchInfo{
		FrameIndex:    nextExpected,
		ExpectedFrame: expected,
		ActualFrame:   line,
		Reason:        fmt.Sprintf("size mismatch: expected %d bytes, got %d bytes", len(expected), len(line)),
	}
Copilot AI Mar 23, 2026

comparePlainTextStream currently treats frames as matching as long as their byte lengths are equal; it never compares the actual content. This can produce false positives (different text with same length will pass). Consider comparing normalized line contents (or JSON-compare where applicable) and only using size checks as an optimization/fallback.
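
A content-first comparison might look roughly like the sketch below. `compareLine` is a hypothetical helper rather than the PR's function, and trimming trailing CR/LF before comparing is an assumed normalization. The size check survives only as a way to produce a more specific reason string.

```go
package main

import (
	"fmt"
	"strings"
)

// compareLine compares one plain-text frame by content. Trailing CR/LF is
// trimmed first so that line-ending differences alone do not fail the match.
// The second return value is a short reason, empty on success.
func compareLine(expected, actual string) (bool, string) {
	e := strings.TrimRight(expected, "\r\n")
	a := strings.TrimRight(actual, "\r\n")
	if e == a {
		return true, ""
	}
	if len(e) != len(a) {
		return false, fmt.Sprintf("size mismatch: expected %d bytes, got %d bytes", len(e), len(a))
	}
	return false, "content mismatch"
}
```

With this ordering, two frames of equal length but different text are reported as a content mismatch instead of passing.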

5 participants