diff options
Diffstat (limited to 'cpp/src/qpid/sys/windows')
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 71 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/AsynchIoResult.h | 6 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/IocpPoller.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/Shlib.cpp | 3 | ||||
-rw-r--r--[-rwxr-xr-x] | cpp/src/qpid/sys/windows/Socket.cpp | 188 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/SocketAddress.cpp | 120 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/SslAsynchIO.h | 3 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/StrError.cpp | 7 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/Thread.cpp | 285 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/Time.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/mingw32_compat.h | 39 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/uuid.cpp | 6 |
12 files changed, 553 insertions, 217 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index 38d8842521..30378d4c5f 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -30,6 +30,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/windows/check.h" +#include "qpid/sys/windows/mingw32_compat.h" #include <boost/thread/once.hpp> @@ -46,16 +47,13 @@ namespace { /* * The function pointers for AcceptEx and ConnectEx need to be looked up - * at run time. Make sure this is done only once. + * at run time. */ -boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT; -LPFN_ACCEPTEX fnAcceptEx = 0; -typedef void (*lookUpFunc)(const qpid::sys::Socket &); - -void lookUpAcceptEx() { - SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); +const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) { + SOCKET h = toSocketHandle(s); GUID guidAcceptEx = WSAID_ACCEPTEX; DWORD dwBytes = 0; + LPFN_ACCEPTEX fnAcceptEx; WSAIoctl(h, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidAcceptEx, @@ -65,9 +63,9 @@ void lookUpAcceptEx() { &dwBytes, NULL, NULL); - closesocket(h); if (fnAcceptEx == 0) throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx")); + return fnAcceptEx; } } @@ -94,18 +92,15 @@ private: AsynchAcceptor::Callback acceptedCallback; const Socket& socket; + const LPFN_ACCEPTEX fnAcceptEx; }; AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), - socket(s) { + socket(s), + fnAcceptEx(lookUpAcceptEx(s)) { s.setNonblocking(); -#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */ - boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx); -#else - boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce); -#endif } AsynchAcceptor::~AsynchAcceptor() @@ -114,7 +109,8 @@ AsynchAcceptor::~AsynchAcceptor() } void AsynchAcceptor::start(Poller::shared_ptr poller) { - poller->monitorHandle(PollerHandle(socket), Poller::INPUT); + PollerHandle ph = PollerHandle(socket); + poller->monitorHandle(ph, Poller::INPUT); restart (); } @@ -122,25 +118,26 @@ void AsynchAcceptor::restart(void) { DWORD bytesReceived = 0; // Not used, needed for AcceptEx API AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback, this, - toSocketHandle(socket)); + socket); BOOL status; - status = ::fnAcceptEx(toSocketHandle(socket), - toSocketHandle(*result->newSocket), - result->addressBuffer, - 0, - AsynchAcceptResult::SOCKADDRMAXLEN, - AsynchAcceptResult::SOCKADDRMAXLEN, - &bytesReceived, - result->overlapped()); + status = fnAcceptEx(toSocketHandle(socket), + toSocketHandle(*result->newSocket), + result->addressBuffer, + 0, + AsynchAcceptResult::SOCKADDRMAXLEN, + AsynchAcceptResult::SOCKADDRMAXLEN, + &bytesReceived, + result->overlapped()); QPID_WINDOWS_CHECK_ASYNC_START(status); } AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - SOCKET listener) - : callback(cb), acceptor(acceptor), listener(listener) { - newSocket.reset (new Socket()); + const Socket& listener) + : callback(cb), acceptor(acceptor), + listener(toSocketHandle(listener)), + newSocket(listener.createSameTypeSocket()) { } void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { @@ -154,7 +151,7 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { delete this; } -void AsynchAcceptResult::failure(int status) { +void AsynchAcceptResult::failure(int /*status*/) { //if (status != WSA_OPERATION_ABORTED) // Can there be anything else? ; delete this; @@ -173,20 +170,20 @@ private: FailedCallback failCallback; const Socket& socket; const std::string hostname; - const uint16_t port; + const std::string port; public: AsynchConnector(const Socket& socket, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb = 0); void start(Poller::shared_ptr poller); }; AsynchConnector::AsynchConnector(const Socket& sock, - std::string hname, - uint16_t p, + const std::string& hname, + const std::string& p, ConnectedCallback connCb, FailedCallback failCb) : connCallback(connCb), failCallback(failCb), socket(sock), @@ -216,8 +213,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s, } AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, - std::string hostname, - uint16_t port, + const std::string& hostname, + const std::string& port, ConnectedCallback connCb, FailedCallback failCb) { @@ -410,8 +407,9 @@ void AsynchIO::queueForDeletion() { } void AsynchIO::start(Poller::shared_ptr poller0) { + PollerHandle ph = PollerHandle(socket); poller = poller0; - poller->monitorHandle(PollerHandle(socket), Poller::INPUT); + poller->monitorHandle(ph, Poller::INPUT); if (writeQueue.size() > 0) // Already have data queued for write notifyPendingWrite(); startReading(); @@ -584,7 +582,6 @@ void AsynchIO::notifyIdle(void) { void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { writeInProgress = true; InterlockedIncrement(&opsInProgress); - int writeCount = buff->byteCount-buff->dataCount; AsynchWriteResult *result = new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1), buff, diff --git a/cpp/src/qpid/sys/windows/AsynchIoResult.h b/cpp/src/qpid/sys/windows/AsynchIoResult.h index 66c89efc11..27e4c22138 100755 --- a/cpp/src/qpid/sys/windows/AsynchIoResult.h +++ b/cpp/src/qpid/sys/windows/AsynchIoResult.h @@ -83,22 +83,22 @@ class AsynchAcceptResult : public AsynchResult { public: AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - SOCKET listener); + const qpid::sys::Socket& listener); virtual void success (size_t bytesTransferred); virtual void failure (int error); private: virtual void complete(void) {} // No-op for this class. - std::auto_ptr<qpid::sys::Socket> newSocket; qpid::sys::AsynchAcceptor::Callback callback; AsynchAcceptor *acceptor; SOCKET listener; + std::auto_ptr<qpid::sys::Socket> newSocket; // AcceptEx needs a place to write the local and remote addresses // when accepting the connection. Place those here; get enough for // IPv6 addresses, even if the socket is IPv4. - enum { SOCKADDRMAXLEN = sizeof sockaddr_in6 + 16, + enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16, SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN }; char addressBuffer[SOCKADDRBUFLEN]; }; diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp index d326ab02ac..1805dd2cd8 100755 --- a/cpp/src/qpid/sys/windows/IocpPoller.cpp +++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -152,9 +152,9 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) { } // All no-ops... -void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {} -void Poller::registerHandle(PollerHandle& handle) {} -void Poller::unregisterHandle(PollerHandle& handle) {} +void Poller::unmonitorHandle(PollerHandle& /*handle*/, Direction /*dir*/) {} +void Poller::registerHandle(PollerHandle& /*handle*/) {} +void Poller::unregisterHandle(PollerHandle& /*handle*/) {} Poller::Event Poller::wait(Duration timeout) { DWORD timeoutMs = 0; diff --git a/cpp/src/qpid/sys/windows/Shlib.cpp b/cpp/src/qpid/sys/windows/Shlib.cpp index 38027de93f..ba18747eb4 100644 --- a/cpp/src/qpid/sys/windows/Shlib.cpp +++ b/cpp/src/qpid/sys/windows/Shlib.cpp @@ -44,7 +44,8 @@ void Shlib::unload() { } void* Shlib::getSymbol(const char* name) { - void* sym = GetProcAddress(static_cast<HMODULE>(handle), name); + // Double cast avoids warning about casting function pointer to object + void *sym = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(GetProcAddress(static_cast<HMODULE>(handle), name))); if (sym == NULL) throw QPID_WINDOWS_ERROR(GetLastError()); return sym; diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp index 11fb8b4133..1fa4768329 100755..100644 --- a/cpp/src/qpid/sys/windows/Socket.cpp +++ b/cpp/src/qpid/sys/windows/Socket.cpp @@ -20,19 +20,18 @@ */ #include "qpid/sys/Socket.h" + #include "qpid/sys/SocketAddress.h" -#include "qpid/sys/windows/IoHandlePrivate.h" #include "qpid/sys/windows/check.h" -#include "qpid/sys/Time.h" +#include "qpid/sys/windows/IoHandlePrivate.h" -#include <cstdlib> -#include <string.h> +// Ensure we get all of winsock2.h +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif #include <winsock2.h> -#include <boost/format.hpp> -#include <boost/lexical_cast.hpp> - // Need to initialize WinSock. Ideally, this would be a singleton or embedded // in some one-time initialization function. I tried boost singleton and could // not get it to compile (and others located in google had the same problem). @@ -84,53 +83,30 @@ namespace sys { namespace { -std::string getName(SOCKET fd, bool local, bool includeService = false) +std::string getName(SOCKET fd, bool local) { - sockaddr_in name; // big enough for any socket address - socklen_t namelen = sizeof(name); + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + if (local) { - QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen)); + QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen)); } else { - QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen)); + QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen)); } - char servName[NI_MAXSERV]; - char dispName[NI_MAXHOST]; - if (includeService) { - if (int rc = ::getnameinfo((sockaddr*)&name, namelen, - dispName, sizeof(dispName), - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw qpid::Exception(QPID_MSG(gai_strerror(rc))); - return std::string(dispName) + ":" + std::string(servName); - } else { - if (int rc = ::getnameinfo((sockaddr*)&name, namelen, - dispName, sizeof(dispName), - 0, 0, - NI_NUMERICHOST) != 0) - throw qpid::Exception(QPID_MSG(gai_strerror(rc))); - return dispName; - } + return SocketAddress::asString(name, namelen); } -std::string getService(SOCKET fd, bool local) +uint16_t getLocalPort(int fd) { - sockaddr_in name; // big enough for any socket address - socklen_t namelen = sizeof(name); - - if (local) { - QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen)); - } else { - QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen)); - } + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen)); - char servName[NI_MAXSERV]; - if (int rc = ::getnameinfo((sockaddr*)&name, namelen, - 0, 0, - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw qpid::Exception(QPID_MSG(gai_strerror(rc))); - return servName; + return SocketAddress::getPort(name); } } // namespace @@ -138,13 +114,7 @@ Socket::Socket() : IOHandle(new IOHandlePrivate), nonblocking(false), nodelay(false) -{ - SOCKET& socket = impl->fd; - if (socket != INVALID_SOCKET) Socket::close(); - SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0); - if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); - socket = s; -} +{} Socket::Socket(IOHandlePrivate* h) : IOHandle(h), @@ -152,8 +122,7 @@ Socket::Socket(IOHandlePrivate* h) : nodelay(false) {} -void -Socket::createSocket(const SocketAddress& sa) const +void Socket::createSocket(const SocketAddress& sa) const { SOCKET& socket = impl->fd; if (socket != INVALID_SOCKET) Socket::close(); @@ -168,24 +137,24 @@ Socket::createSocket(const SocketAddress& sa) const if (nonblocking) setNonblocking(); if (nodelay) setTcpNoDelay(); } catch (std::exception&) { - closesocket(s); + ::closesocket(s); socket = INVALID_SOCKET; throw; } } -void Socket::setTimeout(const Duration& interval) const -{ - const SOCKET& socket = impl->fd; - int64_t nanosecs = interval; - nanosecs /= (1000 * 1000); // nsecs -> usec -> msec - int msec = 0; - if (nanosecs > std::numeric_limits<int>::max()) - msec = std::numeric_limits<int>::max(); - else - msec = static_cast<int>(nanosecs); - setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&msec, sizeof(msec)); - setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&msec, sizeof(msec)); +Socket* Socket::createSameTypeSocket() const { + SOCKET& socket = impl->fd; + // Socket currently has no actual socket attached + if (socket == INVALID_SOCKET) + return new Socket; + + ::sockaddr_storage sa; + ::socklen_t salen = sizeof(sa); + QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); + SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + return new Socket(new IOHandlePrivate(s)); } void Socket::setNonblocking() const { @@ -193,30 +162,25 @@ void Socket::setNonblocking() const { QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock)); } -void Socket::connect(const std::string& host, uint16_t port) const +void Socket::connect(const std::string& host, const std::string& port) const { - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + SocketAddress sa(host, port); connect(sa); } void Socket::connect(const SocketAddress& addr) const { + peername = addr.asString(false); + + createSocket(addr); + const SOCKET& socket = impl->fd; - const addrinfo *addrs = &(getAddrInfo(addr)); - int error = 0; + int err; WSASetLastError(0); - while (addrs != 0) { - if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) || - (WSAGetLastError() == WSAEWOULDBLOCK)) - break; - // Error... save this error code and see if there are other address - // to try before throwing the exception. - error = WSAGetLastError(); - addrs = addrs->ai_next; - } - if (error) - throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname)); + if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) && + ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK)) + throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername)); } void @@ -247,24 +211,26 @@ int Socket::read(void *buf, size_t count) const return received; } -int Socket::listen(uint16_t port, int backlog) const +int Socket::listen(const std::string& host, const std::string& port, int backlog) const +{ + SocketAddress sa(host, port); + return listen(sa, backlog); +} + +int Socket::listen(const SocketAddress& addr, int backlog) const { + createSocket(addr); + const SOCKET& socket = impl->fd; BOOL yes=1; QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); - struct sockaddr_in name; - memset(&name, 0, sizeof(name)); - name.sin_family = AF_INET; - name.sin_port = htons(port); - name.sin_addr.s_addr = 0; - if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR) - throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError()))); + + if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError()))); if (::listen(socket, backlog) == SOCKET_ERROR) - throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError()))); - - socklen_t namelen = sizeof(name); - QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen)); - return ntohs(name.sin_port); + throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError()))); + + return getLocalPort(socket); } Socket* Socket::accept() const @@ -277,36 +243,20 @@ Socket* Socket::accept() const else throw QPID_WINDOWS_ERROR(WSAGetLastError()); } -std::string Socket::getSockname() const -{ - return getName(impl->fd, true); -} - -std::string Socket::getPeername() const -{ - return getName(impl->fd, false); -} - std::string Socket::getPeerAddress() const { - if (!connectname.empty()) - return std::string (connectname); - return getName(impl->fd, false, true); + if (peername.empty()) { + peername = getName(impl->fd, false); + } + return peername; } std::string Socket::getLocalAddress() const { - return getName(impl->fd, true, true); -} - -uint16_t Socket::getLocalPort() const -{ - return atoi(getService(impl->fd, true).c_str()); -} - -uint16_t Socket::getRemotePort() const -{ - return atoi(getService(impl->fd, true).c_str()); + if (localname.empty()) { + localname = getName(impl->fd, true); + } + return localname; } int Socket::getError() const diff --git a/cpp/src/qpid/sys/windows/SocketAddress.cpp b/cpp/src/qpid/sys/windows/SocketAddress.cpp index 501cff1297..77bbf85810 100644 --- a/cpp/src/qpid/sys/windows/SocketAddress.cpp +++ b/cpp/src/qpid/sys/windows/SocketAddress.cpp @@ -21,7 +21,13 @@ #include "qpid/sys/SocketAddress.h" -#include "qpid/sys/windows/check.h" +#include "qpid/Exception.h" +#include "qpid/Msg.h" + +// Ensure we get all of winsock2.h +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif #include <winsock2.h> #include <ws2tcpip.h> @@ -35,37 +41,111 @@ SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) port(port0), addrInfo(0) { - ::addrinfo hints; - ::memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well - hints.ai_socktype = SOCK_STREAM; - - const char* node = 0; - if (host.empty()) { - hints.ai_flags |= AI_PASSIVE; - } else { - node = host.c_str(); - } - const char* service = port.empty() ? "0" : port.c_str(); +} - int n = ::getaddrinfo(node, service, &hints, &addrInfo); - if (n != 0) - throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); +SocketAddress::SocketAddress(const SocketAddress& sa) : + host(sa.host), + port(sa.port), + addrInfo(0) +{ +} + +SocketAddress& SocketAddress::operator=(const SocketAddress& sa) +{ + SocketAddress temp(sa); + + std::swap(temp, *this); + return *this; } SocketAddress::~SocketAddress() { - ::freeaddrinfo(addrInfo); + if (addrInfo) { + ::freeaddrinfo(addrInfo); + } } -std::string SocketAddress::asString() const +std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen) { - return host + ":" + port; + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (int rc=::getnameinfo(addr, addrlen, + dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + std::string s; + switch (addr->sa_family) { + case AF_INET: s += dispName; break; + case AF_INET6: s += "["; s += dispName; s+= "]"; break; + default: throw Exception(QPID_MSG("Unexpected socket type")); + } + s += ":"; + s += servName; + return s; +} + +uint16_t SocketAddress::getPort(::sockaddr const * const addr) +{ + switch (addr->sa_family) { + case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port); + case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port); + default:throw Exception(QPID_MSG("Unexpected socket type")); + } +} + +std::string SocketAddress::asString(bool numeric) const +{ + if (!numeric) + return host + ":" + port; + // Canonicalise into numeric id + const ::addrinfo& ai = getAddrInfo(*this); + + return asString(ai.ai_addr, ai.ai_addrlen); +} + +bool SocketAddress::nextAddress() { + bool r = currentAddrInfo->ai_next != 0; + if (r) + currentAddrInfo = currentAddrInfo->ai_next; + return r; +} + +void SocketAddress::setAddrInfoPort(uint16_t port) { + if (!currentAddrInfo) return; + + ::addrinfo& ai = *currentAddrInfo; + switch (ai.ai_family) { + case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return; + case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return; + default: throw Exception(QPID_MSG("Unexpected socket type")); + } } const ::addrinfo& getAddrInfo(const SocketAddress& sa) { - return *sa.addrInfo; + if (!sa.addrInfo) { + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for + hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 + hints.ai_socktype = SOCK_STREAM; + + const char* node = 0; + if (sa.host.empty()) { + hints.ai_flags |= AI_PASSIVE; + } else { + node = sa.host.c_str(); + } + const char* service = sa.port.empty() ? "0" : sa.port.c_str(); + + int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n))); + sa.currentAddrInfo = sa.addrInfo; + } + + return *sa.currentAddrInfo; } }} diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.h b/cpp/src/qpid/sys/windows/SslAsynchIO.h index 3cdf2c8f08..edec081ced 100644 --- a/cpp/src/qpid/sys/windows/SslAsynchIO.h +++ b/cpp/src/qpid/sys/windows/SslAsynchIO.h @@ -39,9 +39,6 @@ namespace qpid { namespace sys { namespace windows { -class Socket; -class Poller; - /* * SSL/Schannel shim between the frame-handling and AsynchIO layers. * SslAsynchIO creates a regular AsynchIO object to handle I/O and this class diff --git a/cpp/src/qpid/sys/windows/StrError.cpp b/cpp/src/qpid/sys/windows/StrError.cpp index 9c1bfcd79c..546d399d16 100755 --- a/cpp/src/qpid/sys/windows/StrError.cpp +++ b/cpp/src/qpid/sys/windows/StrError.cpp @@ -30,6 +30,7 @@ namespace sys { std::string strError(int err) { const size_t bufsize = 512; char buf[bufsize]; + buf[0] = 0; if (0 == FormatMessage (FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM, 0, @@ -39,7 +40,11 @@ std::string strError(int err) { bufsize, 0)) { - strerror_s (buf, bufsize, err); +#ifdef _MSC_VER + strerror_s(buf, bufsize, err); +#else + return std::string(strerror(err)); +#endif } return std::string(buf); } diff --git a/cpp/src/qpid/sys/windows/Thread.cpp b/cpp/src/qpid/sys/windows/Thread.cpp index 583a9613a3..23b0033be4 100755 --- a/cpp/src/qpid/sys/windows/Thread.cpp +++ b/cpp/src/qpid/sys/windows/Thread.cpp @@ -19,6 +19,11 @@ * */ +// Ensure definition of OpenThread in mingw +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif + #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/windows/check.h" @@ -26,50 +31,204 @@ #include <process.h> #include <windows.h> -namespace { -unsigned __stdcall runRunnable(void* p) -{ - static_cast<qpid::sys::Runnable*>(p)->run(); - _endthreadex(0); - return 0; -} -} +/* + * This implementation distinguishes between two types of thread: Qpid + * threads (based on qpid::sys::Runnable) and the rest. It provides a + * join() that will not deadlock against the Windows loader lock for + * Qpid threads. + * + * System thread identifiers are unique per Windows thread; thread + * handles are not. Thread identifiers can be recycled, but keeping a + * handle open against the thread prevents recycling as long as + * shared_ptr references to a ThreadPrivate structure remain. + * + * There is a 1-1 relationship between Qpid threads and their + * ThreadPrivate structure. Non-Qpid threads do not need to find the + * qpidThreadDone handle, so there may be a 1-many relationship for + * them. + * + * TLS storage is used for a lockless solution for static library + * builds. The special case of LoadLibrary/FreeLibrary requires + * additional synchronization variables and resource cleanup in + * DllMain. _DLL marks the dynamic case. + */ namespace qpid { namespace sys { class ThreadPrivate { +public: friend class Thread; + friend unsigned __stdcall runThreadPrivate(void*); + typedef boost::shared_ptr<ThreadPrivate> shared_ptr; + ~ThreadPrivate(); - HANDLE threadHandle; +private: unsigned threadId; - - ThreadPrivate(Runnable* runnable) { - uintptr_t h = _beginthreadex(0, - 0, - runRunnable, - runnable, - 0, - &threadId); - QPID_WINDOWS_CHECK_CRT_NZ(h); - threadHandle = reinterpret_cast<HANDLE>(h); + HANDLE threadHandle; + HANDLE initCompleted; + HANDLE qpidThreadDone; + Runnable* runnable; + shared_ptr keepAlive; + + ThreadPrivate() : threadId(GetCurrentThreadId()), initCompleted(NULL), + qpidThreadDone(NULL), runnable(NULL) { + threadHandle = OpenThread (SYNCHRONIZE, FALSE, threadId); + QPID_WINDOWS_CHECK_CRT_NZ(threadHandle); } - - ThreadPrivate() - : threadHandle(GetCurrentThread()), threadId(GetCurrentThreadId()) {} + + ThreadPrivate(Runnable* r) : threadHandle(NULL), initCompleted(NULL), + qpidThreadDone(NULL), runnable(r) {} + + void start(shared_ptr& p); + static shared_ptr createThread(Runnable* r); }; +}} // namespace qpid::sys + + +namespace { +using namespace qpid::sys; + +#ifdef _DLL +class ScopedCriticalSection +{ + public: + ScopedCriticalSection(CRITICAL_SECTION& cs) : criticalSection(cs) { EnterCriticalSection(&criticalSection); } + ~ScopedCriticalSection() { LeaveCriticalSection(&criticalSection); } + private: + CRITICAL_SECTION& criticalSection; +}; + +CRITICAL_SECTION threadLock; +long runningThreads = 0; +HANDLE threadsDone; +bool terminating = false; +#endif + + +DWORD volatile tlsIndex = TLS_OUT_OF_INDEXES; + +DWORD getTlsIndex() { + if (tlsIndex != TLS_OUT_OF_INDEXES) + return tlsIndex; // already set + + DWORD trialIndex = TlsAlloc(); + QPID_WINDOWS_CHECK_NOT(trialIndex, TLS_OUT_OF_INDEXES); // No OS resource + + // only one thread gets to set the value + DWORD actualIndex = (DWORD) InterlockedCompareExchange((LONG volatile *) &tlsIndex, (LONG) trialIndex, (LONG) TLS_OUT_OF_INDEXES); + if (actualIndex == TLS_OUT_OF_INDEXES) + return trialIndex; // we won the race + else { + TlsFree(trialIndex); + return actualIndex; + } +} + +} // namespace + +namespace qpid { +namespace sys { + +unsigned __stdcall runThreadPrivate(void* p) +{ + ThreadPrivate* threadPrivate = static_cast<ThreadPrivate*>(p); + TlsSetValue(getTlsIndex(), threadPrivate); + + WaitForSingleObject (threadPrivate->initCompleted, INFINITE); + CloseHandle (threadPrivate->initCompleted); + threadPrivate->initCompleted = NULL; + + try { + threadPrivate->runnable->run(); + } catch (...) { + // not our concern + } + + SetEvent (threadPrivate->qpidThreadDone); // allow join() + threadPrivate->keepAlive.reset(); // may run ThreadPrivate destructor + +#ifdef _DLL + { + ScopedCriticalSection l(threadLock); + if (--runningThreads == 0) + SetEvent(threadsDone); + } +#endif + return 0; +} + + +ThreadPrivate::shared_ptr ThreadPrivate::createThread(Runnable* runnable) { + ThreadPrivate::shared_ptr tp(new ThreadPrivate(runnable)); + tp->start(tp); + return tp; +} + +void ThreadPrivate::start(ThreadPrivate::shared_ptr& tp) { + getTlsIndex(); // fail here if OS problem, not in new thread + + initCompleted = CreateEvent (NULL, TRUE, FALSE, NULL); + QPID_WINDOWS_CHECK_CRT_NZ(initCompleted); + qpidThreadDone = CreateEvent (NULL, TRUE, FALSE, NULL); + QPID_WINDOWS_CHECK_CRT_NZ(qpidThreadDone); + +#ifdef _DLL + { + ScopedCriticalSection l(threadLock); + if (terminating) + throw qpid::Exception(QPID_MSG("creating thread after exit/FreeLibrary")); + runningThreads++; + } +#endif + + uintptr_t h = _beginthreadex(0, + 0, + runThreadPrivate, + (void *)this, + 0, + &threadId); + +#ifdef _DLL + if (h == NULL) { + ScopedCriticalSection l(threadLock); + if (--runningThreads == 0) + SetEvent(threadsDone); + } +#endif + + QPID_WINDOWS_CHECK_CRT_NZ(h); + + // Success + keepAlive = tp; + threadHandle = reinterpret_cast<HANDLE>(h); + SetEvent (initCompleted); +} + +ThreadPrivate::~ThreadPrivate() { + if (threadHandle) + CloseHandle (threadHandle); + if (initCompleted) + CloseHandle (initCompleted); + if (qpidThreadDone) + CloseHandle (qpidThreadDone); +} + + Thread::Thread() {} -Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {} +Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) {} -Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {} +Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {} Thread::operator bool() { return impl; } bool Thread::operator==(const Thread& t) const { + if (!impl || !t.impl) + return false; return impl->threadId == t.impl->threadId; } @@ -79,10 +238,17 @@ bool Thread::operator!=(const Thread& t) const { void Thread::join() { if (impl) { - DWORD status = WaitForSingleObject (impl->threadHandle, INFINITE); + DWORD status; + if (impl->runnable) { + HANDLE handles[2] = {impl->qpidThreadDone, impl->threadHandle}; + // wait for either. threadHandle not signalled if loader + // lock held (FreeLibrary). qpidThreadDone not signalled + // if thread terminated by exit(). + status = WaitForMultipleObjects (2, handles, false, INFINITE); + } + else + status = WaitForSingleObject (impl->threadHandle, INFINITE); QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED); - CloseHandle (impl->threadHandle); - impl->threadHandle = 0; } } @@ -92,9 +258,70 @@ unsigned long Thread::logId() { /* static */ Thread Thread::current() { + ThreadPrivate* tlsValue = (ThreadPrivate *) TlsGetValue(getTlsIndex()); Thread t; - t.impl.reset(new ThreadPrivate()); + if (tlsValue != NULL) { + // called from within Runnable->run(), so keepAlive has positive use count + t.impl = tlsValue->keepAlive; + } + else + t.impl.reset(new ThreadPrivate()); return t; } -}} /* qpid::sys */ +}} // namespace qpid::sys + + +#ifdef _DLL + +// DllMain: called possibly many times in a process lifetime if dll +// loaded and freed repeatedly . Be mindful of Windows loader lock +// and other DllMain restrictions. + +BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) { + switch (reason) { + case DLL_PROCESS_ATTACH: + InitializeCriticalSection(&threadLock); + threadsDone = CreateEvent(NULL, TRUE, FALSE, NULL); + break; + + case DLL_PROCESS_DETACH: + terminating = true; + if (reserved != NULL) { + // process exit(): threads are stopped arbitrarily and + // possibly in an inconsistent state. Not even threadLock + // can be trusted. All static destructors have been + // called at this point and any resources this unit knows + // about will be released as part of process tear down by + // the OS. Accordingly, do nothing. + return TRUE; + } + else { + // FreeLibrary(): threads are still running and we are + // encouraged to clean up to avoid leaks. Mostly we just + // want any straggler threads to finish and notify + // threadsDone as the last thing they do. + while (1) { + { + ScopedCriticalSection l(threadLock); + if (runningThreads == 0) + break; + ResetEvent(threadsDone); + } + WaitForSingleObject(threadsDone, INFINITE); + } + if (tlsIndex != TLS_OUT_OF_INDEXES) + TlsFree(getTlsIndex()); + CloseHandle(threadsDone); + DeleteCriticalSection(&threadLock); + } + break; + + case DLL_THREAD_ATTACH: + case DLL_THREAD_DETACH: + break; + } + return TRUE; +} + +#endif diff --git a/cpp/src/qpid/sys/windows/Time.cpp b/cpp/src/qpid/sys/windows/Time.cpp index 16d09fcdc0..25c50819cd 100644 --- a/cpp/src/qpid/sys/windows/Time.cpp +++ b/cpp/src/qpid/sys/windows/Time.cpp @@ -27,6 +27,17 @@ using namespace boost::posix_time; +namespace { + +// High-res timing support. This will display times since program start, +// more or less. Keep track of the start value and the conversion factor to +// seconds. +bool timeInitialized = false; +LARGE_INTEGER start; +double freq = 1.0; + +} + namespace qpid { namespace sys { @@ -91,10 +102,35 @@ void outputFormattedNow(std::ostream& o) { char time_string[100]; ::time( &rawtime ); +#ifdef _MSC_VER ::localtime_s(&timeinfo, &rawtime); +#else + timeinfo = *(::localtime(&rawtime)); +#endif ::strftime(time_string, 100, "%Y-%m-%d %H:%M:%S", &timeinfo); o << time_string << " "; } + +void outputHiresNow(std::ostream& o) { + if (!timeInitialized) { + start.QuadPart = 0; + LARGE_INTEGER iFreq; + iFreq.QuadPart = 1; + QueryPerformanceCounter(&start); + QueryPerformanceFrequency(&iFreq); + freq = static_cast<double>(iFreq.QuadPart); + timeInitialized = true; + } + LARGE_INTEGER iNow; + iNow.QuadPart = 0; + QueryPerformanceCounter(&iNow); + iNow.QuadPart -= start.QuadPart; + if (iNow.QuadPart < 0) + iNow.QuadPart = 0; + double now = static_cast<double>(iNow.QuadPart); + now /= freq; // now is seconds after this + o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s "; +} }} diff --git a/cpp/src/qpid/sys/windows/mingw32_compat.h b/cpp/src/qpid/sys/windows/mingw32_compat.h new file mode 100644 index 0000000000..51f613cc25 --- /dev/null +++ b/cpp/src/qpid/sys/windows/mingw32_compat.h @@ -0,0 +1,39 @@ +#ifndef _sys_windows_mingw32_compat +#define _sys_windows_mingw32_compat +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifdef WIN32 +#ifndef _MSC_VER + +// +// The following definitions for extension function GUIDs and signatures are taken from +// MswSock.h in the Windows32 SDK. These rightfully belong in the mingw32 version of +// mswsock.h, but are not included presently. +// + +#define WSAID_ACCEPTEX {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} +typedef BOOL (PASCAL *LPFN_ACCEPTEX)(SOCKET,SOCKET,PVOID,DWORD,DWORD,DWORD,LPDWORD,LPOVERLAPPED); + +#endif +#endif + +#endif diff --git a/cpp/src/qpid/sys/windows/uuid.cpp b/cpp/src/qpid/sys/windows/uuid.cpp index b5360622dc..3316ecbc00 100644 --- a/cpp/src/qpid/sys/windows/uuid.cpp +++ b/cpp/src/qpid/sys/windows/uuid.cpp @@ -19,7 +19,7 @@ * */ -#include <Rpc.h> +#include <rpc.h> #ifdef uuid_t /* Done in rpcdce.h */ # undef uuid_t #endif @@ -52,7 +52,11 @@ int uuid_parse (const char *in, uuid_t uu) { void uuid_unparse (const uuid_t uu, char *out) { unsigned char *formatted; if (UuidToString((UUID*)uu, &formatted) == RPC_S_OK) { +#ifdef _MSC_VER strncpy_s (out, 36+1, (char*)formatted, _TRUNCATE); +#else + strncpy (out, (char*)formatted, 36+1); +#endif RpcStringFree(&formatted); } } |