summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2011-06-17 17:10:12 +0200
committerRyan Dahl <ry@tinyclouds.org>2011-06-17 17:10:12 +0200
commitdc0556c8cde5a3b81a1f7a85e29171e252dcc85f (patch)
tree5dc98e0878f6203034e147beef8fd4167768cd90 /lib
parente697cfb6fc020c08afa2c794ee5802e5a4d2b97b (diff)
downloadnode-dc0556c8cde5a3b81a1f7a85e29171e252dcc85f.tar.gz
net_uv: Implement end(), destroySoon()
Diffstat (limited to 'lib')
-rw-r--r--lib/net_uv.js85
1 files changed, 71 insertions, 14 deletions
diff --git a/lib/net_uv.js b/lib/net_uv.js
index d637f0c6e..9f9441ad8 100644
--- a/lib/net_uv.js
+++ b/lib/net_uv.js
@@ -5,6 +5,11 @@ var util = require('util');
var assert = require('assert');
var TCP = process.binding('tcp_wrap').TCP;
+/* Bit flags for socket._flags */
+var FLAG_GOT_EOF = 1 << 0;
+var FLAG_SHUTDOWN = 1 << 1;
+var FLAG_DESTROY_SOON = 1 << 2;
+
var debug;
if (process.env.NODE_DEBUG && /net/.test(process.env.NODE_DEBUG)) {
@@ -40,7 +45,11 @@ function Socket(options) {
this._handle.socket = this;
this._handle.onread = onread;
+ this.allowHalfOpen = options ? (options.allowHalfOpen || false) : false;
+
this._writeRequests = [];
+
+ this._flags = 0;
}
util.inherits(Socket, stream.Stream);
@@ -68,16 +77,12 @@ Object.defineProperty(Socket.prototype, 'readyState', {
if (this._connecting) {
return 'opening';
} else if (this.readable && this.writable) {
- assert(typeof this.fd === 'number');
return 'open';
} else if (this.readable && !this.writable) {
- assert(typeof this.fd === 'number');
return 'readOnly';
} else if (!this.readable && this.writable) {
- assert(typeof this.fd === 'number');
return 'writeOnly';
} else {
- assert(typeof this.fd !== 'number');
return 'closed';
}
}
@@ -101,13 +106,42 @@ Socket.prototype.resume = function() {
};
-Socket.prototype.end = function() {
- throw new Error("implement me");
+Socket.prototype.end = function(data, encoding) {
+ if (!this.writable) return;
+ this.writable = false;
+
+ if (data) this.write(data, encoding);
+ DTRACE_NET_STREAM_END(this);
+
+ if (this._flags & FLAG_GOT_EOF) {
+ this.destroySoon();
+ } else {
+ this._flags |= FLAG_SHUTDOWN;
+ var shutdownReq = this._handle.shutdown();
+ shutdownReq.oncomplete = afterShutdown;
+ }
};
+function afterShutdown(status, handle, req) {
+ var self = handle.socket;
+
+ assert.ok(self._flags & FLAG_SHUTDOWN);
+
+ if (self._flags & FLAG_GOT_EOF) {
+ self.destroy();
+ } else {
+ }
+}
+
+
Socket.prototype.destroySoon = function() {
- throw new Error("implement me");
+ this.writable = false;
+ this._flags |= FLAG_DESTROY_SOON;
+
+ if (this._writeRequests.length == 0) {
+ this.destroy();
+ }
};
@@ -167,8 +201,12 @@ function onread(buffer, offset, length) {
// EOF
self.readable = false;
+ assert.ok(!(self._flags & FLAG_GOT_EOF));
+ self._flags |= FLAG_GOT_EOF;
+
+ // We call destroy() before end(). 'close' not emitted until nextTick so
+ // the 'end' event will come first as required.
if (!self.writable) self.destroy();
- // Note: 'close' not emitted until nextTick.
if (!self.allowHalfOpen) self.end();
if (self._events && self._events['end']) self.emit('end');
@@ -230,7 +268,15 @@ function afterWrite(status, handle, req, buffer) {
var req_ = self._writeRequests.shift();
assert.equal(req, req_);
+ if (self._writeRequests.length == 0) {
+ self.emit('drain');
+ }
+
if (req.cb) req.cb();
+
+ if (self._writeRequests.length == 0 && self._flags & FLAG_DESTROY_SOON) {
+ self.destroy();
+ }
}
@@ -257,12 +303,13 @@ Socket.prototype.connect = function(port, host) {
// TODO retrun promise from Socket.prototype.connect which
// wraps _connectReq.
- assert.ok(!self._connectReq);
+ assert.ok(!self._connecting);
- self._connectReq = self._handle.connect(ip, port);
+ var connectReq = self._handle.connect(ip, port);
- if (self._connectReq) {
- self._connectReq.oncomplete = afterConnect;
+ if (connectReq) {
+ self._connecting = true;
+ connectReq.oncomplete = afterConnect;
} else {
self.destroy(errnoException(errno, 'connect'));
}
@@ -275,6 +322,9 @@ function afterConnect(status, handle, req) {
var self = handle.socket;
assert.equal(handle, self._handle);
+ assert.ok(self._connecting);
+ self._connecting = false;
+
if (status == 0) {
self.readable = self.writable = true;
timers.active(self);
@@ -320,7 +370,7 @@ function Server(/* [ options, ] listener */) {
this.on('connection', listenerCallback);
this.connections = 0;
- self.allowHalfOpen = options.allowHalfOpen || false;
+ this.allowHalfOpen = options.allowHalfOpen || false;
this._handle = new TCP();
@@ -376,7 +426,10 @@ function onconnection(clientHandle) {
var handle = this;
var self = handle.socket;
- var socket = new Socket({ handle: clientHandle });
+ var socket = new Socket({
+ handle: clientHandle,
+ allowHalfOpen: self.allowHalfOpen
+ });
socket.readable = socket.writable = true;
socket.resume();
@@ -387,3 +440,7 @@ function onconnection(clientHandle) {
self.emit('connection', socket);
}
+
+Server.prototype.close = function() {
+ this._handle.close();
+};