diff options
author | Timothy J Fontaine <tjfontaine@gmail.com> | 2014-03-10 17:01:21 -0700 |
---|---|---|
committer | Timothy J Fontaine <tjfontaine@gmail.com> | 2014-03-10 17:01:21 -0700 |
commit | e92d35d80be6e193cb547e94c6fbf3654542dbaa (patch) | |
tree | bede6f090b8cca1397728634b03b31cfa7a4334c /deps/uv/src/unix/stream.c | |
parent | b444392a98e66b49dfee8c7e36c59d4e7c6ea1ac (diff) | |
download | node-new-e92d35d80be6e193cb547e94c6fbf3654542dbaa.tar.gz |
uv: Upgrade to v0.11.22
Diffstat (limited to 'deps/uv/src/unix/stream.c')
-rw-r--r-- | deps/uv/src/unix/stream.c | 290 |
1 files changed, 166 insertions, 124 deletions
diff --git a/deps/uv/src/unix/stream.c b/deps/uv/src/unix/stream.c index ad6856b4b7..370894bfd6 100644 --- a/deps/uv/src/unix/stream.c +++ b/deps/uv/src/unix/stream.c @@ -63,36 +63,6 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); static size_t uv__write_req_size(uv_write_t* req); -/* Used by the accept() EMFILE party trick. */ -static int uv__open_cloexec(const char* path, int flags) { - int err; - int fd; - -#if defined(__linux__) - fd = open(path, flags | UV__O_CLOEXEC); - if (fd != -1) - return fd; - - if (errno != EINVAL) - return -errno; - - /* O_CLOEXEC not supported. */ -#endif - - fd = open(path, flags); - if (fd == -1) - return -errno; - - err = uv__cloexec(fd, 1); - if (err) { - uv__close(fd); - return err; - } - - return fd; -} - - static size_t uv_count_bufs(const uv_buf_t bufs[], unsigned int nbufs) { unsigned int i; size_t bytes; @@ -112,13 +82,13 @@ void uv__stream_init(uv_loop_t* loop, uv__handle_init(loop, (uv_handle_t*)stream, type); stream->read_cb = NULL; - stream->read2_cb = NULL; stream->alloc_cb = NULL; stream->close_cb = NULL; stream->connection_cb = NULL; stream->connect_req = NULL; stream->shutdown_req = NULL; stream->accepted_fd = -1; + stream->queued_fds = NULL; stream->delayed_error = 0; QUEUE_INIT(&stream->write_queue); QUEUE_INIT(&stream->write_completed_queue); @@ -570,6 +540,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { if (server->accepted_fd == -1) return -EAGAIN; + err = 0; switch (client->type) { case UV_NAMED_PIPE: case UV_TCP: @@ -579,8 +550,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { if (err) { /* TODO handle error */ uv__close(server->accepted_fd); - server->accepted_fd = -1; - return err; + goto done; } break; @@ -588,8 +558,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { err = uv_udp_open((uv_udp_t*) client, server->accepted_fd); if (err) { uv__close(server->accepted_fd); - server->accepted_fd = -1; - return err; + goto done; } break; @@ -597,9 +566,33 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { assert(0); } - uv__io_start(server->loop, &server->io_watcher, UV__POLLIN); - server->accepted_fd = -1; - return 0; +done: + /* Process queued fds */ + if (server->queued_fds != NULL) { + uv__stream_queued_fds_t* queued_fds; + + queued_fds = server->queued_fds; + + /* Read first */ + server->accepted_fd = queued_fds->fds[0]; + + /* All read, free */ + assert(queued_fds->offset > 0); + if (--queued_fds->offset == 0) { + free(queued_fds); + server->queued_fds = NULL; + } else { + /* Shift rest */ + memmove(queued_fds->fds, + queued_fds->fds + 1, + queued_fds->offset * sizeof(*queued_fds->fds)); + } + } else { + server->accepted_fd = -1; + if (err == 0) + uv__io_start(server->loop, &server->io_watcher, UV__POLLIN); + } + return err; } @@ -777,12 +770,12 @@ start: msg.msg_flags = 0; msg.msg_control = (void*) scratch; - msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send)); cmsg = CMSG_FIRSTHDR(&msg); cmsg->cmsg_level = SOL_SOCKET; cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = msg.msg_controllen; + cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send)); /* silence aliasing warning */ { @@ -913,7 +906,7 @@ static void uv__write_callbacks(uv_stream_t* stream) { } -static uv_handle_type uv__handle_type(int fd) { +uv_handle_type uv__handle_type(int fd) { struct sockaddr_storage ss; socklen_t len; int type; @@ -947,24 +940,106 @@ static uv_handle_type uv__handle_type(int fd) { } -static void uv__stream_read_cb(uv_stream_t* stream, - int status, - const uv_buf_t* buf, - uv_handle_type type) { - if (stream->read_cb != NULL) - stream->read_cb(stream, status, buf); - else - stream->read2_cb((uv_pipe_t*) stream, status, buf, type); -} - - static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { stream->flags |= UV_STREAM_READ_EOF; uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN); if (!uv__io_active(&stream->io_watcher, UV__POLLOUT)) uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); - uv__stream_read_cb(stream, UV_EOF, buf, UV_UNKNOWN_HANDLE); + stream->read_cb(stream, UV_EOF, buf); +} + + +static int uv__stream_queue_fd(uv_stream_t* stream, int fd) { + uv__stream_queued_fds_t* queued_fds; + unsigned int queue_size; + + queued_fds = stream->queued_fds; + if (queued_fds == NULL) { + queue_size = 8; + queued_fds = malloc((queue_size - 1) * sizeof(*queued_fds->fds) + + sizeof(*queued_fds)); + if (queued_fds == NULL) + return -ENOMEM; + queued_fds->size = queue_size; + queued_fds->offset = 0; + stream->queued_fds = queued_fds; + + /* Grow */ + } else if (queued_fds->size == queued_fds->offset) { + queue_size = queued_fds->size + 8; + queued_fds = realloc(queued_fds, + (queue_size - 1) * sizeof(*queued_fds->fds) + + sizeof(*queued_fds)); + + /* + * Allocation failure, report back. + * NOTE: if it is fatal - sockets will be closed in uv__stream_close + */ + if (queued_fds == NULL) + return -ENOMEM; + queued_fds->size = queue_size; + stream->queued_fds = queued_fds; + } + + /* Put fd in a queue */ + queued_fds->fds[queued_fds->offset++] = fd; + + return 0; +} + + +#define UV__CMSG_FD_COUNT 64 +#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int)) + + +static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) { + struct cmsghdr* cmsg; + + for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) { + char* start; + char* end; + int err; + void* pv; + int* pi; + unsigned int i; + unsigned int count; + + if (cmsg->cmsg_type != SCM_RIGHTS) { + fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", + cmsg->cmsg_type); + continue; + } + + /* silence aliasing warning */ + pv = CMSG_DATA(cmsg); + pi = pv; + + /* Count available fds */ + start = (char*) cmsg; + end = (char*) cmsg + cmsg->cmsg_len; + count = 0; + while (start + CMSG_LEN(count * sizeof(*pi)) < end) + count++; + assert(start + CMSG_LEN(count * sizeof(*pi)) == end); + + for (i = 0; i < count; i++) { + /* Already has accepted fd, queue now */ + if (stream->accepted_fd != -1) { + err = uv__stream_queue_fd(stream, pi[i]); + if (err != 0) { + /* Close rest */ + for (; i < count; i++) + uv__close(pi[i]); + return err; + } + } else { + stream->accepted_fd = pi[i]; + } + } + } + + return 0; } @@ -972,9 +1047,10 @@ static void uv__read(uv_stream_t* stream) { uv_buf_t buf; ssize_t nread; struct msghdr msg; - struct cmsghdr* cmsg; - char cmsg_space[64]; + char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)]; int count; + int err; + int is_ipc; stream->flags &= ~UV_STREAM_READ_PARTIAL; @@ -983,10 +1059,12 @@ static void uv__read(uv_stream_t* stream) { */ count = 32; + is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc; + /* XXX: Maybe instead of having UV_STREAM_READING we just test if * tcp->read_cb is NULL or not? */ - while ((stream->read_cb || stream->read2_cb) + while (stream->read_cb && (stream->flags & UV_STREAM_READING) && (count-- > 0)) { assert(stream->alloc_cb != NULL); @@ -994,29 +1072,28 @@ static void uv__read(uv_stream_t* stream) { stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf); if (buf.len == 0) { /* User indicates it can't or won't handle the read. */ - uv__stream_read_cb(stream, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE); + stream->read_cb(stream, UV_ENOBUFS, &buf); return; } assert(buf.base != NULL); assert(uv__stream_fd(stream) >= 0); - if (stream->read_cb) { + if (!is_ipc) { do { nread = read(uv__stream_fd(stream), buf.base, buf.len); } while (nread < 0 && errno == EINTR); } else { - assert(stream->read2_cb); - /* read2_cb uses recvmsg */ + /* ipc uses recvmsg */ msg.msg_flags = 0; msg.msg_iov = (struct iovec*) &buf; msg.msg_iovlen = 1; msg.msg_name = NULL; msg.msg_namelen = 0; /* Set up to receive a descriptor even if one isn't in the message */ - msg.msg_controllen = 64; - msg.msg_control = (void*) cmsg_space; + msg.msg_controllen = sizeof(cmsg_space); + msg.msg_control = cmsg_space; do { nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); @@ -1032,10 +1109,10 @@ static void uv__read(uv_stream_t* stream) { uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN); uv__stream_osx_interrupt_select(stream); } - uv__stream_read_cb(stream, 0, &buf, UV_UNKNOWN_HANDLE); + stream->read_cb(stream, 0, &buf); } else { /* Error. User should call uv_close(). */ - uv__stream_read_cb(stream, -errno, &buf, UV_UNKNOWN_HANDLE); + stream->read_cb(stream, -errno, &buf); assert(!uv__io_active(&stream->io_watcher, UV__POLLIN) && "stream->read_cb(status=-1) did not call uv_close()"); } @@ -1047,50 +1124,14 @@ static void uv__read(uv_stream_t* stream) { /* Successful read */ ssize_t buflen = buf.len; - if (stream->read_cb) { - stream->read_cb(stream, nread, &buf); - } else { - assert(stream->read2_cb); - - /* - * XXX: Some implementations can send multiple file descriptors in a - * single message. We should be using CMSG_NXTHDR() to walk the - * chain to get at them all. This would require changing the API to - * hand these back up the caller, is a pain. - */ - - for (cmsg = CMSG_FIRSTHDR(&msg); - msg.msg_controllen > 0 && cmsg != NULL; - cmsg = CMSG_NXTHDR(&msg, cmsg)) { - - if (cmsg->cmsg_type == SCM_RIGHTS) { - if (stream->accepted_fd != -1) { - fprintf(stderr, "(libuv) ignoring extra FD received\n"); - } - - /* silence aliasing warning */ - { - void* pv = CMSG_DATA(cmsg); - int* pi = pv; - stream->accepted_fd = *pi; - } - - } else { - fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", - cmsg->cmsg_type); - } - } - - - if (stream->accepted_fd >= 0) { - stream->read2_cb((uv_pipe_t*) stream, - nread, - &buf, - uv__handle_type(stream->accepted_fd)); - } else { - stream->read2_cb((uv_pipe_t*) stream, nread, &buf, UV_UNKNOWN_HANDLE); + if (is_ipc) { + err = uv__stream_recv_cmsg(stream, &msg); + if (err != 0) { + stream->read_cb(stream, err, NULL); + return; } } + stream->read_cb(stream, nread, &buf); /* Return if we didn't fill the buffer, there is no more data to read. */ if (nread < buflen) { @@ -1102,6 +1143,10 @@ static void uv__read(uv_stream_t* stream) { } +#undef UV__CMSG_FD_COUNT +#undef UV__CMSG_FD_SIZE + + int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) && "uv_shutdown (unix) only supports uv_handle_t right now"); @@ -1371,10 +1416,9 @@ int uv_try_write(uv_stream_t* stream, } -static int uv__read_start_common(uv_stream_t* stream, - uv_alloc_cb alloc_cb, - uv_read_cb read_cb, - uv_read2_cb read2_cb) { +int uv_read_start(uv_stream_t* stream, + uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); @@ -1394,7 +1438,6 @@ static int uv__read_start_common(uv_stream_t* stream, assert(alloc_cb); stream->read_cb = read_cb; - stream->read2_cb = read2_cb; stream->alloc_cb = alloc_cb; uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN); @@ -1405,18 +1448,6 @@ static int uv__read_start_common(uv_stream_t* stream, } -int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { - return uv__read_start_common(stream, alloc_cb, read_cb, NULL); -} - - -int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, - uv_read2_cb read_cb) { - return uv__read_start_common(stream, alloc_cb, NULL, read_cb); -} - - int uv_read_stop(uv_stream_t* stream) { /* Sanity check. We're going to stop the handle unless it's primed for * writing but that means there should be some kind of write action in @@ -1435,7 +1466,6 @@ int uv_read_stop(uv_stream_t* stream) { uv__stream_osx_interrupt_select(stream); stream->read_cb = NULL; - stream->read2_cb = NULL; stream->alloc_cb = NULL; return 0; } @@ -1469,6 +1499,9 @@ int uv___stream_fd(uv_stream_t* handle) { void uv__stream_close(uv_stream_t* handle) { + unsigned int i; + uv__stream_queued_fds_t* queued_fds; + #if defined(__APPLE__) /* Terminate select loop first */ if (handle->select != NULL) { @@ -1506,6 +1539,15 @@ void uv__stream_close(uv_stream_t* handle) { handle->accepted_fd = -1; } + /* Close all queued fds */ + if (handle->queued_fds != NULL) { + queued_fds = handle->queued_fds; + for (i = 0; i < queued_fds->offset; i++) + uv__close(queued_fds->fds[i]); + free(handle->queued_fds); + handle->queued_fds = NULL; + } + assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT)); } |