diff options
author | max ulidtko <ulidtko@gmail.com> | 2020-05-19 21:44:46 +0300 |
---|---|---|
committer | Jens Geyer <jensg@apache.org> | 2020-06-09 23:09:25 +0200 |
commit | dabfea2f98a9ab605748dea55b1efbb1932b361f (patch) | |
tree | ac141f9f985ad0aa540b9b6111b8c9ffcfb39728 /lib | |
parent | cfbb905034c928f073639af00b30d74951744b61 (diff) | |
download | thrift-dabfea2f98a9ab605748dea55b1efbb1932b361f.tar.gz |
THRIFT-5186: Rewrite address resolution in T{Nonblocking,}ServerSocket
Client: cpp
Patch: Max Ulidtko
This closes #2151
Diffstat (limited to 'lib')
-rwxr-xr-x | lib/cpp/Makefile.am | 1 | ||||
-rw-r--r-- | lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp | 223 | ||||
-rw-r--r-- | lib/cpp/src/thrift/transport/TNonblockingServerSocket.h | 5 | ||||
-rw-r--r-- | lib/cpp/src/thrift/transport/TServerSocket.cpp | 306 | ||||
-rw-r--r-- | lib/cpp/src/thrift/transport/TServerSocket.h | 19 | ||||
-rw-r--r-- | lib/cpp/src/thrift/transport/TSocketUtils.h | 163 |
6 files changed, 451 insertions, 266 deletions
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index c73a17d75..a536d1719 100755 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -189,6 +189,7 @@ include_transport_HEADERS = \ src/thrift/transport/THttpClient.h \ src/thrift/transport/THttpServer.h \ src/thrift/transport/TSocket.h \ + src/thrift/transport/TSocketUtils.h \ src/thrift/transport/TPipe.h \ src/thrift/transport/TPipeServer.h \ src/thrift/transport/TSSLSocket.h \ diff --git a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp index c50ce381c..7bac37eb2 100644 --- a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp +++ b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp @@ -44,9 +44,10 @@ #include <unistd.h> #endif -#include <thrift/transport/TSocket.h> -#include <thrift/transport/TNonblockingServerSocket.h> #include <thrift/transport/PlatformSocket.h> +#include <thrift/transport/TNonblockingServerSocket.h> +#include <thrift/transport/TSocket.h> +#include <thrift/transport/TSocketUtils.h> #ifndef AF_LOCAL #define AF_LOCAL AF_UNIX @@ -74,8 +75,8 @@ namespace apache { namespace thrift { namespace transport { -using std::string; using std::shared_ptr; +using std::string; TNonblockingServerSocket::TNonblockingServerSocket(int port) : port_(port), @@ -171,60 +172,7 @@ void TNonblockingServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) { tcpRecvBuffer_ = tcpRecvBuffer; } -void TNonblockingServerSocket::listen() { - listening_ = true; -#ifdef _WIN32 - TWinsockSingleton::create(); -#endif // _WIN32 - - // Validate port number - if (port_ < 0 || port_ > 0xFFFF) { - throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid"); - } - - const struct addrinfo *res; - int error; - char port[sizeof("65535")]; - THRIFT_SNPRINTF(port, sizeof(port), "%d", port_); - - struct addrinfo hints; - std::memset(&hints, 0, sizeof(hints)); - hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - - // If address is not specified use wildcard address (NULL) - TGetAddrInfoWrapper info(address_.empty() ? nullptr : &address_[0], port, &hints); - - error = info.init(); - if (error) { - GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error)); - close(); - throw TTransportException(TTransportException::NOT_OPEN, - "Could not resolve host for server socket."); - } - - // Pick the ipv6 address first since ipv4 addresses can be mapped - // into ipv6 space. - for (res = info.res(); res; res = res->ai_next) { - if (res->ai_family == AF_INET6 || res->ai_next == nullptr) - break; - } - - if (!path_.empty()) { - serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP); - } else if (res != nullptr) { - serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - } - - if (serverSocket_ == THRIFT_INVALID_SOCKET) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, - "Could not create server socket.", - errno_copy); - } +void TNonblockingServerSocket::_setup_sockopts() { // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept int one = 1; @@ -278,19 +226,6 @@ void TNonblockingServerSocket::listen() { } } -#ifdef IPV6_V6ONLY - if (res->ai_family == AF_INET6 && path_.empty()) { - int zero = 0; - if (-1 == setsockopt(serverSocket_, - IPPROTO_IPV6, - IPV6_V6ONLY, - cast_sockopt(&zero), - sizeof(zero))) { - GlobalOutput.perror("TNonblockingServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR); - } - } -#endif // #ifdef IPV6_V6ONLY - // Turn linger off, don't want to block on calls to close struct linger ling = {0, 0}; if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) { @@ -310,24 +245,6 @@ void TNonblockingServerSocket::listen() { errno_copy); } - // Set TCP nodelay if available, MAC OS X Hack - // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html -#ifndef TCP_NOPUSH - // Unix Sockets do not need that - if (path_.empty()) { - // TCP Nodelay, speed over bandwidth - if (-1 - == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, - "Could not set TCP_NODELAY", - errno_copy); - } - } -#endif - // Set NONBLOCK on the accept socket int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0); if (flags == -1) { @@ -348,6 +265,26 @@ void TNonblockingServerSocket::listen() { errno_copy); } +} // _setup_sockopts() + +void TNonblockingServerSocket::_setup_tcp_sockopts() { + int one = 1; + + // Set TCP nodelay if available, MAC OS X Hack + // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html +#ifndef TCP_NOPUSH + // TCP Nodelay, speed over bandwidth + if (-1 + == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy); + close(); + throw TTransportException(TTransportException::NOT_OPEN, + "Could not set TCP_NODELAY", + errno_copy); + } +#endif + #ifdef TCP_LOW_MIN_RTO if (TSocket::getUseLowMinRto()) { if (-1 == setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one))) { @@ -361,17 +298,60 @@ void TNonblockingServerSocket::listen() { } #endif - // prepare the port information +} // _setup_tcp_sockopts() + +void TNonblockingServerSocket::listen() { + listening_ = true; +#ifdef _WIN32 + TWinsockSingleton::create(); +#endif // _WIN32 + + // tcp == false means Unix Domain socket + bool tcp = (path_.empty()); + + // Validate port number + if (port_ < 0 || port_ > 0xFFFF) { + throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid"); + } + + // Resolve host:port strings into an iterable of struct addrinfo* + AddressResolutionHelper resolved_addresses; + if (tcp) { + try { + resolved_addresses.resolve(address_, std::to_string(port_), SOCK_STREAM, + AI_PASSIVE | AI_V4MAPPED); + } catch (const std::system_error& e) { + GlobalOutput.printf("getaddrinfo() -> %d. %s", e.code().value(), e.what()); + close(); + throw TTransportException(TTransportException::NOT_OPEN, + "Could not resolve host for server socket."); + } + } + // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't // always seem to work. The client can configure the retry variables. int retries = 0; int errno_copy = 0; - if (!path_.empty()) { + if (!tcp) { + // -- Unix Domain Socket -- // + + serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP); + + if (serverSocket_ == THRIFT_INVALID_SOCKET) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy); + close(); + throw TTransportException(TTransportException::NOT_OPEN, + "Could not create server socket.", + errno_copy); + } + + _setup_sockopts(); + //_setup_unixdomain_sockopts(); #ifndef _WIN32 - // Unix Domain Socket size_t len = path_.size() + 1; if (len > sizeof(((sockaddr_un*)nullptr)->sun_path)) { errno_copy = THRIFT_GET_SOCKET_ERROR; @@ -411,11 +391,48 @@ void TNonblockingServerSocket::listen() { " Unix Domain socket path not supported"); #endif } else { + + // -- TCP socket -- // + + auto addr_iter = AddressResolutionHelper::Iter{}; + + // Via DNS or somehow else, single hostname can resolve into many addresses. + // Results may contain perhaps a mix of IPv4 and IPv6. Here, we iterate + // over what system gave us, picking the first address that works. do { - if (0 == ::bind(serverSocket_, res->ai_addr, static_cast<int>(res->ai_addrlen))) { + if (!addr_iter) { + // init + recycle over many retries + addr_iter = resolved_addresses.iterate(); + } + auto trybind = *addr_iter++; + + serverSocket_ = socket(trybind->ai_family, trybind->ai_socktype, trybind->ai_protocol); + if (serverSocket_ == -1) { + errno_copy = THRIFT_GET_SOCKET_ERROR; + continue; + } + + _setup_sockopts(); + _setup_tcp_sockopts(); + +#ifdef IPV6_V6ONLY + if (trybind->ai_family == AF_INET6) { + int zero = 0; + if (-1 == setsockopt(serverSocket_, + IPPROTO_IPV6, + IPV6_V6ONLY, + cast_sockopt(&zero), + sizeof(zero))) { + GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR); + } + } +#endif // #ifdef IPV6_V6ONLY + + if (0 == ::bind(serverSocket_, trybind->ai_addr, static_cast<int>(trybind->ai_addrlen))) { break; } errno_copy = THRIFT_GET_SOCKET_ERROR; + // use short circuit evaluation here to only sleep if we need to } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0)); @@ -437,12 +454,21 @@ void TNonblockingServerSocket::listen() { } } } + } // TCP socket // + + // throw error if socket still wasn't created successfully + if (serverSocket_ == THRIFT_INVALID_SOCKET) { + GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy); + close(); + throw TTransportException(TTransportException::NOT_OPEN, + "Could not create server socket.", + errno_copy); } // throw an error if we failed to bind properly if (retries > retryLimit_) { char errbuf[1024]; - if (!path_.empty()) { + if (!tcp) { THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() PATH %s", path_.c_str()); } else { THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() BIND %d", port_); @@ -478,9 +504,10 @@ int TNonblockingServerSocket::getListenPort() { shared_ptr<TSocket> TNonblockingServerSocket::acceptImpl() { if (serverSocket_ == THRIFT_INVALID_SOCKET) { - throw TTransportException(TTransportException::NOT_OPEN, "TNonblockingServerSocket not listening"); + throw TTransportException(TTransportException::NOT_OPEN, + "TNonblockingServerSocket not listening"); } - + struct sockaddr_storage clientAddress; int size = sizeof(clientAddress); THRIFT_SOCKET clientSocket @@ -544,6 +571,6 @@ void TNonblockingServerSocket::close() { serverSocket_ = THRIFT_INVALID_SOCKET; listening_ = false; } -} -} -} // apache::thrift::transport +} // namespace transport +} // namespace thrift +} // namespace apache diff --git a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h index a68c28d22..1ed2b07f9 100644 --- a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h +++ b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h @@ -100,7 +100,7 @@ public: THRIFT_SOCKET getSocketFD() override { return serverSocket_; } int getPort() override; - + int getListenPort() override; void listen() override; @@ -128,6 +128,9 @@ private: socket_func_t listenCallback_; socket_func_t acceptCallback_; + + void _setup_sockopts(); + void _setup_tcp_sockopts(); }; } } diff --git a/lib/cpp/src/thrift/transport/TServerSocket.cpp b/lib/cpp/src/thrift/transport/TServerSocket.cpp index 150e53096..6b7652560 100644 --- a/lib/cpp/src/thrift/transport/TServerSocket.cpp +++ b/lib/cpp/src/thrift/transport/TServerSocket.cpp @@ -44,9 +44,10 @@ #include <unistd.h> #endif -#include <thrift/transport/TSocket.h> -#include <thrift/transport/TServerSocket.h> #include <thrift/transport/PlatformSocket.h> +#include <thrift/transport/TServerSocket.h> +#include <thrift/transport/TSocket.h> +#include <thrift/transport/TSocketUtils.h> #ifndef AF_LOCAL #define AF_LOCAL AF_UNIX @@ -83,26 +84,6 @@ namespace transport { using std::shared_ptr; -TGetAddrInfoWrapper::TGetAddrInfoWrapper(const char* node, - const char* service, - const struct addrinfo* hints) - : node_(node), service_(service), hints_(hints), res_(nullptr) {} - -TGetAddrInfoWrapper::~TGetAddrInfoWrapper() { - if (this->res_ != nullptr) - freeaddrinfo(this->res_); -} - -int TGetAddrInfoWrapper::init() { - if (this->res_ == nullptr) - return getaddrinfo(this->node_, this->service_, this->hints_, &(this->res_)); - return 0; -} - -const struct addrinfo* TGetAddrInfoWrapper::res() { - return this->res_; -} - TServerSocket::TServerSocket(int port) : interruptableChildren_(true), port_(port), @@ -228,84 +209,7 @@ void TServerSocket::setInterruptableChildren(bool enable) { interruptableChildren_ = enable; } -void TServerSocket::listen() { - listening_ = true; -#ifdef _WIN32 - TWinsockSingleton::create(); -#endif // _WIN32 - THRIFT_SOCKET sv[2]; - // Create the socket pair used to interrupt - if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) { - GlobalOutput.perror("TServerSocket::listen() socketpair() interrupt", THRIFT_GET_SOCKET_ERROR); - interruptSockWriter_ = THRIFT_INVALID_SOCKET; - interruptSockReader_ = THRIFT_INVALID_SOCKET; - } else { - interruptSockWriter_ = sv[1]; - interruptSockReader_ = sv[0]; - } - - // Create the socket pair used to interrupt all clients - if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) { - GlobalOutput.perror("TServerSocket::listen() socketpair() childInterrupt", - THRIFT_GET_SOCKET_ERROR); - childInterruptSockWriter_ = THRIFT_INVALID_SOCKET; - pChildInterruptSockReader_.reset(); - } else { - childInterruptSockWriter_ = sv[1]; - pChildInterruptSockReader_ - = std::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]), destroyer_of_fine_sockets); - } - - // Validate port number - if (port_ < 0 || port_ > 0xFFFF) { - throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid"); - } - - const struct addrinfo *res = nullptr; - int error; - char port[sizeof("65535")]; - THRIFT_SNPRINTF(port, sizeof(port), "%d", port_); - - struct addrinfo hints; - std::memset(&hints, 0, sizeof(hints)); - hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - - // If address is not specified use wildcard address (NULL) - TGetAddrInfoWrapper info(address_.empty() ? nullptr : &address_[0], port, &hints); - - if (path_.empty()) { - error = info.init(); - if (error) { - GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error)); - close(); - throw TTransportException(TTransportException::NOT_OPEN, - "Could not resolve host for server socket."); - } - - // Pick the ipv6 address first since ipv4 addresses can be mapped - // into ipv6 space. - for (res = info.res(); res; res = res->ai_next) { - if (res->ai_family == AF_INET6 || res->ai_next == nullptr) - break; - } - } - - if (!path_.empty()) { - serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP); - } else if (res != nullptr) { - serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - } - - if (serverSocket_ == THRIFT_INVALID_SOCKET) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, - "Could not create server socket.", - errno_copy); - } +void TServerSocket::_setup_sockopts() { // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept int one = 1; @@ -359,33 +263,6 @@ void TServerSocket::listen() { } } -// Defer accept -#ifdef TCP_DEFER_ACCEPT - if (path_.empty()) { - if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_DEFER_ACCEPT, &one, sizeof(one))) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, - "Could not set TCP_DEFER_ACCEPT", - errno_copy); - } - } -#endif // #ifdef TCP_DEFER_ACCEPT - -#ifdef IPV6_V6ONLY - if (path_.empty() && res->ai_family == AF_INET6) { - int zero = 0; - if (-1 == setsockopt(serverSocket_, - IPPROTO_IPV6, - IPV6_V6ONLY, - cast_sockopt(&zero), - sizeof(zero))) { - GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR); - } - } -#endif // #ifdef IPV6_V6ONLY - // Turn linger off, don't want to block on calls to close struct linger ling = {0, 0}; if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) { @@ -395,20 +272,6 @@ void TServerSocket::listen() { throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy); } - // Unix Sockets do not need that - if (path_.empty()) { - // TCP Nodelay, speed over bandwidth - if (-1 - == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, - "Could not set TCP_NODELAY", - errno_copy); - } - } - // Set NONBLOCK on the accept socket int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0); if (flags == -1) { @@ -419,7 +282,6 @@ void TServerSocket::listen() { "THRIFT_FCNTL() THRIFT_F_GETFL failed", errno_copy); } - if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) { int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy); @@ -428,18 +290,114 @@ void TServerSocket::listen() { "THRIFT_FCNTL() THRIFT_F_SETFL THRIFT_O_NONBLOCK failed", errno_copy); } +} + +void TServerSocket::_setup_unixdomain_sockopts() { +} + +void TServerSocket::_setup_tcp_sockopts() { + int one = 1; + + // Defer accept +#ifdef TCP_DEFER_ACCEPT + if (path_.empty()) { + if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_DEFER_ACCEPT, &one, sizeof(one))) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy); + close(); + throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT", + errno_copy); + } + } +#endif // #ifdef TCP_DEFER_ACCEPT + + // TCP Nodelay, speed over bandwidth + if (-1 + == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy); + close(); + throw TTransportException(TTransportException::NOT_OPEN, + "Could not set TCP_NODELAY", + errno_copy); + } +} + +void TServerSocket::listen() { + listening_ = true; +#ifdef _WIN32 + TWinsockSingleton::create(); +#endif // _WIN32 + + THRIFT_SOCKET sv[2]; + // Create the socket pair used to interrupt + if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) { + GlobalOutput.perror("TServerSocket::listen() socketpair() interrupt", + THRIFT_GET_SOCKET_ERROR); + interruptSockWriter_ = THRIFT_INVALID_SOCKET; + interruptSockReader_ = THRIFT_INVALID_SOCKET; + } else { + interruptSockWriter_ = sv[1]; + interruptSockReader_ = sv[0]; + } + + // Create the socket pair used to interrupt all clients + if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) { + GlobalOutput.perror("TServerSocket::listen() socketpair() childInterrupt", + THRIFT_GET_SOCKET_ERROR); + childInterruptSockWriter_ = THRIFT_INVALID_SOCKET; + pChildInterruptSockReader_.reset(); + } else { + childInterruptSockWriter_ = sv[1]; + pChildInterruptSockReader_ + = std::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]), destroyer_of_fine_sockets); + } + + // tcp == false means Unix Domain socket + bool tcp = (path_.empty()); + + // Validate port number + if (port_ < 0 || port_ > 0xFFFF) { + throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid"); + } + + // Resolve host:port strings into an iterable of struct addrinfo* + AddressResolutionHelper resolved_addresses; + if (tcp) { + try { + resolved_addresses.resolve(address_, std::to_string(port_), SOCK_STREAM, + AI_PASSIVE | AI_V4MAPPED); + } catch (const std::system_error& e) { + GlobalOutput.printf("getaddrinfo() -> %d; %s", e.code().value(), e.what()); + close(); + throw TTransportException(TTransportException::NOT_OPEN, + "Could not resolve host for server socket."); + } + } - // prepare the port information // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't // always seem to work. The client can configure the retry variables. int retries = 0; int errno_copy = 0; - if (!path_.empty()) { + if (!tcp) { + // -- Unix Domain Socket -- // -#ifndef _WIN32 + serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP); + + if (serverSocket_ == THRIFT_INVALID_SOCKET) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy); + close(); + throw TTransportException(TTransportException::NOT_OPEN, + "Could not create server socket.", + errno_copy); + } - // Unix Domain Socket + _setup_sockopts(); + _setup_unixdomain_sockopts(); + +#ifndef _WIN32 size_t len = path_.size() + 1; if (len > sizeof(((sockaddr_un*)nullptr)->sun_path)) { errno_copy = THRIFT_GET_SOCKET_ERROR; @@ -479,11 +437,48 @@ void TServerSocket::listen() { " Unix Domain socket path not supported"); #endif } else { + + // -- TCP socket -- // + + auto addr_iter = AddressResolutionHelper::Iter{}; + + // Via DNS or somehow else, single hostname can resolve into many addresses. + // Results may contain perhaps a mix of IPv4 and IPv6. Here, we iterate + // over what system gave us, picking the first address that works. do { - if (0 == ::bind(serverSocket_, res->ai_addr, static_cast<int>(res->ai_addrlen))) { + if (!addr_iter) { + // init + recycle over many retries + addr_iter = resolved_addresses.iterate(); + } + auto trybind = *addr_iter++; + + serverSocket_ = socket(trybind->ai_family, trybind->ai_socktype, trybind->ai_protocol); + if (serverSocket_ == -1) { + errno_copy = THRIFT_GET_SOCKET_ERROR; + continue; + } + + _setup_sockopts(); + _setup_tcp_sockopts(); + +#ifdef IPV6_V6ONLY + if (trybind->ai_family == AF_INET6) { + int zero = 0; + if (-1 == setsockopt(serverSocket_, + IPPROTO_IPV6, + IPV6_V6ONLY, + cast_sockopt(&zero), + sizeof(zero))) { + GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR); + } + } +#endif // #ifdef IPV6_V6ONLY + + if (0 == ::bind(serverSocket_, trybind->ai_addr, static_cast<int>(trybind->ai_addrlen))) { break; } errno_copy = THRIFT_GET_SOCKET_ERROR; + // use short circuit evaluation here to only sleep if we need to } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0)); @@ -505,12 +500,21 @@ void TServerSocket::listen() { } } } + } // TCP socket // + + // throw error if socket still wasn't created successfully + if (serverSocket_ == THRIFT_INVALID_SOCKET) { + GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy); + close(); + throw TTransportException(TTransportException::NOT_OPEN, + "Could not create server socket.", + errno_copy); } // throw an error if we failed to bind properly if (retries > retryLimit_) { char errbuf[1024]; - if (!path_.empty()) { + if (!tcp) { THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TServerSocket::listen() PATH %s", path_.c_str()); } else { THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TServerSocket::listen() BIND %d", port_); @@ -699,6 +703,6 @@ void TServerSocket::close() { pChildInterruptSockReader_.reset(); listening_ = false; } -} -} -} // apache::thrift::transport +} // namespace transport +} // namespace thrift +} // namespace apache diff --git a/lib/cpp/src/thrift/transport/TServerSocket.h b/lib/cpp/src/thrift/transport/TServerSocket.h index 4562341b0..e4659a036 100644 --- a/lib/cpp/src/thrift/transport/TServerSocket.h +++ b/lib/cpp/src/thrift/transport/TServerSocket.h @@ -38,22 +38,6 @@ namespace transport { class TSocket; -class TGetAddrInfoWrapper { -public: - TGetAddrInfoWrapper(const char* node, const char* service, const struct addrinfo* hints); - - virtual ~TGetAddrInfoWrapper(); - - int init(); - const struct addrinfo* res(); - -private: - const char* node_; - const char* service_; - const struct addrinfo* hints_; - struct addrinfo* res_; -}; - /** * Server socket implementation of TServerTransport. Wrapper around a unix * socket listen and accept calls. @@ -156,6 +140,9 @@ protected: private: void notify(THRIFT_SOCKET notifySock); + void _setup_sockopts(); + void _setup_unixdomain_sockopts(); + void _setup_tcp_sockopts(); int port_; std::string address_; diff --git a/lib/cpp/src/thrift/transport/TSocketUtils.h b/lib/cpp/src/thrift/transport/TSocketUtils.h new file mode 100644 index 000000000..c9e0e57b8 --- /dev/null +++ b/lib/cpp/src/thrift/transport/TSocketUtils.h @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TRANSPORT_SOCKETUTILS_H_ +#define _THRIFT_TRANSPORT_SOCKETUTILS_H_ 1 + +#include <memory> +#include <string> +#include <system_error> +#include <vector> + +#include <sys/types.h> +#ifdef HAVE_SYS_SOCKET_H +#include <sys/socket.h> +#endif +#ifdef HAVE_NETDB_H +#include <netdb.h> +#endif + +#include <thrift/transport/PlatformSocket.h> + +namespace apache { +namespace thrift { + +/** + * A helper to resolve hostnames to struct addrinfo's -- and not leak memory. + * + * Use like this: + * + * apache::thrift::AddressResolutionHelper addresses("localhost", "80"); + * + * for (auto addr : addresses.iterate()) { + * connect(sock, addr->ai_addr, addr->ai_addrlen); + * // ... + * } + */ +struct AddressResolutionHelper { + +private: + struct addrinfo_deleter { + void operator()(addrinfo* addr) { + ::freeaddrinfo(addr); // frees the whole list + } + }; + +public: + using PtrOwnedList = std::unique_ptr<addrinfo, addrinfo_deleter>; + + struct Iter : std::iterator<std::forward_iterator_tag, const addrinfo*> { + value_type ptr = nullptr; + + Iter() = default; + Iter(const addrinfo* head) : ptr(head) {} + + value_type operator*() const { return ptr; } + + bool operator==(const Iter& other) { return this->ptr == other.ptr; } + bool operator!=(const Iter& other) { return this->ptr != other.ptr; } + + operator bool() { return this->ptr != nullptr; } + bool operator!() { return this->ptr == nullptr; } + + Iter& operator++() { + if (ptr == nullptr) { + throw std::out_of_range("won't go pass end of linked list"); + } + ptr = ptr->ai_next; + return *this; + } + Iter operator++(int) { + Iter tmp(*this); + ++(*this); + return tmp; + } + }; + + struct gai_error : std::error_category { + virtual const char* name() const noexcept override { return "getaddrinfo"; } + virtual std::string message(int code) const override { return THRIFT_GAI_STRERROR(code); } + }; + +private: + PtrOwnedList gai_results; + + addrinfo* query(const std::string& host, const std::string& port, int socktype, int flags) { + addrinfo hints{}; + hints.ai_flags = flags; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = socktype; + + addrinfo* head; + int ret = ::getaddrinfo(host.empty() ? NULL : host.c_str(), port.c_str(), &hints, &head); + if (ret == 0) { + return head; +#ifdef _WIN32 + } else { + throw std::system_error{THRIFT_GET_SOCKET_ERROR, std::system_category()}; +#else + } else if (ret == EAI_SYSTEM) { + throw std::system_error{THRIFT_GET_SOCKET_ERROR, std::system_category()}; + } else { + throw std::system_error{ret, gai_error()}; +#endif + } + } + +public: + /** + * Constructor. May block. Throws errors. + * + * @param port Port number, or service name, as a string. + * @param socktype Socket type, SOCK_STREAM or SOCK_DGRAM. + * @param flags Standard getaddrinfo() flags. + */ + AddressResolutionHelper(const std::string& host, + const std::string& port, // pass "25" or "smtp" for port 25 + int socktype = SOCK_STREAM, + int flags = AI_V4MAPPED | AI_ADDRCONFIG) + : gai_results(query(host, port, socktype, flags)) {} + + AddressResolutionHelper() = default; + + /** + * Manual query. May block. Throws errors. + * + * @param port Port number, or service name, as a string. + * @param socktype Socket type, SOCK_STREAM or SOCK_DGRAM. + * @param flags Standard getaddrinfo() flags. + */ + AddressResolutionHelper& resolve(const std::string& host, + const std::string& port, // pass "25" or "smtp" for port 25 + int socktype = SOCK_STREAM, + int flags = AI_V4MAPPED | AI_ADDRCONFIG) { + gai_results.reset(query(host, port, socktype, flags)); + return *this; + } + + /** + * Return ForwardIterator to struct addrinfo* results. + */ + Iter iterate() const { return Iter{gai_results.get()}; } +}; + +} // namespace thrift +} // namespace apache + +#endif |