summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFedor Indutny <fedor.indutny@gmail.com>2013-06-11 12:55:49 +0200
committerFedor Indutny <fedor.indutny@gmail.com>2013-06-15 21:44:51 +0200
commit4c48a39c65c175c2f0b1ec5bf58456dd83e71d99 (patch)
tree40baa626c1ec10ba0c89216e71dcaf43f47880f2
parent6978e998ee7c08b3dd10e4cecd2a167696b74317 (diff)
downloadnode-4c48a39c65c175c2f0b1ec5bf58456dd83e71d99.tar.gz
stream_wrap: introduce StreamWrapCallbacks
StreamWrapCallbacks is a helper class for incepting into uv_stream_t* management process.
-rw-r--r--src/stream_wrap.cc262
-rw-r--r--src/stream_wrap.h79
2 files changed, 219 insertions, 122 deletions
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index 61527ed0a..51f57f1f2 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -56,23 +56,6 @@ using v8::String;
using v8::TryCatch;
using v8::Value;
-typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
-
-class WriteWrap: public ReqWrap<uv_write_t> {
- public:
- void* operator new(size_t size, char* storage) { return storage; }
-
- // This is just to keep the compiler happy. It should never be called, since
- // we don't use exceptions in node.
- void operator delete(void* ptr, char* storage) { assert(0); }
-
- protected:
- // People should not be using the non-placement new and delete operator on a
- // WriteWrap. Ensure this never happens.
- void* operator new (size_t size) { assert(0); };
- void operator delete(void* ptr) { assert(0); };
-};
-
static Persistent<String> buffer_sym;
static Persistent<String> bytes_sym;
@@ -110,8 +93,10 @@ void StreamWrap::Initialize(Handle<Object> target) {
StreamWrap::StreamWrap(Handle<Object> object, uv_stream_t* stream)
- : HandleWrap(object, reinterpret_cast<uv_handle_t*>(stream)) {
+ : HandleWrap(object, reinterpret_cast<uv_handle_t*>(stream)),
+ default_callbacks_(this) {
stream_ = stream;
+ callbacks_ = &default_callbacks_;
}
@@ -173,8 +158,8 @@ Handle<Value> StreamWrap::ReadStop(const Arguments& args) {
uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
assert(wrap->stream_ == reinterpret_cast<uv_stream_t*>(handle));
- char* buf = slab_allocator->Allocate(wrap->object_, suggested_size);
- return uv_buf_init(buf, suggested_size);
+
+ return wrap->callbacks_->DoAlloc(handle, suggested_size);
}
@@ -200,8 +185,10 @@ static Local<Object> AcceptHandle(uv_stream_t* pipe) {
}
-void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
- uv_buf_t buf, uv_handle_type pending) {
+void StreamWrap::OnReadCommon(uv_stream_t* handle,
+ ssize_t nread,
+ uv_buf_t buf,
+ uv_handle_type pending) {
HandleScope scope(node_isolate);
StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
@@ -210,56 +197,16 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
// uv_close() on the handle.
assert(wrap->object_.IsEmpty() == false);
- if (nread < 0) {
- // If libuv reports an error or EOF it *may* give us a buffer back. In that
- // case, return the space to the slab.
- if (buf.base != NULL) {
- slab_allocator->Shrink(wrap->object_, buf.base, 0);
+ if (nread > 0) {
+ if (wrap->stream_->type == UV_TCP) {
+ NODE_COUNT_NET_BYTES_RECV(nread);
+ } else if (wrap->stream_->type == UV_NAMED_PIPE) {
+ NODE_COUNT_PIPE_BYTES_RECV(nread);
}
-
- SetErrno(uv_last_error(uv_default_loop()));
- MakeCallback(wrap->object_, onread_sym, 0, NULL);
- return;
}
assert(buf.base != NULL);
- Local<Object> slab = slab_allocator->Shrink(wrap->object_,
- buf.base,
- nread);
-
- if (nread == 0) return;
- assert(static_cast<size_t>(nread) <= buf.len);
-
- int argc = 3;
- Local<Value> argv[4] = {
- slab,
- Integer::NewFromUnsigned(buf.base - Buffer::Data(slab), node_isolate),
- Integer::NewFromUnsigned(nread, node_isolate)
- };
-
- Local<Object> pending_obj;
- if (pending == UV_TCP) {
- pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle);
- } else if (pending == UV_NAMED_PIPE) {
- pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle);
- } else if (pending == UV_UDP) {
- pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle);
- } else {
- assert(pending == UV_UNKNOWN_HANDLE);
- }
-
- if (!pending_obj.IsEmpty()) {
- argv[3] = pending_obj;
- argc++;
- }
-
- if (wrap->stream_->type == UV_TCP) {
- NODE_COUNT_NET_BYTES_RECV(nread);
- } else if (wrap->stream_->type == UV_NAMED_PIPE) {
- NODE_COUNT_PIPE_BYTES_RECV(nread);
- }
-
- MakeCallback(wrap->object_, onread_sym, argc, argv);
+ wrap->callbacks_->DoRead(handle, nread, buf, pending);
}
@@ -294,37 +241,29 @@ Handle<Value> StreamWrap::WriteBuffer(const Arguments& args) {
assert(args.Length() >= 1 && Buffer::HasInstance(args[0]));
size_t length = Buffer::Length(args[0]);
char* storage = new char[sizeof(WriteWrap)];
- WriteWrap* req_wrap = new (storage) WriteWrap();
+ WriteWrap* req_wrap = new (storage) WriteWrap(wrap);
req_wrap->object_->SetHiddenValue(buffer_sym, args[0]);
uv_buf_t buf;
WriteBuffer(args[0], &buf);
- int r = uv_write(&req_wrap->req_,
- wrap->stream_,
- &buf,
- 1,
- StreamWrap::AfterWrite);
+ int r = wrap->callbacks_->DoWrite(req_wrap,
+ &buf,
+ 1,
+ NULL,
+ StreamWrap::AfterWrite);
req_wrap->Dispatched();
req_wrap->object_->Set(bytes_sym,
Integer::NewFromUnsigned(length, node_isolate));
- wrap->UpdateWriteQueueSize();
-
if (r) {
SetErrno(uv_last_error(uv_default_loop()));
req_wrap->~WriteWrap();
delete[] storage;
return scope.Close(v8::Null(node_isolate));
} else {
- if (wrap->stream_->type == UV_TCP) {
- NODE_COUNT_NET_BYTES_SENT(length);
- } else if (wrap->stream_->type == UV_NAMED_PIPE) {
- NODE_COUNT_PIPE_BYTES_SENT(length);
- }
-
return scope.Close(req_wrap->object_);
}
}
@@ -359,7 +298,7 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
}
char* storage = new char[sizeof(WriteWrap) + storage_size + 15];
- WriteWrap* req_wrap = new (storage) WriteWrap();
+ WriteWrap* req_wrap = new (storage) WriteWrap(wrap);
char* data = reinterpret_cast<char*>(ROUND_UP(
reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
@@ -378,12 +317,11 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
reinterpret_cast<uv_pipe_t*>(wrap->stream_)->ipc;
if (!ipc_pipe) {
- r = uv_write(&req_wrap->req_,
- wrap->stream_,
- &buf,
- 1,
- StreamWrap::AfterWrite);
-
+ r = wrap->callbacks_->DoWrite(req_wrap,
+ &buf,
+ 1,
+ NULL,
+ StreamWrap::AfterWrite);
} else {
uv_handle_t* send_handle = NULL;
@@ -403,31 +341,22 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
req_wrap->object_->Set(handle_sym, send_handle_obj);
}
- r = uv_write2(&req_wrap->req_,
- wrap->stream_,
- &buf,
- 1,
- reinterpret_cast<uv_stream_t*>(send_handle),
- StreamWrap::AfterWrite);
+ r = wrap->callbacks_->DoWrite(req_wrap,
+ &buf,
+ 1,
+ reinterpret_cast<uv_stream_t*>(send_handle),
+ StreamWrap::AfterWrite);
}
req_wrap->Dispatched();
req_wrap->object_->Set(bytes_sym, Number::New((uint32_t) data_size));
- wrap->UpdateWriteQueueSize();
-
if (r) {
SetErrno(uv_last_error(uv_default_loop()));
req_wrap->~WriteWrap();
delete[] storage;
return scope.Close(v8::Null(node_isolate));
} else {
- if (wrap->stream_->type == UV_TCP) {
- NODE_COUNT_NET_BYTES_SENT(buf.len);
- } else if (wrap->stream_->type == UV_NAMED_PIPE) {
- NODE_COUNT_PIPE_BYTES_SENT(buf.len);
- }
-
return scope.Close(req_wrap->object_);
}
}
@@ -483,7 +412,7 @@ Handle<Value> StreamWrap::Writev(const Arguments& args) {
storage_size += sizeof(WriteWrap);
char* storage = new char[storage_size];
- WriteWrap* req_wrap = new (storage) WriteWrap();
+ WriteWrap* req_wrap = new (storage) WriteWrap(wrap);
uint32_t bytes = 0;
size_t offset = sizeof(WriteWrap);
@@ -513,11 +442,11 @@ Handle<Value> StreamWrap::Writev(const Arguments& args) {
bytes += str_size;
}
- int r = uv_write(&req_wrap->req_,
- wrap->stream_,
- bufs,
- count,
- StreamWrap::AfterWrite);
+ int r = wrap->callbacks_->DoWrite(req_wrap,
+ bufs,
+ count,
+ NULL,
+ StreamWrap::AfterWrite);
// Deallocate space
if (bufs != bufs_)
@@ -526,20 +455,12 @@ Handle<Value> StreamWrap::Writev(const Arguments& args) {
req_wrap->Dispatched();
req_wrap->object_->Set(bytes_sym, Number::New(bytes));
- wrap->UpdateWriteQueueSize();
-
if (r) {
SetErrno(uv_last_error(uv_default_loop()));
req_wrap->~WriteWrap();
delete[] storage;
return scope.Close(v8::Null(node_isolate));
} else {
- if (wrap->stream_->type == UV_TCP) {
- NODE_COUNT_NET_BYTES_SENT(bytes);
- } else if (wrap->stream_->type == UV_NAMED_PIPE) {
- NODE_COUNT_PIPE_BYTES_SENT(bytes);
- }
-
return scope.Close(req_wrap->object_);
}
}
@@ -561,8 +482,8 @@ Handle<Value> StreamWrap::WriteUcs2String(const Arguments& args) {
void StreamWrap::AfterWrite(uv_write_t* req, int status) {
- WriteWrap* req_wrap = (WriteWrap*) req->data;
- StreamWrap* wrap = (StreamWrap*) req->handle->data;
+ WriteWrap* req_wrap = container_of(req, WriteWrap, req_);
+ StreamWrap* wrap = req_wrap->wrap_;
HandleScope scope(node_isolate);
@@ -579,7 +500,7 @@ void StreamWrap::AfterWrite(uv_write_t* req, int status) {
SetErrno(uv_last_error(uv_default_loop()));
}
- wrap->UpdateWriteQueueSize();
+ wrap->callbacks_->AfterWrite(req_wrap);
Local<Value> argv[] = {
Integer::New(status, node_isolate),
@@ -601,7 +522,7 @@ Handle<Value> StreamWrap::Shutdown(const Arguments& args) {
ShutdownWrap* req_wrap = new ShutdownWrap();
- int r = uv_shutdown(&req_wrap->req_, wrap->stream_, AfterShutdown);
+ int r = wrap->callbacks_->DoShutdown(req_wrap, AfterShutdown);
req_wrap->Dispatched();
@@ -641,4 +562,103 @@ void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
}
+int StreamWrapCallbacks::DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle,
+ uv_write_cb cb) {
+ int r;
+ if (send_handle == NULL) {
+ r = uv_write(&w->req_, wrap_->stream_, bufs, count, cb);
+ } else {
+ r = uv_write2(&w->req_, wrap_->stream_, bufs, count, send_handle, cb);
+ }
+
+ if (!r) {
+ size_t bytes = 0;
+ for (int i = 0; i < count; i++)
+ bytes += bufs[i].len;
+ if (wrap_->stream_->type == UV_TCP) {
+ NODE_COUNT_NET_BYTES_SENT(bytes);
+ } else if (wrap_->stream_->type == UV_NAMED_PIPE) {
+ NODE_COUNT_PIPE_BYTES_SENT(bytes);
+ }
+ }
+
+ wrap_->UpdateWriteQueueSize();
+
+ return r;
+}
+
+
+void StreamWrapCallbacks::AfterWrite(WriteWrap* w) {
+ wrap_->UpdateWriteQueueSize();
+}
+
+
+uv_buf_t StreamWrapCallbacks::DoAlloc(uv_handle_t* handle,
+ size_t suggested_size) {
+ char* buf = slab_allocator->Allocate(wrap_->object_, suggested_size);
+ return uv_buf_init(buf, suggested_size);
+}
+
+
+void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
+ ssize_t nread,
+ uv_buf_t buf,
+ uv_handle_type pending) {
+ HandleScope scope(node_isolate);
+
+ if (nread < 0) {
+ // If libuv reports an error or EOF it *may* give us a buffer back. In that
+ // case, return the space to the slab.
+ if (buf.base != NULL)
+ slab_allocator->Shrink(Self(), buf.base, 0);
+
+ SetErrno(uv_last_error(uv_default_loop()));
+ MakeCallback(Self(), onread_sym, 0, NULL);
+ return;
+ }
+
+ Local<Object> slab = slab_allocator->Shrink(wrap_->object_, buf.base, nread);
+
+ if (nread == 0) return;
+ assert(static_cast<size_t>(nread) <= buf.len);
+
+ int argc = 3;
+ Local<Value> argv[4] = {
+ slab,
+ Integer::NewFromUnsigned(buf.base - Buffer::Data(slab), node_isolate),
+ Integer::NewFromUnsigned(nread, node_isolate)
+ };
+
+ Local<Object> pending_obj;
+ if (pending == UV_TCP) {
+ pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle);
+ } else if (pending == UV_NAMED_PIPE) {
+ pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle);
+ } else if (pending == UV_UDP) {
+ pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle);
+ } else {
+ assert(pending == UV_UNKNOWN_HANDLE);
+ }
+
+ if (!pending_obj.IsEmpty()) {
+ argv[3] = pending_obj;
+ argc++;
+ }
+
+ MakeCallback(wrap_->object_, onread_sym, argc, argv);
+}
+
+
+int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
+ return uv_shutdown(&req_wrap->req_, wrap_->stream_, cb);
+}
+
+
+Handle<Object> StreamWrapCallbacks::Self() {
+ return wrap_->object_;
+}
+
}
diff --git a/src/stream_wrap.h b/src/stream_wrap.h
index 8b58c8af6..6b73efd68 100644
--- a/src/stream_wrap.h
+++ b/src/stream_wrap.h
@@ -25,18 +25,83 @@
#include "v8.h"
#include "node.h"
#include "handle_wrap.h"
+#include "req_wrap.h"
#include "string_bytes.h"
namespace node {
// Forward declaration
-class WriteWrap;
+class StreamWrap;
+typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
+
+class WriteWrap: public ReqWrap<uv_write_t> {
+ public:
+ explicit WriteWrap(StreamWrap* wrap) {
+ wrap_ = wrap;
+ }
+
+ void* operator new(size_t size, char* storage) { return storage; }
+
+ // This is just to keep the compiler happy. It should never be called, since
+ // we don't use exceptions in node.
+ void operator delete(void* ptr, char* storage) { assert(0); }
+
+ StreamWrap* wrap_;
+
+ protected:
+ // People should not be using the non-placement new and delete operator on a
+ // WriteWrap. Ensure this never happens.
+ void* operator new(size_t size) { assert(0); };
+ void operator delete(void* ptr) { assert(0); };
+};
+
+// Overridable callbacks' types
+class StreamWrapCallbacks {
+ public:
+ explicit StreamWrapCallbacks(StreamWrap* wrap) : wrap_(wrap) {
+ }
+
+ explicit StreamWrapCallbacks(StreamWrapCallbacks* old) : wrap_(old->wrap_) {
+ }
+
+ virtual ~StreamWrapCallbacks() {
+ }
+
+ virtual int DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle,
+ uv_write_cb cb);
+ virtual void AfterWrite(WriteWrap* w);
+ virtual uv_buf_t DoAlloc(uv_handle_t* handle, size_t suggested_size);
+ virtual void DoRead(uv_stream_t* handle,
+ ssize_t nread,
+ uv_buf_t buf,
+ uv_handle_type pending);
+ virtual int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb);
+
+ v8::Handle<v8::Object> Self();
+
+ protected:
+ StreamWrap* wrap_;
+};
class StreamWrap : public HandleWrap {
public:
uv_stream_t* GetStream() { return stream_; }
+ void OverrideCallbacks(StreamWrapCallbacks* callbacks) {
+ StreamWrapCallbacks* old = callbacks_;
+ callbacks_ = callbacks;
+ if (old != &default_callbacks_)
+ delete old;
+ }
+
+ StreamWrapCallbacks* GetCallbacks() {
+ return callbacks_;
+ }
+
static void Initialize(v8::Handle<v8::Object> target);
static v8::Handle<v8::Value> GetFD(v8::Local<v8::String>,
@@ -53,10 +118,19 @@ class StreamWrap : public HandleWrap {
static v8::Handle<v8::Value> WriteUtf8String(const v8::Arguments& args);
static v8::Handle<v8::Value> WriteUcs2String(const v8::Arguments& args);
+ // Overridable callbacks
+ StreamWrapCallbacks* callbacks_;
+
protected:
static size_t WriteBuffer(v8::Handle<v8::Value> val, uv_buf_t* buf);
StreamWrap(v8::Handle<v8::Object> object, uv_stream_t* stream);
+ ~StreamWrap() {
+ if (callbacks_ != &default_callbacks_) {
+ delete callbacks_;
+ callbacks_ = NULL;
+ }
+ }
void StateChange() { }
void UpdateWriteQueueSize();
@@ -79,6 +153,9 @@ class StreamWrap : public HandleWrap {
size_t slab_offset_;
uv_stream_t* stream_;
+
+ StreamWrapCallbacks default_callbacks_;
+ friend class StreamWrapCallbacks;
};