summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/windows
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/windows')
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp71
-rwxr-xr-xcpp/src/qpid/sys/windows/AsynchIoResult.h6
-rwxr-xr-xcpp/src/qpid/sys/windows/IocpPoller.cpp6
-rw-r--r--cpp/src/qpid/sys/windows/Shlib.cpp3
-rw-r--r--[-rwxr-xr-x]cpp/src/qpid/sys/windows/Socket.cpp188
-rw-r--r--cpp/src/qpid/sys/windows/SocketAddress.cpp120
-rw-r--r--cpp/src/qpid/sys/windows/SslAsynchIO.h3
-rwxr-xr-xcpp/src/qpid/sys/windows/StrError.cpp7
-rwxr-xr-xcpp/src/qpid/sys/windows/Thread.cpp285
-rw-r--r--cpp/src/qpid/sys/windows/Time.cpp36
-rw-r--r--cpp/src/qpid/sys/windows/mingw32_compat.h39
-rw-r--r--cpp/src/qpid/sys/windows/uuid.cpp6
12 files changed, 553 insertions, 217 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 38d8842521..30378d4c5f 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -30,6 +30,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/windows/check.h"
+#include "qpid/sys/windows/mingw32_compat.h"
#include <boost/thread/once.hpp>
@@ -46,16 +47,13 @@ namespace {
/*
* The function pointers for AcceptEx and ConnectEx need to be looked up
- * at run time. Make sure this is done only once.
+ * at run time.
*/
-boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
-LPFN_ACCEPTEX fnAcceptEx = 0;
-typedef void (*lookUpFunc)(const qpid::sys::Socket &);
-
-void lookUpAcceptEx() {
- SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
+ SOCKET h = toSocketHandle(s);
GUID guidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
+ LPFN_ACCEPTEX fnAcceptEx;
WSAIoctl(h,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx,
@@ -65,9 +63,9 @@ void lookUpAcceptEx() {
&dwBytes,
NULL,
NULL);
- closesocket(h);
if (fnAcceptEx == 0)
throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+ return fnAcceptEx;
}
}
@@ -94,18 +92,15 @@ private:
AsynchAcceptor::Callback acceptedCallback;
const Socket& socket;
+ const LPFN_ACCEPTEX fnAcceptEx;
};
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
- socket(s) {
+ socket(s),
+ fnAcceptEx(lookUpAcceptEx(s)) {
s.setNonblocking();
-#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */
- boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
-#else
- boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
-#endif
}
AsynchAcceptor::~AsynchAcceptor()
@@ -114,7 +109,8 @@ AsynchAcceptor::~AsynchAcceptor()
}
void AsynchAcceptor::start(Poller::shared_ptr poller) {
- poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+ PollerHandle ph = PollerHandle(socket);
+ poller->monitorHandle(ph, Poller::INPUT);
restart ();
}
@@ -122,25 +118,26 @@ void AsynchAcceptor::restart(void) {
DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
this,
- toSocketHandle(socket));
+ socket);
BOOL status;
- status = ::fnAcceptEx(toSocketHandle(socket),
- toSocketHandle(*result->newSocket),
- result->addressBuffer,
- 0,
- AsynchAcceptResult::SOCKADDRMAXLEN,
- AsynchAcceptResult::SOCKADDRMAXLEN,
- &bytesReceived,
- result->overlapped());
+ status = fnAcceptEx(toSocketHandle(socket),
+ toSocketHandle(*result->newSocket),
+ result->addressBuffer,
+ 0,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ &bytesReceived,
+ result->overlapped());
QPID_WINDOWS_CHECK_ASYNC_START(status);
}
AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
- SOCKET listener)
- : callback(cb), acceptor(acceptor), listener(listener) {
- newSocket.reset (new Socket());
+ const Socket& listener)
+ : callback(cb), acceptor(acceptor),
+ listener(toSocketHandle(listener)),
+ newSocket(listener.createSameTypeSocket()) {
}
void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
@@ -154,7 +151,7 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
delete this;
}
-void AsynchAcceptResult::failure(int status) {
+void AsynchAcceptResult::failure(int /*status*/) {
//if (status != WSA_OPERATION_ABORTED)
// Can there be anything else? ;
delete this;
@@ -173,20 +170,20 @@ private:
FailedCallback failCallback;
const Socket& socket;
const std::string hostname;
- const uint16_t port;
+ const std::string port;
public:
AsynchConnector(const Socket& socket,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb = 0);
void start(Poller::shared_ptr poller);
};
AsynchConnector::AsynchConnector(const Socket& sock,
- std::string hname,
- uint16_t p,
+ const std::string& hname,
+ const std::string& p,
ConnectedCallback connCb,
FailedCallback failCb) :
connCallback(connCb), failCallback(failCb), socket(sock),
@@ -216,8 +213,8 @@ AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
}
AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb)
{
@@ -410,8 +407,9 @@ void AsynchIO::queueForDeletion() {
}
void AsynchIO::start(Poller::shared_ptr poller0) {
+ PollerHandle ph = PollerHandle(socket);
poller = poller0;
- poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+ poller->monitorHandle(ph, Poller::INPUT);
if (writeQueue.size() > 0) // Already have data queued for write
notifyPendingWrite();
startReading();
@@ -584,7 +582,6 @@ void AsynchIO::notifyIdle(void) {
void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
writeInProgress = true;
InterlockedIncrement(&opsInProgress);
- int writeCount = buff->byteCount-buff->dataCount;
AsynchWriteResult *result =
new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
buff,
diff --git a/cpp/src/qpid/sys/windows/AsynchIoResult.h b/cpp/src/qpid/sys/windows/AsynchIoResult.h
index 66c89efc11..27e4c22138 100755
--- a/cpp/src/qpid/sys/windows/AsynchIoResult.h
+++ b/cpp/src/qpid/sys/windows/AsynchIoResult.h
@@ -83,22 +83,22 @@ class AsynchAcceptResult : public AsynchResult {
public:
AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
- SOCKET listener);
+ const qpid::sys::Socket& listener);
virtual void success (size_t bytesTransferred);
virtual void failure (int error);
private:
virtual void complete(void) {} // No-op for this class.
- std::auto_ptr<qpid::sys::Socket> newSocket;
qpid::sys::AsynchAcceptor::Callback callback;
AsynchAcceptor *acceptor;
SOCKET listener;
+ std::auto_ptr<qpid::sys::Socket> newSocket;
// AcceptEx needs a place to write the local and remote addresses
// when accepting the connection. Place those here; get enough for
// IPv6 addresses, even if the socket is IPv4.
- enum { SOCKADDRMAXLEN = sizeof sockaddr_in6 + 16,
+ enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16,
SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN };
char addressBuffer[SOCKADDRBUFLEN];
};
diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp
index d326ab02ac..1805dd2cd8 100755
--- a/cpp/src/qpid/sys/windows/IocpPoller.cpp
+++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -152,9 +152,9 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
}
// All no-ops...
-void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {}
-void Poller::registerHandle(PollerHandle& handle) {}
-void Poller::unregisterHandle(PollerHandle& handle) {}
+void Poller::unmonitorHandle(PollerHandle& /*handle*/, Direction /*dir*/) {}
+void Poller::registerHandle(PollerHandle& /*handle*/) {}
+void Poller::unregisterHandle(PollerHandle& /*handle*/) {}
Poller::Event Poller::wait(Duration timeout) {
DWORD timeoutMs = 0;
diff --git a/cpp/src/qpid/sys/windows/Shlib.cpp b/cpp/src/qpid/sys/windows/Shlib.cpp
index 38027de93f..ba18747eb4 100644
--- a/cpp/src/qpid/sys/windows/Shlib.cpp
+++ b/cpp/src/qpid/sys/windows/Shlib.cpp
@@ -44,7 +44,8 @@ void Shlib::unload() {
}
void* Shlib::getSymbol(const char* name) {
- void* sym = GetProcAddress(static_cast<HMODULE>(handle), name);
+ // Double cast avoids warning about casting function pointer to object
+ void *sym = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(GetProcAddress(static_cast<HMODULE>(handle), name)));
if (sym == NULL)
throw QPID_WINDOWS_ERROR(GetLastError());
return sym;
diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp
index 11fb8b4133..1fa4768329 100755..100644
--- a/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/cpp/src/qpid/sys/windows/Socket.cpp
@@ -20,19 +20,18 @@
*/
#include "qpid/sys/Socket.h"
+
#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/windows/IoHandlePrivate.h"
#include "qpid/sys/windows/check.h"
-#include "qpid/sys/Time.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
-#include <cstdlib>
-#include <string.h>
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
#include <winsock2.h>
-#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
-
// Need to initialize WinSock. Ideally, this would be a singleton or embedded
// in some one-time initialization function. I tried boost singleton and could
// not get it to compile (and others located in google had the same problem).
@@ -84,53 +83,30 @@ namespace sys {
namespace {
-std::string getName(SOCKET fd, bool local, bool includeService = false)
+std::string getName(SOCKET fd, bool local)
{
- sockaddr_in name; // big enough for any socket address
- socklen_t namelen = sizeof(name);
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
if (local) {
- QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
+ QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
} else {
- QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
+ QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen));
}
- char servName[NI_MAXSERV];
- char dispName[NI_MAXHOST];
- if (includeService) {
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- dispName, sizeof(dispName),
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return std::string(dispName) + ":" + std::string(servName);
- } else {
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- dispName, sizeof(dispName),
- 0, 0,
- NI_NUMERICHOST) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return dispName;
- }
+ return SocketAddress::asString(name, namelen);
}
-std::string getService(SOCKET fd, bool local)
+uint16_t getLocalPort(int fd)
{
- sockaddr_in name; // big enough for any socket address
- socklen_t namelen = sizeof(name);
-
- if (local) {
- QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
- } else {
- QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
- }
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
+ QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
- char servName[NI_MAXSERV];
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- 0, 0,
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return servName;
+ return SocketAddress::getPort(name);
}
} // namespace
@@ -138,13 +114,7 @@ Socket::Socket() :
IOHandle(new IOHandlePrivate),
nonblocking(false),
nodelay(false)
-{
- SOCKET& socket = impl->fd;
- if (socket != INVALID_SOCKET) Socket::close();
- SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
- if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
- socket = s;
-}
+{}
Socket::Socket(IOHandlePrivate* h) :
IOHandle(h),
@@ -152,8 +122,7 @@ Socket::Socket(IOHandlePrivate* h) :
nodelay(false)
{}
-void
-Socket::createSocket(const SocketAddress& sa) const
+void Socket::createSocket(const SocketAddress& sa) const
{
SOCKET& socket = impl->fd;
if (socket != INVALID_SOCKET) Socket::close();
@@ -168,24 +137,24 @@ Socket::createSocket(const SocketAddress& sa) const
if (nonblocking) setNonblocking();
if (nodelay) setTcpNoDelay();
} catch (std::exception&) {
- closesocket(s);
+ ::closesocket(s);
socket = INVALID_SOCKET;
throw;
}
}
-void Socket::setTimeout(const Duration& interval) const
-{
- const SOCKET& socket = impl->fd;
- int64_t nanosecs = interval;
- nanosecs /= (1000 * 1000); // nsecs -> usec -> msec
- int msec = 0;
- if (nanosecs > std::numeric_limits<int>::max())
- msec = std::numeric_limits<int>::max();
- else
- msec = static_cast<int>(nanosecs);
- setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&msec, sizeof(msec));
- setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&msec, sizeof(msec));
+Socket* Socket::createSameTypeSocket() const {
+ SOCKET& socket = impl->fd;
+ // Socket currently has no actual socket attached
+ if (socket == INVALID_SOCKET)
+ return new Socket;
+
+ ::sockaddr_storage sa;
+ ::socklen_t salen = sizeof(sa);
+ QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+ SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ return new Socket(new IOHandlePrivate(s));
}
void Socket::setNonblocking() const {
@@ -193,30 +162,25 @@ void Socket::setNonblocking() const {
QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock));
}
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, const std::string& port) const
{
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
connect(sa);
}
void
Socket::connect(const SocketAddress& addr) const
{
+ peername = addr.asString(false);
+
+ createSocket(addr);
+
const SOCKET& socket = impl->fd;
- const addrinfo *addrs = &(getAddrInfo(addr));
- int error = 0;
+ int err;
WSASetLastError(0);
- while (addrs != 0) {
- if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) ||
- (WSAGetLastError() == WSAEWOULDBLOCK))
- break;
- // Error... save this error code and see if there are other address
- // to try before throwing the exception.
- error = WSAGetLastError();
- addrs = addrs->ai_next;
- }
- if (error)
- throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname));
+ if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) &&
+ ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK))
+ throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername));
}
void
@@ -247,24 +211,26 @@ int Socket::read(void *buf, size_t count) const
return received;
}
-int Socket::listen(uint16_t port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
+{
+ SocketAddress sa(host, port);
+ return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& addr, int backlog) const
{
+ createSocket(addr);
+
const SOCKET& socket = impl->fd;
BOOL yes=1;
QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
- struct sockaddr_in name;
- memset(&name, 0, sizeof(name));
- name.sin_family = AF_INET;
- name.sin_port = htons(port);
- name.sin_addr.s_addr = 0;
- if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
- throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
+
+ if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError())));
if (::listen(socket, backlog) == SOCKET_ERROR)
- throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError())));
-
- socklen_t namelen = sizeof(name);
- QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen));
- return ntohs(name.sin_port);
+ throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError())));
+
+ return getLocalPort(socket);
}
Socket* Socket::accept() const
@@ -277,36 +243,20 @@ Socket* Socket::accept() const
else throw QPID_WINDOWS_ERROR(WSAGetLastError());
}
-std::string Socket::getSockname() const
-{
- return getName(impl->fd, true);
-}
-
-std::string Socket::getPeername() const
-{
- return getName(impl->fd, false);
-}
-
std::string Socket::getPeerAddress() const
{
- if (!connectname.empty())
- return std::string (connectname);
- return getName(impl->fd, false, true);
+ if (peername.empty()) {
+ peername = getName(impl->fd, false);
+ }
+ return peername;
}
std::string Socket::getLocalAddress() const
{
- return getName(impl->fd, true, true);
-}
-
-uint16_t Socket::getLocalPort() const
-{
- return atoi(getService(impl->fd, true).c_str());
-}
-
-uint16_t Socket::getRemotePort() const
-{
- return atoi(getService(impl->fd, true).c_str());
+ if (localname.empty()) {
+ localname = getName(impl->fd, true);
+ }
+ return localname;
}
int Socket::getError() const
diff --git a/cpp/src/qpid/sys/windows/SocketAddress.cpp b/cpp/src/qpid/sys/windows/SocketAddress.cpp
index 501cff1297..77bbf85810 100644
--- a/cpp/src/qpid/sys/windows/SocketAddress.cpp
+++ b/cpp/src/qpid/sys/windows/SocketAddress.cpp
@@ -21,7 +21,13 @@
#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/windows/check.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
#include <winsock2.h>
#include <ws2tcpip.h>
@@ -35,37 +41,111 @@ SocketAddress::SocketAddress(const std::string& host0, const std::string& port0)
port(port0),
addrInfo(0)
{
- ::addrinfo hints;
- ::memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
- hints.ai_socktype = SOCK_STREAM;
-
- const char* node = 0;
- if (host.empty()) {
- hints.ai_flags |= AI_PASSIVE;
- } else {
- node = host.c_str();
- }
- const char* service = port.empty() ? "0" : port.c_str();
+}
- int n = ::getaddrinfo(node, service, &hints, &addrInfo);
- if (n != 0)
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+SocketAddress::SocketAddress(const SocketAddress& sa) :
+ host(sa.host),
+ port(sa.port),
+ addrInfo(0)
+{
+}
+
+SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
+{
+ SocketAddress temp(sa);
+
+ std::swap(temp, *this);
+ return *this;
}
SocketAddress::~SocketAddress()
{
- ::freeaddrinfo(addrInfo);
+ if (addrInfo) {
+ ::freeaddrinfo(addrInfo);
+ }
}
-std::string SocketAddress::asString() const
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen)
{
- return host + ":" + port;
+ char servName[NI_MAXSERV];
+ char dispName[NI_MAXHOST];
+ if (int rc=::getnameinfo(addr, addrlen,
+ dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ std::string s;
+ switch (addr->sa_family) {
+ case AF_INET: s += dispName; break;
+ case AF_INET6: s += "["; s += dispName; s+= "]"; break;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+ s += ":";
+ s += servName;
+ return s;
+}
+
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
+{
+ switch (addr->sa_family) {
+ case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port);
+ case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port);
+ default:throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
+
+std::string SocketAddress::asString(bool numeric) const
+{
+ if (!numeric)
+ return host + ":" + port;
+ // Canonicalise into numeric id
+ const ::addrinfo& ai = getAddrInfo(*this);
+
+ return asString(ai.ai_addr, ai.ai_addrlen);
+}
+
+bool SocketAddress::nextAddress() {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+ if (!currentAddrInfo) return;
+
+ ::addrinfo& ai = *currentAddrInfo;
+ switch (ai.ai_family) {
+ case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+ case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
}
const ::addrinfo& getAddrInfo(const SocketAddress& sa)
{
- return *sa.addrInfo;
+ if (!sa.addrInfo) {
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
+ hints.ai_socktype = SOCK_STREAM;
+
+ const char* node = 0;
+ if (sa.host.empty()) {
+ hints.ai_flags |= AI_PASSIVE;
+ } else {
+ node = sa.host.c_str();
+ }
+ const char* service = sa.port.empty() ? "0" : sa.port.c_str();
+
+ int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
+ if (n != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+ sa.currentAddrInfo = sa.addrInfo;
+ }
+
+ return *sa.currentAddrInfo;
}
}}
diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.h b/cpp/src/qpid/sys/windows/SslAsynchIO.h
index 3cdf2c8f08..edec081ced 100644
--- a/cpp/src/qpid/sys/windows/SslAsynchIO.h
+++ b/cpp/src/qpid/sys/windows/SslAsynchIO.h
@@ -39,9 +39,6 @@ namespace qpid {
namespace sys {
namespace windows {
-class Socket;
-class Poller;
-
/*
* SSL/Schannel shim between the frame-handling and AsynchIO layers.
* SslAsynchIO creates a regular AsynchIO object to handle I/O and this class
diff --git a/cpp/src/qpid/sys/windows/StrError.cpp b/cpp/src/qpid/sys/windows/StrError.cpp
index 9c1bfcd79c..546d399d16 100755
--- a/cpp/src/qpid/sys/windows/StrError.cpp
+++ b/cpp/src/qpid/sys/windows/StrError.cpp
@@ -30,6 +30,7 @@ namespace sys {
std::string strError(int err) {
const size_t bufsize = 512;
char buf[bufsize];
+ buf[0] = 0;
if (0 == FormatMessage (FORMAT_MESSAGE_MAX_WIDTH_MASK
| FORMAT_MESSAGE_FROM_SYSTEM,
0,
@@ -39,7 +40,11 @@ std::string strError(int err) {
bufsize,
0))
{
- strerror_s (buf, bufsize, err);
+#ifdef _MSC_VER
+ strerror_s(buf, bufsize, err);
+#else
+ return std::string(strerror(err));
+#endif
}
return std::string(buf);
}
diff --git a/cpp/src/qpid/sys/windows/Thread.cpp b/cpp/src/qpid/sys/windows/Thread.cpp
index 583a9613a3..23b0033be4 100755
--- a/cpp/src/qpid/sys/windows/Thread.cpp
+++ b/cpp/src/qpid/sys/windows/Thread.cpp
@@ -19,6 +19,11 @@
*
*/
+// Ensure definition of OpenThread in mingw
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/windows/check.h"
@@ -26,50 +31,204 @@
#include <process.h>
#include <windows.h>
-namespace {
-unsigned __stdcall runRunnable(void* p)
-{
- static_cast<qpid::sys::Runnable*>(p)->run();
- _endthreadex(0);
- return 0;
-}
-}
+/*
+ * This implementation distinguishes between two types of thread: Qpid
+ * threads (based on qpid::sys::Runnable) and the rest. It provides a
+ * join() that will not deadlock against the Windows loader lock for
+ * Qpid threads.
+ *
+ * System thread identifiers are unique per Windows thread; thread
+ * handles are not. Thread identifiers can be recycled, but keeping a
+ * handle open against the thread prevents recycling as long as
+ * shared_ptr references to a ThreadPrivate structure remain.
+ *
+ * There is a 1-1 relationship between Qpid threads and their
+ * ThreadPrivate structure. Non-Qpid threads do not need to find the
+ * qpidThreadDone handle, so there may be a 1-many relationship for
+ * them.
+ *
+ * TLS storage is used for a lockless solution for static library
+ * builds. The special case of LoadLibrary/FreeLibrary requires
+ * additional synchronization variables and resource cleanup in
+ * DllMain. _DLL marks the dynamic case.
+ */
namespace qpid {
namespace sys {
class ThreadPrivate {
+public:
friend class Thread;
+ friend unsigned __stdcall runThreadPrivate(void*);
+ typedef boost::shared_ptr<ThreadPrivate> shared_ptr;
+ ~ThreadPrivate();
- HANDLE threadHandle;
+private:
unsigned threadId;
-
- ThreadPrivate(Runnable* runnable) {
- uintptr_t h = _beginthreadex(0,
- 0,
- runRunnable,
- runnable,
- 0,
- &threadId);
- QPID_WINDOWS_CHECK_CRT_NZ(h);
- threadHandle = reinterpret_cast<HANDLE>(h);
+ HANDLE threadHandle;
+ HANDLE initCompleted;
+ HANDLE qpidThreadDone;
+ Runnable* runnable;
+ shared_ptr keepAlive;
+
+ ThreadPrivate() : threadId(GetCurrentThreadId()), initCompleted(NULL),
+ qpidThreadDone(NULL), runnable(NULL) {
+ threadHandle = OpenThread (SYNCHRONIZE, FALSE, threadId);
+ QPID_WINDOWS_CHECK_CRT_NZ(threadHandle);
}
-
- ThreadPrivate()
- : threadHandle(GetCurrentThread()), threadId(GetCurrentThreadId()) {}
+
+ ThreadPrivate(Runnable* r) : threadHandle(NULL), initCompleted(NULL),
+ qpidThreadDone(NULL), runnable(r) {}
+
+ void start(shared_ptr& p);
+ static shared_ptr createThread(Runnable* r);
};
+}} // namespace qpid::sys
+
+
+namespace {
+using namespace qpid::sys;
+
+#ifdef _DLL
+class ScopedCriticalSection
+{
+ public:
+ ScopedCriticalSection(CRITICAL_SECTION& cs) : criticalSection(cs) { EnterCriticalSection(&criticalSection); }
+ ~ScopedCriticalSection() { LeaveCriticalSection(&criticalSection); }
+ private:
+ CRITICAL_SECTION& criticalSection;
+};
+
+CRITICAL_SECTION threadLock;
+long runningThreads = 0;
+HANDLE threadsDone;
+bool terminating = false;
+#endif
+
+
+DWORD volatile tlsIndex = TLS_OUT_OF_INDEXES;
+
+DWORD getTlsIndex() {
+ if (tlsIndex != TLS_OUT_OF_INDEXES)
+ return tlsIndex; // already set
+
+ DWORD trialIndex = TlsAlloc();
+ QPID_WINDOWS_CHECK_NOT(trialIndex, TLS_OUT_OF_INDEXES); // No OS resource
+
+ // only one thread gets to set the value
+ DWORD actualIndex = (DWORD) InterlockedCompareExchange((LONG volatile *) &tlsIndex, (LONG) trialIndex, (LONG) TLS_OUT_OF_INDEXES);
+ if (actualIndex == TLS_OUT_OF_INDEXES)
+ return trialIndex; // we won the race
+ else {
+ TlsFree(trialIndex);
+ return actualIndex;
+ }
+}
+
+} // namespace
+
+namespace qpid {
+namespace sys {
+
+unsigned __stdcall runThreadPrivate(void* p)
+{
+ ThreadPrivate* threadPrivate = static_cast<ThreadPrivate*>(p);
+ TlsSetValue(getTlsIndex(), threadPrivate);
+
+ WaitForSingleObject (threadPrivate->initCompleted, INFINITE);
+ CloseHandle (threadPrivate->initCompleted);
+ threadPrivate->initCompleted = NULL;
+
+ try {
+ threadPrivate->runnable->run();
+ } catch (...) {
+ // not our concern
+ }
+
+ SetEvent (threadPrivate->qpidThreadDone); // allow join()
+ threadPrivate->keepAlive.reset(); // may run ThreadPrivate destructor
+
+#ifdef _DLL
+ {
+ ScopedCriticalSection l(threadLock);
+ if (--runningThreads == 0)
+ SetEvent(threadsDone);
+ }
+#endif
+ return 0;
+}
+
+
+ThreadPrivate::shared_ptr ThreadPrivate::createThread(Runnable* runnable) {
+ ThreadPrivate::shared_ptr tp(new ThreadPrivate(runnable));
+ tp->start(tp);
+ return tp;
+}
+
+void ThreadPrivate::start(ThreadPrivate::shared_ptr& tp) {
+ getTlsIndex(); // fail here if OS problem, not in new thread
+
+ initCompleted = CreateEvent (NULL, TRUE, FALSE, NULL);
+ QPID_WINDOWS_CHECK_CRT_NZ(initCompleted);
+ qpidThreadDone = CreateEvent (NULL, TRUE, FALSE, NULL);
+ QPID_WINDOWS_CHECK_CRT_NZ(qpidThreadDone);
+
+#ifdef _DLL
+ {
+ ScopedCriticalSection l(threadLock);
+ if (terminating)
+ throw qpid::Exception(QPID_MSG("creating thread after exit/FreeLibrary"));
+ runningThreads++;
+ }
+#endif
+
+ uintptr_t h = _beginthreadex(0,
+ 0,
+ runThreadPrivate,
+ (void *)this,
+ 0,
+ &threadId);
+
+#ifdef _DLL
+ if (h == NULL) {
+ ScopedCriticalSection l(threadLock);
+ if (--runningThreads == 0)
+ SetEvent(threadsDone);
+ }
+#endif
+
+ QPID_WINDOWS_CHECK_CRT_NZ(h);
+
+ // Success
+ keepAlive = tp;
+ threadHandle = reinterpret_cast<HANDLE>(h);
+ SetEvent (initCompleted);
+}
+
+ThreadPrivate::~ThreadPrivate() {
+ if (threadHandle)
+ CloseHandle (threadHandle);
+ if (initCompleted)
+ CloseHandle (initCompleted);
+ if (qpidThreadDone)
+ CloseHandle (qpidThreadDone);
+}
+
+
Thread::Thread() {}
-Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {}
+Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) {}
-Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {}
+Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {}
Thread::operator bool() {
return impl;
}
bool Thread::operator==(const Thread& t) const {
+ if (!impl || !t.impl)
+ return false;
return impl->threadId == t.impl->threadId;
}
@@ -79,10 +238,17 @@ bool Thread::operator!=(const Thread& t) const {
void Thread::join() {
if (impl) {
- DWORD status = WaitForSingleObject (impl->threadHandle, INFINITE);
+ DWORD status;
+ if (impl->runnable) {
+ HANDLE handles[2] = {impl->qpidThreadDone, impl->threadHandle};
+ // wait for either. threadHandle not signalled if loader
+ // lock held (FreeLibrary). qpidThreadDone not signalled
+ // if thread terminated by exit().
+ status = WaitForMultipleObjects (2, handles, false, INFINITE);
+ }
+ else
+ status = WaitForSingleObject (impl->threadHandle, INFINITE);
QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED);
- CloseHandle (impl->threadHandle);
- impl->threadHandle = 0;
}
}
@@ -92,9 +258,70 @@ unsigned long Thread::logId() {
/* static */
Thread Thread::current() {
+ ThreadPrivate* tlsValue = (ThreadPrivate *) TlsGetValue(getTlsIndex());
Thread t;
- t.impl.reset(new ThreadPrivate());
+ if (tlsValue != NULL) {
+ // called from within Runnable->run(), so keepAlive has positive use count
+ t.impl = tlsValue->keepAlive;
+ }
+ else
+ t.impl.reset(new ThreadPrivate());
return t;
}
-}} /* qpid::sys */
+}} // namespace qpid::sys
+
+
+#ifdef _DLL
+
+// DllMain: called possibly many times in a process lifetime if dll
+// loaded and freed repeatedly . Be mindful of Windows loader lock
+// and other DllMain restrictions.
+
+BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
+ switch (reason) {
+ case DLL_PROCESS_ATTACH:
+ InitializeCriticalSection(&threadLock);
+ threadsDone = CreateEvent(NULL, TRUE, FALSE, NULL);
+ break;
+
+ case DLL_PROCESS_DETACH:
+ terminating = true;
+ if (reserved != NULL) {
+ // process exit(): threads are stopped arbitrarily and
+ // possibly in an inconsistent state. Not even threadLock
+ // can be trusted. All static destructors have been
+ // called at this point and any resources this unit knows
+ // about will be released as part of process tear down by
+ // the OS. Accordingly, do nothing.
+ return TRUE;
+ }
+ else {
+ // FreeLibrary(): threads are still running and we are
+ // encouraged to clean up to avoid leaks. Mostly we just
+ // want any straggler threads to finish and notify
+ // threadsDone as the last thing they do.
+ while (1) {
+ {
+ ScopedCriticalSection l(threadLock);
+ if (runningThreads == 0)
+ break;
+ ResetEvent(threadsDone);
+ }
+ WaitForSingleObject(threadsDone, INFINITE);
+ }
+ if (tlsIndex != TLS_OUT_OF_INDEXES)
+ TlsFree(getTlsIndex());
+ CloseHandle(threadsDone);
+ DeleteCriticalSection(&threadLock);
+ }
+ break;
+
+ case DLL_THREAD_ATTACH:
+ case DLL_THREAD_DETACH:
+ break;
+ }
+ return TRUE;
+}
+
+#endif
diff --git a/cpp/src/qpid/sys/windows/Time.cpp b/cpp/src/qpid/sys/windows/Time.cpp
index 16d09fcdc0..25c50819cd 100644
--- a/cpp/src/qpid/sys/windows/Time.cpp
+++ b/cpp/src/qpid/sys/windows/Time.cpp
@@ -27,6 +27,17 @@
using namespace boost::posix_time;
+namespace {
+
+// High-res timing support. This will display times since program start,
+// more or less. Keep track of the start value and the conversion factor to
+// seconds.
+bool timeInitialized = false;
+LARGE_INTEGER start;
+double freq = 1.0;
+
+}
+
namespace qpid {
namespace sys {
@@ -91,10 +102,35 @@ void outputFormattedNow(std::ostream& o) {
char time_string[100];
::time( &rawtime );
+#ifdef _MSC_VER
::localtime_s(&timeinfo, &rawtime);
+#else
+ timeinfo = *(::localtime(&rawtime));
+#endif
::strftime(time_string, 100,
"%Y-%m-%d %H:%M:%S",
&timeinfo);
o << time_string << " ";
}
+
+void outputHiresNow(std::ostream& o) {
+ if (!timeInitialized) {
+ start.QuadPart = 0;
+ LARGE_INTEGER iFreq;
+ iFreq.QuadPart = 1;
+ QueryPerformanceCounter(&start);
+ QueryPerformanceFrequency(&iFreq);
+ freq = static_cast<double>(iFreq.QuadPart);
+ timeInitialized = true;
+ }
+ LARGE_INTEGER iNow;
+ iNow.QuadPart = 0;
+ QueryPerformanceCounter(&iNow);
+ iNow.QuadPart -= start.QuadPart;
+ if (iNow.QuadPart < 0)
+ iNow.QuadPart = 0;
+ double now = static_cast<double>(iNow.QuadPart);
+ now /= freq; // now is seconds after this
+ o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s ";
+}
}}
diff --git a/cpp/src/qpid/sys/windows/mingw32_compat.h b/cpp/src/qpid/sys/windows/mingw32_compat.h
new file mode 100644
index 0000000000..51f613cc25
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/mingw32_compat.h
@@ -0,0 +1,39 @@
+#ifndef _sys_windows_mingw32_compat
+#define _sys_windows_mingw32_compat
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifdef WIN32
+#ifndef _MSC_VER
+
+//
+// The following definitions for extension function GUIDs and signatures are taken from
+// MswSock.h in the Windows32 SDK. These rightfully belong in the mingw32 version of
+// mswsock.h, but are not included presently.
+//
+
+#define WSAID_ACCEPTEX {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
+typedef BOOL (PASCAL *LPFN_ACCEPTEX)(SOCKET,SOCKET,PVOID,DWORD,DWORD,DWORD,LPDWORD,LPOVERLAPPED);
+
+#endif
+#endif
+
+#endif
diff --git a/cpp/src/qpid/sys/windows/uuid.cpp b/cpp/src/qpid/sys/windows/uuid.cpp
index b5360622dc..3316ecbc00 100644
--- a/cpp/src/qpid/sys/windows/uuid.cpp
+++ b/cpp/src/qpid/sys/windows/uuid.cpp
@@ -19,7 +19,7 @@
*
*/
-#include <Rpc.h>
+#include <rpc.h>
#ifdef uuid_t /* Done in rpcdce.h */
# undef uuid_t
#endif
@@ -52,7 +52,11 @@ int uuid_parse (const char *in, uuid_t uu) {
void uuid_unparse (const uuid_t uu, char *out) {
unsigned char *formatted;
if (UuidToString((UUID*)uu, &formatted) == RPC_S_OK) {
+#ifdef _MSC_VER
strncpy_s (out, 36+1, (char*)formatted, _TRUNCATE);
+#else
+ strncpy (out, (char*)formatted, 36+1);
+#endif
RpcStringFree(&formatted);
}
}