Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
stream: simplify .pipe() and .unpipe() in Readable
Now we are using `pipes` and `pipesCount` in Readable state and the
`pipes` value can be a stream or an array of streams. This change
reducing them into one `pipes` value, which is an array of streams.
It also adds a deprecation warning of `_readableState.pipesCount`.
  • Loading branch information
starkwang committed Jul 11, 2019
commit 241ffd3d490ffc4bb11a83311baa648c84efb761
14 changes: 14 additions & 0 deletions doc/api/deprecations.md
Original file line number Diff line number Diff line change
Expand Up @@ -2498,6 +2498,20 @@ Type: Runtime
Passing a callback to [`worker.terminate()`][] is deprecated. Use the returned
`Promise` instead, or a listener to the worker’s `'exit'` event.

<a id="DEP0133"></a>
### DEP0133: _readableState.pipesCount
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/28583
description: Documentation-only.
-->

Type: Documentation-only

`_readableState.pipesCount` is deprecated. Please use
`_readableState.pipes.length` instead.

[`--http-parser=legacy`]: cli.html#cli_http_parser_library
[`--pending-deprecation`]: cli.html#cli_pending_deprecation
[`Buffer.allocUnsafeSlow(size)`]: buffer.html#buffer_class_method_buffer_allocunsafeslow_size
Expand Down
63 changes: 17 additions & 46 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const EE = require('events');
const Stream = require('stream');
const { Buffer } = require('buffer');

const internalUtil = require('internal/util');
const debug = require('internal/util/debuglog').debuglog('stream');
const BufferList = require('internal/streams/buffer_list');
const destroyImpl = require('internal/streams/destroy');
Expand Down Expand Up @@ -97,8 +98,7 @@ function ReadableState(options, stream, isDuplex) {
// array.shift()
this.buffer = new BufferList();
this.length = 0;
this.pipes = null;
this.pipesCount = 0;
this.pipes = [];
this.flowing = null;
this.ended = false;
this.endEmitted = false;
Expand Down Expand Up @@ -148,6 +148,13 @@ function ReadableState(options, stream, isDuplex) {
}
}

Object.defineProperty(ReadableState.prototype, 'pipesCount', {
get: internalUtil.deprecate(function() {
return this.pipes.length;
}, '_readableState.pipesCount is deprecated. ' +
'Use _readableState.pipes.length instead.', 'DEP0133'),
});

function Readable(options) {
if (!(this instanceof Readable))
return new Readable(options);
Expand Down Expand Up @@ -635,19 +642,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
const src = this;
const state = this._readableState;

switch (state.pipesCount) {
case 0:
state.pipes = dest;
break;
case 1:
state.pipes = [state.pipes, dest];
break;
default:
state.pipes.push(dest);
break;
}
state.pipesCount += 1;
debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
state.pipes.push(dest);
debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);

const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
Expand Down Expand Up @@ -717,9 +713,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (((state.pipesCount === 1 && state.pipes === dest) ||
(state.pipesCount > 1 && state.pipes.includes(dest))) &&
!cleanedUp) {
if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) {
debug('false write response, pause', state.awaitDrain);
state.awaitDrain++;
}
Expand Down Expand Up @@ -789,38 +783,16 @@ Readable.prototype.unpipe = function(dest) {
const unpipeInfo = { hasUnpiped: false };

// If we're not piping anywhere, then do nothing.
if (state.pipesCount === 0)
return this;

// Just one destination. most common case.
if (state.pipesCount === 1) {
// Passed in one, but it's not the right one.
if (dest && dest !== state.pipes)
return this;

if (!dest)
dest = state.pipes;

// got a match.
state.pipes = null;
state.pipesCount = 0;
state.flowing = false;
if (dest)
dest.emit('unpipe', this, unpipeInfo);
if (state.pipes.length === 0)
return this;
}

// Slow case with multiple pipe destinations.

if (!dest) {
// remove all.
var dests = state.pipes;
var len = state.pipesCount;
state.pipes = null;
state.pipesCount = 0;
state.pipes = [];
state.flowing = false;

for (var i = 0; i < len; i++)
for (var i = 0; i < dests.length; i++)
dests[i].emit('unpipe', this, { hasUnpiped: false });
return this;
}
Expand All @@ -831,9 +803,8 @@ Readable.prototype.unpipe = function(dest) {
return this;

state.pipes.splice(index, 1);
state.pipesCount -= 1;
if (state.pipesCount === 1)
state.pipes = state.pipes[0];
if (state.pipes.length === 0)
state.flowing = false;

dest.emit('unpipe', this, unpipeInfo);

Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-stream-pipe-same-destination-twice.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ const { PassThrough, Writable } = require('stream');
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes.length, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

passThrough.unpipe(dest);

assert.strictEqual(passThrough._events.data.length, 1);
assert.strictEqual(passThrough._readableState.pipesCount, 1);
assert.strictEqual(passThrough._readableState.pipes, dest);
assert.strictEqual(passThrough._readableState.pipes.length, 1);
assert.deepStrictEqual(passThrough._readableState.pipes, [dest]);

passThrough.write('foobar');
passThrough.pipe(dest);
Expand All @@ -47,7 +47,7 @@ const { PassThrough, Writable } = require('stream');
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes.length, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

Expand All @@ -64,15 +64,15 @@ const { PassThrough, Writable } = require('stream');
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes.length, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

passThrough.unpipe(dest);
passThrough.unpipe(dest);

assert.strictEqual(passThrough._events.data, undefined);
assert.strictEqual(passThrough._readableState.pipesCount, 0);
assert.strictEqual(passThrough._readableState.pipes.length, 0);

passThrough.write('foobar');
}
7 changes: 3 additions & 4 deletions test/parallel/test-stream-pipe-unpipe-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ assert.strictEqual(source._readableState.pipes.length, 2);

source.unpipe(dest2);

assert.strictEqual(source._readableState.pipes, dest1);
assert.deepStrictEqual(source._readableState.pipes, [dest1]);
assert.notStrictEqual(source._readableState.pipes, dest2);

dest2.on('unpipe', common.mustNotCall());
source.unpipe(dest2);

source.unpipe(dest1);

assert.strictEqual(source._readableState.pipes, null);
assert.strictEqual(source._readableState.pipes.length, 0);

{
// Test `cleanup()` if we unpipe all streams.
Expand All @@ -43,8 +43,7 @@ assert.strictEqual(source._readableState.pipes, null);
const destCheckEventNames = ['close', 'finish', 'drain', 'error', 'unpipe'];

const checkSrcCleanup = common.mustCall(() => {
assert.strictEqual(source._readableState.pipes, null);
assert.strictEqual(source._readableState.pipesCount, 0);
assert.strictEqual(source._readableState.pipes.length, 0);
assert.strictEqual(source._readableState.flowing, false);

srcCheckEventNames.forEach((eventName) => {
Expand Down
12 changes: 12 additions & 0 deletions test/parallel/test-stream-pipescount-deprecation-warning.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict';

const common = require('../common');
const { Readable } = require('stream');

const warning = '_readableState.pipesCount is deprecated. ' +
'Use _readableState.pipes.length instead.';

common.expectWarning('DeprecationWarning', warning, 'DEP0133');

const readable = new Readable();
readable._readableState.pipesCount;
12 changes: 6 additions & 6 deletions test/parallel/test-stream-unpipe-event.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class NeverEndReadable extends Readable {
dest.on('unpipe', common.mustCall());
src.pipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
assert.strictEqual(src._readableState.pipes.length, 0);
});
}

Expand All @@ -34,7 +34,7 @@ class NeverEndReadable extends Readable {
dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted'));
src.pipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 1);
assert.strictEqual(src._readableState.pipes.length, 1);
});
}

Expand All @@ -46,7 +46,7 @@ class NeverEndReadable extends Readable {
src.pipe(dest);
src.unpipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
assert.strictEqual(src._readableState.pipes.length, 0);
});
}

Expand All @@ -57,7 +57,7 @@ class NeverEndReadable extends Readable {
dest.on('unpipe', common.mustCall());
src.pipe(dest, { end: false });
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
assert.strictEqual(src._readableState.pipes.length, 0);
});
}

Expand All @@ -68,7 +68,7 @@ class NeverEndReadable extends Readable {
dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted'));
src.pipe(dest, { end: false });
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 1);
assert.strictEqual(src._readableState.pipes.length, 1);
});
}

Expand All @@ -80,6 +80,6 @@ class NeverEndReadable extends Readable {
src.pipe(dest, { end: false });
src.unpipe(dest);
setImmediate(() => {
assert.strictEqual(src._readableState.pipesCount, 0);
assert.strictEqual(src._readableState.pipes.length, 0);
});
}
4 changes: 2 additions & 2 deletions test/parallel/test-stream2-basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ class TestWriter extends EE {
w[0].on('write', function() {
if (--writes === 0) {
r.unpipe();
assert.strictEqual(r._readableState.pipes, null);
assert.deepStrictEqual(r._readableState.pipes, []);
w[0].end();
r.pipe(w[1]);
assert.strictEqual(r._readableState.pipes, w[1]);
assert.deepStrictEqual(r._readableState.pipes, [w[1]]);
}
});

Expand Down