summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorChris Dickinson <christopher.s.dickinson@gmail.com>2014-12-04 12:00:23 -0800
committerChris Dickinson <christopher.s.dickinson@gmail.com>2014-12-18 09:39:05 -0800
commit91586661c983f45d650644451df73c8649a8d459 (patch)
tree554f03e7631b68be46a22e7543eedeb6e61aa756 /lib
parent93533e98f76c2c9e577af3352b4eb709791f4d1e (diff)
downloadnode-91586661c983f45d650644451df73c8649a8d459.tar.gz
stream: switch _writableState.buffer to queue
In cases where many small writes are made to a stream lacking _writev, the array data structure backing the WriteReq buffer would greatly increase GC pressure. Specifically, in the fs.WriteStream case, the clearBuffer routine would only clear a single WriteReq from the buffer before exiting, but would cause the entire backing array to be GC'd. Switching to [].shift lessened pressure, but still the bulk of the time was spent in memcpy. This replaces that structure with a linked list-backed queue so that adding and removing from the queue is O(1). In the _writev case, collecting the buffer requires an O(N) loop over the buffer, but that was already being performed to collect callbacks, so slowdown should be neglible. PR-URL: https://github.com/joyent/node/pull/8826 Reviewed-by: Timothy J Fontaine <tjfontaine@gmail.com> Reviewed-by: Trevor Norris <trev.norris@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/_stream_writable.js68
-rw-r--r--lib/net.js2
2 files changed, 49 insertions, 21 deletions
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 92984eb08..39eee6146 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -28,6 +28,7 @@ Writable.WritableState = WritableState;
var util = require('util');
var Stream = require('stream');
+var debug = util.debuglog('stream');
util.inherits(Writable, Stream);
@@ -35,6 +36,7 @@ function WriteReq(chunk, encoding, cb) {
this.chunk = chunk;
this.encoding = encoding;
this.callback = cb;
+ this.next = null;
}
function WritableState(options, stream) {
@@ -109,7 +111,8 @@ function WritableState(options, stream) {
// the amount that is being written when _write is called.
this.writelen = 0;
- this.buffer = [];
+ this.bufferedRequest = null;
+ this.lastBufferedRequest = null;
// number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted
@@ -123,6 +126,23 @@ function WritableState(options, stream) {
this.errorEmitted = false;
}
+WritableState.prototype.getBuffer = function writableStateGetBuffer() {
+ var current = this.bufferedRequest;
+ var out = [];
+ while (current) {
+ out.push(current);
+ current = current.next;
+ }
+ return out;
+};
+
+Object.defineProperty(WritableState.prototype, 'buffer', {
+ get: util.deprecate(function() {
+ return this.getBuffer();
+ }, '_writableState.buffer is deprecated. Use ' +
+ '_writableState.getBuffer() instead.')
+});
+
function Writable(options) {
// Writable ctor is applied to Duplexes, though they're not
// instanceof Writable, they're instanceof Readable.
@@ -216,7 +236,7 @@ Writable.prototype.uncork = function() {
!state.corked &&
!state.finished &&
!state.bufferProcessing &&
- state.buffer.length)
+ state.bufferedRequest)
clearBuffer(this, state);
}
};
@@ -255,8 +275,15 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
if (!ret)
state.needDrain = true;
- if (state.writing || state.corked)
- state.buffer.push(new WriteReq(chunk, encoding, cb));
+ if (state.writing || state.corked) {
+ var last = state.lastBufferedRequest;
+ state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
+ if (last) {
+ last.next = state.lastBufferedRequest;
+ } else {
+ state.bufferedRequest = state.lastBufferedRequest;
+ }
+ }
else
doWrite(stream, state, false, len, chunk, encoding, cb);
@@ -313,7 +340,7 @@ function onwrite(stream, er) {
if (!finished &&
!state.corked &&
!state.bufferProcessing &&
- state.buffer.length) {
+ state.bufferedRequest) {
clearBuffer(stream, state);
}
@@ -349,17 +376,23 @@ function onwriteDrain(stream, state) {
// if there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
+ var entry = state.bufferedRequest;
- if (stream._writev && state.buffer.length > 1) {
+ if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
+ var buffer = [];
var cbs = [];
- for (var c = 0; c < state.buffer.length; c++)
- cbs.push(state.buffer[c].callback);
+ while (entry) {
+ cbs.push(entry.callback);
+ buffer.push(entry);
+ entry = entry.next;
+ }
// count the one we are adding, as well.
// TODO(isaacs) clean this up
state.pendingcb++;
- doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
+ state.lastBufferedRequest = null;
+ doWrite(stream, state, true, state.length, buffer, '', function(err) {
for (var i = 0; i < cbs.length; i++) {
state.pendingcb--;
cbs[i](err);
@@ -367,34 +400,29 @@ function clearBuffer(stream, state) {
});
// Clear buffer
- state.buffer = [];
} else {
// Slow case, write chunks one-by-one
- for (var c = 0; c < state.buffer.length; c++) {
- var entry = state.buffer[c];
+ while (entry) {
var chunk = entry.chunk;
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;
doWrite(stream, state, false, len, chunk, encoding, cb);
-
+ entry = entry.next;
// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
- c++;
break;
}
}
- if (c < state.buffer.length)
- state.buffer = state.buffer.slice(c);
- else
- state.buffer.length = 0;
+ if (entry === null)
+ state.lastBufferedRequest = null;
}
-
+ state.bufferedRequest = entry;
state.bufferProcessing = false;
}
@@ -435,7 +463,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
function needFinish(stream, state) {
return (state.ending &&
state.length === 0 &&
- state.buffer.length === 0 &&
+ state.bufferedRequest === null &&
!state.finished &&
!state.writing);
}
diff --git a/lib/net.js b/lib/net.js
index fac78f8c0..f0075b5a3 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -732,7 +732,7 @@ Socket.prototype.__defineGetter__('bytesWritten', function() {
data = this._pendingData,
encoding = this._pendingEncoding;
- state.buffer.forEach(function(el) {
+ state.getBuffer().forEach(function(el) {
if (util.isBuffer(el.chunk))
bytes += el.chunk.length;
else