diff options
author | Chris Dickinson <christopher.s.dickinson@gmail.com> | 2014-12-04 12:00:23 -0800 |
---|---|---|
committer | Chris Dickinson <christopher.s.dickinson@gmail.com> | 2014-12-18 09:39:05 -0800 |
commit | 91586661c983f45d650644451df73c8649a8d459 (patch) | |
tree | 554f03e7631b68be46a22e7543eedeb6e61aa756 /lib | |
parent | 93533e98f76c2c9e577af3352b4eb709791f4d1e (diff) | |
download | node-91586661c983f45d650644451df73c8649a8d459.tar.gz |
stream: switch _writableState.buffer to queue
In cases where many small writes are made to a stream
lacking _writev, the array data structure backing the
WriteReq buffer would greatly increase GC pressure.
Specifically, in the fs.WriteStream case, the
clearBuffer routine would only clear a single WriteReq
from the buffer before exiting, but would cause the
entire backing array to be GC'd. Switching to [].shift
lessened pressure, but still the bulk of the time was
spent in memcpy.
This replaces that structure with a linked list-backed
queue so that adding and removing from the queue is O(1).
In the _writev case, collecting the buffer requires an
O(N) loop over the buffer, but that was already being
performed to collect callbacks, so slowdown should be
neglible.
PR-URL: https://github.com/joyent/node/pull/8826
Reviewed-by: Timothy J Fontaine <tjfontaine@gmail.com>
Reviewed-by: Trevor Norris <trev.norris@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/_stream_writable.js | 68 | ||||
-rw-r--r-- | lib/net.js | 2 |
2 files changed, 49 insertions, 21 deletions
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 92984eb08..39eee6146 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -28,6 +28,7 @@ Writable.WritableState = WritableState; var util = require('util'); var Stream = require('stream'); +var debug = util.debuglog('stream'); util.inherits(Writable, Stream); @@ -35,6 +36,7 @@ function WriteReq(chunk, encoding, cb) { this.chunk = chunk; this.encoding = encoding; this.callback = cb; + this.next = null; } function WritableState(options, stream) { @@ -109,7 +111,8 @@ function WritableState(options, stream) { // the amount that is being written when _write is called. this.writelen = 0; - this.buffer = []; + this.bufferedRequest = null; + this.lastBufferedRequest = null; // number of pending user-supplied write callbacks // this must be 0 before 'finish' can be emitted @@ -123,6 +126,23 @@ function WritableState(options, stream) { this.errorEmitted = false; } +WritableState.prototype.getBuffer = function writableStateGetBuffer() { + var current = this.bufferedRequest; + var out = []; + while (current) { + out.push(current); + current = current.next; + } + return out; +}; + +Object.defineProperty(WritableState.prototype, 'buffer', { + get: util.deprecate(function() { + return this.getBuffer(); + }, '_writableState.buffer is deprecated. Use ' + + '_writableState.getBuffer() instead.') +}); + function Writable(options) { // Writable ctor is applied to Duplexes, though they're not // instanceof Writable, they're instanceof Readable. @@ -216,7 +236,7 @@ Writable.prototype.uncork = function() { !state.corked && !state.finished && !state.bufferProcessing && - state.buffer.length) + state.bufferedRequest) clearBuffer(this, state); } }; @@ -255,8 +275,15 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { if (!ret) state.needDrain = true; - if (state.writing || state.corked) - state.buffer.push(new WriteReq(chunk, encoding, cb)); + if (state.writing || state.corked) { + var last = state.lastBufferedRequest; + state.lastBufferedRequest = new WriteReq(chunk, encoding, cb); + if (last) { + last.next = state.lastBufferedRequest; + } else { + state.bufferedRequest = state.lastBufferedRequest; + } + } else doWrite(stream, state, false, len, chunk, encoding, cb); @@ -313,7 +340,7 @@ function onwrite(stream, er) { if (!finished && !state.corked && !state.bufferProcessing && - state.buffer.length) { + state.bufferedRequest) { clearBuffer(stream, state); } @@ -349,17 +376,23 @@ function onwriteDrain(stream, state) { // if there's something in the buffer waiting, then process it function clearBuffer(stream, state) { state.bufferProcessing = true; + var entry = state.bufferedRequest; - if (stream._writev && state.buffer.length > 1) { + if (stream._writev && entry && entry.next) { // Fast case, write everything using _writev() + var buffer = []; var cbs = []; - for (var c = 0; c < state.buffer.length; c++) - cbs.push(state.buffer[c].callback); + while (entry) { + cbs.push(entry.callback); + buffer.push(entry); + entry = entry.next; + } // count the one we are adding, as well. // TODO(isaacs) clean this up state.pendingcb++; - doWrite(stream, state, true, state.length, state.buffer, '', function(err) { + state.lastBufferedRequest = null; + doWrite(stream, state, true, state.length, buffer, '', function(err) { for (var i = 0; i < cbs.length; i++) { state.pendingcb--; cbs[i](err); @@ -367,34 +400,29 @@ function clearBuffer(stream, state) { }); // Clear buffer - state.buffer = []; } else { // Slow case, write chunks one-by-one - for (var c = 0; c < state.buffer.length; c++) { - var entry = state.buffer[c]; + while (entry) { var chunk = entry.chunk; var encoding = entry.encoding; var cb = entry.callback; var len = state.objectMode ? 1 : chunk.length; doWrite(stream, state, false, len, chunk, encoding, cb); - + entry = entry.next; // if we didn't call the onwrite immediately, then // it means that we need to wait until it does. // also, that means that the chunk and cb are currently // being processed, so move the buffer counter past them. if (state.writing) { - c++; break; } } - if (c < state.buffer.length) - state.buffer = state.buffer.slice(c); - else - state.buffer.length = 0; + if (entry === null) + state.lastBufferedRequest = null; } - + state.bufferedRequest = entry; state.bufferProcessing = false; } @@ -435,7 +463,7 @@ Writable.prototype.end = function(chunk, encoding, cb) { function needFinish(stream, state) { return (state.ending && state.length === 0 && - state.buffer.length === 0 && + state.bufferedRequest === null && !state.finished && !state.writing); } diff --git a/lib/net.js b/lib/net.js index fac78f8c0..f0075b5a3 100644 --- a/lib/net.js +++ b/lib/net.js @@ -732,7 +732,7 @@ Socket.prototype.__defineGetter__('bytesWritten', function() { data = this._pendingData, encoding = this._pendingEncoding; - state.buffer.forEach(function(el) { + state.getBuffer().forEach(function(el) { if (util.isBuffer(el.chunk)) bytes += el.chunk.length; else |