Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixup
  • Loading branch information
ronag committed Oct 10, 2020
commit d9d8e8684779eec45d7a84cfebca2176e0286d74
13 changes: 6 additions & 7 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -753,12 +753,6 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
}
}

src.on('data', ondata);

if (dest.writableNeedDrain === true) {
pause();
}

function ondata(chunk) {
debug('ondata');
const ret = dest.write(chunk);
Expand Down Expand Up @@ -810,7 +804,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.emit('pipe', src);

// Start the flow if it hasn't been started already.
if (!state.flowing) {

if (dest.writableNeedDrain === true) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: why not:

Suggested change
if (dest.writableNeedDrain === true) {
if (dest.writableNeedDrain) {

if (state.flowing) {
src.pause();
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little unsure how this affects the case where src is already piped to other destinations?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 what about adding the stream to the state.awaitDrainWriters (refs pipeOnDrain + ondata(chunk)). Though that may require copying https://github.com/nodejs/node/pull/35348/files#diff-0117344ddd481d021ad96b9c8eea78a5R741-R747 from ondata...

Copy link
Member Author

@ronag ronag Oct 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea how that works or what it is intended for.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, same 😄, based on the initial look it is a list of writables to wait for a drain event so this case seems fitting.
Let's ping @addaleax @BridgeAR @mscdex as most "recent" collaborators according to git-blame.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance this can be reverted or done some other way? It seems to cause issues with the source getting stuck in a paused state if you previously fed the output buffer with a lot of data, see eg, electron/asar#210 That module basically just appends a bunch of files to a single binary, but after a random amount of files it gets stuck due to the source being in a paused state.
This could, of course, be easily worked around by adding the writableNeedDrain loop as in the issue above, or adding a resume call to the stream, but I feel like that shouldn't really be necessary.

} else if (!state.flowing) {
debug('pipe resume');
src.resume();
}
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ ObjectDefineProperties(Writable.prototype, {
get() {
const wState = this._writableState;
if (!wState) return false;
return !this._writable.destroyed && !wState.ending && wState.needDrain;
return !wState.destroyed && !wState.ending && wState.needDrain;
}
},

Expand Down
29 changes: 29 additions & 0 deletions test/parallel/test-stream-pipe-needDrain.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const Readable = require('_stream_readable');
const Writable = require('_stream_writable');

// Pipe should not continue writing if writable needs drain.
{
const w = new Writable({
write(buf, encoding, callback) {

}
});

while (w.write('asd'));

assert.strictEqual(w.writableNeedDrain, true);

const r = new Readable({
read() {
this.push('asd');
}
});

w.write = common.mustNotCall();

r.pipe(w);
}