diff options
author | Fedor Indutny <fedor.indutny@gmail.com> | 2013-06-11 12:55:49 +0200 |
---|---|---|
committer | Fedor Indutny <fedor.indutny@gmail.com> | 2013-06-15 21:44:51 +0200 |
commit | 4c48a39c65c175c2f0b1ec5bf58456dd83e71d99 (patch) | |
tree | 40baa626c1ec10ba0c89216e71dcaf43f47880f2 | |
parent | 6978e998ee7c08b3dd10e4cecd2a167696b74317 (diff) | |
download | node-4c48a39c65c175c2f0b1ec5bf58456dd83e71d99.tar.gz |
stream_wrap: introduce StreamWrapCallbacks
StreamWrapCallbacks is a helper class for incepting into uv_stream_t*
management process.
-rw-r--r-- | src/stream_wrap.cc | 262 | ||||
-rw-r--r-- | src/stream_wrap.h | 79 |
2 files changed, 219 insertions, 122 deletions
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 61527ed0a..51f57f1f2 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -56,23 +56,6 @@ using v8::String; using v8::TryCatch; using v8::Value; -typedef class ReqWrap<uv_shutdown_t> ShutdownWrap; - -class WriteWrap: public ReqWrap<uv_write_t> { - public: - void* operator new(size_t size, char* storage) { return storage; } - - // This is just to keep the compiler happy. It should never be called, since - // we don't use exceptions in node. - void operator delete(void* ptr, char* storage) { assert(0); } - - protected: - // People should not be using the non-placement new and delete operator on a - // WriteWrap. Ensure this never happens. - void* operator new (size_t size) { assert(0); }; - void operator delete(void* ptr) { assert(0); }; -}; - static Persistent<String> buffer_sym; static Persistent<String> bytes_sym; @@ -110,8 +93,10 @@ void StreamWrap::Initialize(Handle<Object> target) { StreamWrap::StreamWrap(Handle<Object> object, uv_stream_t* stream) - : HandleWrap(object, reinterpret_cast<uv_handle_t*>(stream)) { + : HandleWrap(object, reinterpret_cast<uv_handle_t*>(stream)), + default_callbacks_(this) { stream_ = stream; + callbacks_ = &default_callbacks_; } @@ -173,8 +158,8 @@ Handle<Value> StreamWrap::ReadStop(const Arguments& args) { uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) { StreamWrap* wrap = static_cast<StreamWrap*>(handle->data); assert(wrap->stream_ == reinterpret_cast<uv_stream_t*>(handle)); - char* buf = slab_allocator->Allocate(wrap->object_, suggested_size); - return uv_buf_init(buf, suggested_size); + + return wrap->callbacks_->DoAlloc(handle, suggested_size); } @@ -200,8 +185,10 @@ static Local<Object> AcceptHandle(uv_stream_t* pipe) { } -void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, - uv_buf_t buf, uv_handle_type pending) { +void StreamWrap::OnReadCommon(uv_stream_t* handle, + ssize_t nread, + uv_buf_t buf, + uv_handle_type pending) { HandleScope scope(node_isolate); StreamWrap* wrap = static_cast<StreamWrap*>(handle->data); @@ -210,56 +197,16 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, // uv_close() on the handle. assert(wrap->object_.IsEmpty() == false); - if (nread < 0) { - // If libuv reports an error or EOF it *may* give us a buffer back. In that - // case, return the space to the slab. - if (buf.base != NULL) { - slab_allocator->Shrink(wrap->object_, buf.base, 0); + if (nread > 0) { + if (wrap->stream_->type == UV_TCP) { + NODE_COUNT_NET_BYTES_RECV(nread); + } else if (wrap->stream_->type == UV_NAMED_PIPE) { + NODE_COUNT_PIPE_BYTES_RECV(nread); } - - SetErrno(uv_last_error(uv_default_loop())); - MakeCallback(wrap->object_, onread_sym, 0, NULL); - return; } assert(buf.base != NULL); - Local<Object> slab = slab_allocator->Shrink(wrap->object_, - buf.base, - nread); - - if (nread == 0) return; - assert(static_cast<size_t>(nread) <= buf.len); - - int argc = 3; - Local<Value> argv[4] = { - slab, - Integer::NewFromUnsigned(buf.base - Buffer::Data(slab), node_isolate), - Integer::NewFromUnsigned(nread, node_isolate) - }; - - Local<Object> pending_obj; - if (pending == UV_TCP) { - pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle); - } else if (pending == UV_NAMED_PIPE) { - pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle); - } else if (pending == UV_UDP) { - pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle); - } else { - assert(pending == UV_UNKNOWN_HANDLE); - } - - if (!pending_obj.IsEmpty()) { - argv[3] = pending_obj; - argc++; - } - - if (wrap->stream_->type == UV_TCP) { - NODE_COUNT_NET_BYTES_RECV(nread); - } else if (wrap->stream_->type == UV_NAMED_PIPE) { - NODE_COUNT_PIPE_BYTES_RECV(nread); - } - - MakeCallback(wrap->object_, onread_sym, argc, argv); + wrap->callbacks_->DoRead(handle, nread, buf, pending); } @@ -294,37 +241,29 @@ Handle<Value> StreamWrap::WriteBuffer(const Arguments& args) { assert(args.Length() >= 1 && Buffer::HasInstance(args[0])); size_t length = Buffer::Length(args[0]); char* storage = new char[sizeof(WriteWrap)]; - WriteWrap* req_wrap = new (storage) WriteWrap(); + WriteWrap* req_wrap = new (storage) WriteWrap(wrap); req_wrap->object_->SetHiddenValue(buffer_sym, args[0]); uv_buf_t buf; WriteBuffer(args[0], &buf); - int r = uv_write(&req_wrap->req_, - wrap->stream_, - &buf, - 1, - StreamWrap::AfterWrite); + int r = wrap->callbacks_->DoWrite(req_wrap, + &buf, + 1, + NULL, + StreamWrap::AfterWrite); req_wrap->Dispatched(); req_wrap->object_->Set(bytes_sym, Integer::NewFromUnsigned(length, node_isolate)); - wrap->UpdateWriteQueueSize(); - if (r) { SetErrno(uv_last_error(uv_default_loop())); req_wrap->~WriteWrap(); delete[] storage; return scope.Close(v8::Null(node_isolate)); } else { - if (wrap->stream_->type == UV_TCP) { - NODE_COUNT_NET_BYTES_SENT(length); - } else if (wrap->stream_->type == UV_NAMED_PIPE) { - NODE_COUNT_PIPE_BYTES_SENT(length); - } - return scope.Close(req_wrap->object_); } } @@ -359,7 +298,7 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) { } char* storage = new char[sizeof(WriteWrap) + storage_size + 15]; - WriteWrap* req_wrap = new (storage) WriteWrap(); + WriteWrap* req_wrap = new (storage) WriteWrap(wrap); char* data = reinterpret_cast<char*>(ROUND_UP( reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16)); @@ -378,12 +317,11 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) { reinterpret_cast<uv_pipe_t*>(wrap->stream_)->ipc; if (!ipc_pipe) { - r = uv_write(&req_wrap->req_, - wrap->stream_, - &buf, - 1, - StreamWrap::AfterWrite); - + r = wrap->callbacks_->DoWrite(req_wrap, + &buf, + 1, + NULL, + StreamWrap::AfterWrite); } else { uv_handle_t* send_handle = NULL; @@ -403,31 +341,22 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) { req_wrap->object_->Set(handle_sym, send_handle_obj); } - r = uv_write2(&req_wrap->req_, - wrap->stream_, - &buf, - 1, - reinterpret_cast<uv_stream_t*>(send_handle), - StreamWrap::AfterWrite); + r = wrap->callbacks_->DoWrite(req_wrap, + &buf, + 1, + reinterpret_cast<uv_stream_t*>(send_handle), + StreamWrap::AfterWrite); } req_wrap->Dispatched(); req_wrap->object_->Set(bytes_sym, Number::New((uint32_t) data_size)); - wrap->UpdateWriteQueueSize(); - if (r) { SetErrno(uv_last_error(uv_default_loop())); req_wrap->~WriteWrap(); delete[] storage; return scope.Close(v8::Null(node_isolate)); } else { - if (wrap->stream_->type == UV_TCP) { - NODE_COUNT_NET_BYTES_SENT(buf.len); - } else if (wrap->stream_->type == UV_NAMED_PIPE) { - NODE_COUNT_PIPE_BYTES_SENT(buf.len); - } - return scope.Close(req_wrap->object_); } } @@ -483,7 +412,7 @@ Handle<Value> StreamWrap::Writev(const Arguments& args) { storage_size += sizeof(WriteWrap); char* storage = new char[storage_size]; - WriteWrap* req_wrap = new (storage) WriteWrap(); + WriteWrap* req_wrap = new (storage) WriteWrap(wrap); uint32_t bytes = 0; size_t offset = sizeof(WriteWrap); @@ -513,11 +442,11 @@ Handle<Value> StreamWrap::Writev(const Arguments& args) { bytes += str_size; } - int r = uv_write(&req_wrap->req_, - wrap->stream_, - bufs, - count, - StreamWrap::AfterWrite); + int r = wrap->callbacks_->DoWrite(req_wrap, + bufs, + count, + NULL, + StreamWrap::AfterWrite); // Deallocate space if (bufs != bufs_) @@ -526,20 +455,12 @@ Handle<Value> StreamWrap::Writev(const Arguments& args) { req_wrap->Dispatched(); req_wrap->object_->Set(bytes_sym, Number::New(bytes)); - wrap->UpdateWriteQueueSize(); - if (r) { SetErrno(uv_last_error(uv_default_loop())); req_wrap->~WriteWrap(); delete[] storage; return scope.Close(v8::Null(node_isolate)); } else { - if (wrap->stream_->type == UV_TCP) { - NODE_COUNT_NET_BYTES_SENT(bytes); - } else if (wrap->stream_->type == UV_NAMED_PIPE) { - NODE_COUNT_PIPE_BYTES_SENT(bytes); - } - return scope.Close(req_wrap->object_); } } @@ -561,8 +482,8 @@ Handle<Value> StreamWrap::WriteUcs2String(const Arguments& args) { void StreamWrap::AfterWrite(uv_write_t* req, int status) { - WriteWrap* req_wrap = (WriteWrap*) req->data; - StreamWrap* wrap = (StreamWrap*) req->handle->data; + WriteWrap* req_wrap = container_of(req, WriteWrap, req_); + StreamWrap* wrap = req_wrap->wrap_; HandleScope scope(node_isolate); @@ -579,7 +500,7 @@ void StreamWrap::AfterWrite(uv_write_t* req, int status) { SetErrno(uv_last_error(uv_default_loop())); } - wrap->UpdateWriteQueueSize(); + wrap->callbacks_->AfterWrite(req_wrap); Local<Value> argv[] = { Integer::New(status, node_isolate), @@ -601,7 +522,7 @@ Handle<Value> StreamWrap::Shutdown(const Arguments& args) { ShutdownWrap* req_wrap = new ShutdownWrap(); - int r = uv_shutdown(&req_wrap->req_, wrap->stream_, AfterShutdown); + int r = wrap->callbacks_->DoShutdown(req_wrap, AfterShutdown); req_wrap->Dispatched(); @@ -641,4 +562,103 @@ void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { } +int StreamWrapCallbacks::DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle, + uv_write_cb cb) { + int r; + if (send_handle == NULL) { + r = uv_write(&w->req_, wrap_->stream_, bufs, count, cb); + } else { + r = uv_write2(&w->req_, wrap_->stream_, bufs, count, send_handle, cb); + } + + if (!r) { + size_t bytes = 0; + for (int i = 0; i < count; i++) + bytes += bufs[i].len; + if (wrap_->stream_->type == UV_TCP) { + NODE_COUNT_NET_BYTES_SENT(bytes); + } else if (wrap_->stream_->type == UV_NAMED_PIPE) { + NODE_COUNT_PIPE_BYTES_SENT(bytes); + } + } + + wrap_->UpdateWriteQueueSize(); + + return r; +} + + +void StreamWrapCallbacks::AfterWrite(WriteWrap* w) { + wrap_->UpdateWriteQueueSize(); +} + + +uv_buf_t StreamWrapCallbacks::DoAlloc(uv_handle_t* handle, + size_t suggested_size) { + char* buf = slab_allocator->Allocate(wrap_->object_, suggested_size); + return uv_buf_init(buf, suggested_size); +} + + +void StreamWrapCallbacks::DoRead(uv_stream_t* handle, + ssize_t nread, + uv_buf_t buf, + uv_handle_type pending) { + HandleScope scope(node_isolate); + + if (nread < 0) { + // If libuv reports an error or EOF it *may* give us a buffer back. In that + // case, return the space to the slab. + if (buf.base != NULL) + slab_allocator->Shrink(Self(), buf.base, 0); + + SetErrno(uv_last_error(uv_default_loop())); + MakeCallback(Self(), onread_sym, 0, NULL); + return; + } + + Local<Object> slab = slab_allocator->Shrink(wrap_->object_, buf.base, nread); + + if (nread == 0) return; + assert(static_cast<size_t>(nread) <= buf.len); + + int argc = 3; + Local<Value> argv[4] = { + slab, + Integer::NewFromUnsigned(buf.base - Buffer::Data(slab), node_isolate), + Integer::NewFromUnsigned(nread, node_isolate) + }; + + Local<Object> pending_obj; + if (pending == UV_TCP) { + pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle); + } else if (pending == UV_NAMED_PIPE) { + pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle); + } else if (pending == UV_UDP) { + pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle); + } else { + assert(pending == UV_UNKNOWN_HANDLE); + } + + if (!pending_obj.IsEmpty()) { + argv[3] = pending_obj; + argc++; + } + + MakeCallback(wrap_->object_, onread_sym, argc, argv); +} + + +int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) { + return uv_shutdown(&req_wrap->req_, wrap_->stream_, cb); +} + + +Handle<Object> StreamWrapCallbacks::Self() { + return wrap_->object_; +} + } diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 8b58c8af6..6b73efd68 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -25,18 +25,83 @@ #include "v8.h" #include "node.h" #include "handle_wrap.h" +#include "req_wrap.h" #include "string_bytes.h" namespace node { // Forward declaration -class WriteWrap; +class StreamWrap; +typedef class ReqWrap<uv_shutdown_t> ShutdownWrap; + +class WriteWrap: public ReqWrap<uv_write_t> { + public: + explicit WriteWrap(StreamWrap* wrap) { + wrap_ = wrap; + } + + void* operator new(size_t size, char* storage) { return storage; } + + // This is just to keep the compiler happy. It should never be called, since + // we don't use exceptions in node. + void operator delete(void* ptr, char* storage) { assert(0); } + + StreamWrap* wrap_; + + protected: + // People should not be using the non-placement new and delete operator on a + // WriteWrap. Ensure this never happens. + void* operator new(size_t size) { assert(0); }; + void operator delete(void* ptr) { assert(0); }; +}; + +// Overridable callbacks' types +class StreamWrapCallbacks { + public: + explicit StreamWrapCallbacks(StreamWrap* wrap) : wrap_(wrap) { + } + + explicit StreamWrapCallbacks(StreamWrapCallbacks* old) : wrap_(old->wrap_) { + } + + virtual ~StreamWrapCallbacks() { + } + + virtual int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle, + uv_write_cb cb); + virtual void AfterWrite(WriteWrap* w); + virtual uv_buf_t DoAlloc(uv_handle_t* handle, size_t suggested_size); + virtual void DoRead(uv_stream_t* handle, + ssize_t nread, + uv_buf_t buf, + uv_handle_type pending); + virtual int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb); + + v8::Handle<v8::Object> Self(); + + protected: + StreamWrap* wrap_; +}; class StreamWrap : public HandleWrap { public: uv_stream_t* GetStream() { return stream_; } + void OverrideCallbacks(StreamWrapCallbacks* callbacks) { + StreamWrapCallbacks* old = callbacks_; + callbacks_ = callbacks; + if (old != &default_callbacks_) + delete old; + } + + StreamWrapCallbacks* GetCallbacks() { + return callbacks_; + } + static void Initialize(v8::Handle<v8::Object> target); static v8::Handle<v8::Value> GetFD(v8::Local<v8::String>, @@ -53,10 +118,19 @@ class StreamWrap : public HandleWrap { static v8::Handle<v8::Value> WriteUtf8String(const v8::Arguments& args); static v8::Handle<v8::Value> WriteUcs2String(const v8::Arguments& args); + // Overridable callbacks + StreamWrapCallbacks* callbacks_; + protected: static size_t WriteBuffer(v8::Handle<v8::Value> val, uv_buf_t* buf); StreamWrap(v8::Handle<v8::Object> object, uv_stream_t* stream); + ~StreamWrap() { + if (callbacks_ != &default_callbacks_) { + delete callbacks_; + callbacks_ = NULL; + } + } void StateChange() { } void UpdateWriteQueueSize(); @@ -79,6 +153,9 @@ class StreamWrap : public HandleWrap { size_t slab_offset_; uv_stream_t* stream_; + + StreamWrapCallbacks default_callbacks_; + friend class StreamWrapCallbacks; }; |