From 18fd44d23d475bb9e928cd1371cd5c9b10ef8120 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Wed, 24 Oct 2012 05:51:31 +0000 Subject: QPID-4272: Large amounts of code are duplicated between the SSL and TCP transports Lift Socket into an interface with concrete implementations - BSDSocket, WinSocket and SslSocket - As a side effect completely change the approach we use for platform specific handles: IOHandle now directly carries the platform handle but its real type is only exposed to platform specific code. - Modified RDMA code for the new IOHandle approach git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1401559 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/CMakeLists.txt | 4 +- qpid/cpp/src/Makefile.am | 6 +- .../src/qpid/broker/windows/SslProtocolFactory.cpp | 8 +- qpid/cpp/src/qpid/client/TCPConnector.cpp | 13 +- qpid/cpp/src/qpid/client/TCPConnector.h | 4 +- qpid/cpp/src/qpid/sys/Socket.h | 54 ++-- qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp | 6 +- qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp | 9 +- qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp | 255 +++++++++++++++++++ qpid/cpp/src/qpid/sys/posix/BSDSocket.h | 109 ++++++++ qpid/cpp/src/qpid/sys/posix/IOHandle.cpp | 15 -- qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp | 7 +- qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp | 8 +- qpid/cpp/src/qpid/sys/posix/Socket.cpp | 251 ------------------ qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 22 +- qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h | 11 +- qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp | 32 +-- qpid/cpp/src/qpid/sys/ssl/SslSocket.h | 10 +- qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp | 39 ++- qpid/cpp/src/qpid/sys/windows/IOHandle.cpp | 13 - qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h | 17 +- qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp | 4 +- .../cpp/src/qpid/sys/windows/PollableCondition.cpp | 3 +- qpid/cpp/src/qpid/sys/windows/Socket.cpp | 282 --------------------- qpid/cpp/src/qpid/sys/windows/WinSocket.cpp | 261 +++++++++++++++++++ qpid/cpp/src/qpid/sys/windows/WinSocket.h | 114 +++++++++ qpid/cpp/src/tests/DispatcherTest.cpp | 5 +- qpid/cpp/src/tests/PollerTest.cpp | 9 +- 28 files changed, 881 insertions(+), 690 deletions(-) create mode 100644 qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp create mode 100644 qpid/cpp/src/qpid/sys/posix/BSDSocket.h delete mode 100644 qpid/cpp/src/qpid/sys/posix/Socket.cpp delete mode 100644 qpid/cpp/src/qpid/sys/windows/Socket.cpp create mode 100644 qpid/cpp/src/qpid/sys/windows/WinSocket.cpp create mode 100644 qpid/cpp/src/qpid/sys/windows/WinSocket.h (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index ce94c0a318..20868da674 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -748,7 +748,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/windows/PipeHandle.cpp qpid/sys/windows/PollableCondition.cpp qpid/sys/windows/Shlib.cpp - qpid/sys/windows/Socket.cpp + qpid/sys/windows/WinSocket.cpp qpid/sys/windows/SocketAddress.cpp qpid/sys/windows/StrError.cpp qpid/sys/windows/SystemInfo.cpp @@ -853,7 +853,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/posix/PollableCondition.cpp qpid/sys/posix/Shlib.cpp qpid/log/posix/SinkOptions.cpp - qpid/sys/posix/Socket.cpp + qpid/sys/posix/BSDSocket.cpp qpid/sys/posix/SocketAddress.cpp qpid/sys/posix/StrError.cpp qpid/sys/posix/Thread.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 8149ea218b..488eb37eb0 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -46,7 +46,6 @@ windows_dist = \ qpid/sys/windows/QpidDllMain.h \ qpid/sys/windows/Shlib.cpp \ qpid/sys/windows/SocketAddress.cpp \ - qpid/sys/windows/Socket.cpp \ qpid/sys/windows/SslAsynchIO.cpp \ qpid/sys/windows/SslAsynchIO.h \ qpid/sys/windows/StrError.cpp \ @@ -56,6 +55,8 @@ windows_dist = \ ../include/qpid/sys/windows/Time.h \ qpid/sys/windows/uuid.cpp \ qpid/sys/windows/uuid.h \ + qpid/sys/windows/WinSocket.cpp \ + qpid/sys/windows/WinSocket.h \ windows/QpiddBroker.cpp \ windows/SCM.h \ windows/SCM.cpp \ @@ -158,7 +159,8 @@ qpidd_SOURCES = qpidd.cpp qpidd.h $(posix_qpidd_src) libqpidcommon_la_SOURCES += \ qpid/log/posix/SinkOptions.cpp \ qpid/sys/posix/IOHandle.cpp \ - qpid/sys/posix/Socket.cpp \ + qpid/sys/posix/BSDSocket.cpp \ + qpid/sys/posix/BSDSocket.h \ qpid/sys/posix/SocketAddress.cpp \ qpid/sys/posix/AsynchIO.cpp \ qpid/sys/posix/FileSysDir.cpp \ diff --git a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp index ff177ba499..5b801aa69f 100644 --- a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp +++ b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp @@ -158,7 +158,7 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, // Get the certificate for this server. DWORD flags = 0; std::string certStoreLocation = options.certStoreLocation; - std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower); + std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower); if (certStoreLocation == "currentuser") { flags = CERT_SYSTEM_STORE_CURRENT_USER; } else if (certStoreLocation == "localmachine") { @@ -217,14 +217,14 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, // We must have at least one resolved address QPID_LOG(info, "SSL Listening to: " << sa.asString()) - Socket* s = new Socket; + Socket* s = createSocket(); listeningPort = s->listen(sa, backlog); listeners.push_back(s); // Try any other resolved addresses while (sa.nextAddress()) { QPID_LOG(info, "SSL Listening to: " << sa.asString()) - Socket* s = new Socket; + Socket* s = createSocket(); s->listen(sa, backlog); listeners.push_back(s); } @@ -325,7 +325,7 @@ void SslProtocolFactory::connect(sys::Poller::shared_ptr poller, // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. - qpid::sys::Socket* socket = new qpid::sys::Socket(); + qpid::sys::Socket* socket = createSocket(); connectFailedCallback = failed; AsynchConnector::create(*socket, host, diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp index a14acb214c..b92f342b74 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.cpp +++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp @@ -72,12 +72,13 @@ TCPConnector::TCPConnector(Poller::shared_ptr p, closed(true), shutdownHandler(0), input(0), + socket(createSocket()), connector(0), aio(0), poller(p) { QPID_LOG(debug, "TCPConnector created for " << version); - settings.configureSocket(socket); + settings.configureSocket(*socket); } TCPConnector::~TCPConnector() { @@ -88,7 +89,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) { Mutex::ScopedLock l(lock); assert(closed); connector = AsynchConnector::create( - socket, + *socket, host, port, boost::bind(&TCPConnector::connected, this, _1), boost::bind(&TCPConnector::connectFailed, this, _3)); @@ -99,7 +100,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) { void TCPConnector::connected(const Socket&) { connector = 0; - aio = AsynchIO::create(socket, + aio = AsynchIO::create(*socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), boost::bind(&TCPConnector::disconnected, this, _1), @@ -116,7 +117,7 @@ void TCPConnector::start(sys::AsynchIO* aio_) { aio->createBuffers(maxFrameSize); - identifier = str(format("[%1%]") % socket.getFullAddress()); + identifier = str(format("[%1%]") % socket->getFullAddress()); } void TCPConnector::initAmqp() { @@ -127,7 +128,7 @@ void TCPConnector::initAmqp() { void TCPConnector::connectFailed(const std::string& msg) { connector = 0; QPID_LOG(warning, "Connect failed: " << msg); - socket.close(); + socket->close(); if (!closed) closed = true; if (shutdownHandler) @@ -318,7 +319,7 @@ void TCPConnector::eof(AsynchIO&) { void TCPConnector::disconnected(AsynchIO&) { close(); - socketClosed(*aio, socket); + socketClosed(*aio, *socket); } void TCPConnector::activateSecurityLayer(std::auto_ptr sl) diff --git a/qpid/cpp/src/qpid/client/TCPConnector.h b/qpid/cpp/src/qpid/client/TCPConnector.h index 5e1a3856e6..a90dffd3ef 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.h +++ b/qpid/cpp/src/qpid/client/TCPConnector.h @@ -35,7 +35,7 @@ #include "qpid/sys/Thread.h" #include -#include +#include #include #include @@ -66,7 +66,7 @@ class TCPConnector : public Connector, public sys::Codec sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; - sys::Socket socket; + boost::scoped_ptr socket; sys::AsynchConnector* connector; sys::AsynchIO* aio; diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h index aa8a8a31d9..2119566d99 100644 --- a/qpid/cpp/src/qpid/sys/Socket.h +++ b/qpid/cpp/src/qpid/sys/Socket.h @@ -22,7 +22,6 @@ * */ -#include "qpid/sys/IOHandle.h" #include "qpid/sys/IntegerTypes.h" #include "qpid/CommonImportExport.h" #include @@ -31,47 +30,42 @@ namespace qpid { namespace sys { class Duration; +class IOHandle; class SocketAddress; -namespace ssl { -class SslMuxSocket; -} - -class QPID_COMMON_CLASS_EXTERN Socket : public IOHandle +class Socket { public: - /** Create a socket wrapper for descriptor. */ - QPID_COMMON_EXTERN Socket(); + virtual ~Socket() {}; - /** Create a new Socket which is the same address family as this one */ - QPID_COMMON_EXTERN Socket* createSameTypeSocket() const; + virtual operator const IOHandle&() const = 0; /** Set socket non blocking */ - void setNonblocking() const; + virtual void setNonblocking() const = 0; - QPID_COMMON_EXTERN void setTcpNoDelay() const; + virtual void setTcpNoDelay() const = 0; - QPID_COMMON_EXTERN void connect(const SocketAddress&) const; + virtual void connect(const SocketAddress&) const = 0; - QPID_COMMON_EXTERN void close() const; + virtual void close() const = 0; /** Bind to a port and start listening. *@param port 0 means choose an available port. *@param backlog maximum number of pending connections. *@return The bound port. */ - QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const; + virtual int listen(const SocketAddress&, int backlog = 10) const = 0; /** * Returns an address (host and port) for the remote end of the * socket */ - QPID_COMMON_EXTERN std::string getPeerAddress() const; + virtual std::string getPeerAddress() const = 0; /** * Returns an address (host and port) for the local end of the * socket */ - QPID_COMMON_EXTERN std::string getLocalAddress() const; + virtual std::string getLocalAddress() const = 0; /** * Returns the full address of the connection: local and remote host and port. @@ -82,30 +76,20 @@ public: * Returns the error code stored in the socket. This may be used * to determine the result of a non-blocking connect. */ - QPID_COMMON_EXTERN int getError() const; + virtual int getError() const = 0; /** Accept a connection from a socket that is already listening * and has an incoming connection */ - QPID_COMMON_EXTERN Socket* accept() const; - - // TODO The following are raw operations, maybe they need better wrapping? - QPID_COMMON_EXTERN int read(void *buf, size_t count) const; - QPID_COMMON_EXTERN int write(const void *buf, size_t count) const; - -protected: - /** Create socket */ - void createSocket(const SocketAddress&) const; + virtual Socket* accept() const = 0; - mutable std::string localname; - mutable std::string peername; - mutable bool nonblocking; - mutable bool nodelay; - - /** Construct socket with existing handle */ - Socket(IOHandlePrivate*); - friend class qpid::sys::ssl::SslMuxSocket; + virtual int read(void *buf, size_t count) const = 0; + virtual int write(const void *buf, size_t count) const = 0; }; +/** Make the default socket for whatever platform we are executing on + */ +QPID_COMMON_EXTERN Socket* createSocket(); + }} #endif /*!_sys_Socket_h*/ diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index ed7cc3748d..2ff47e982c 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -126,7 +126,7 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const // We must have at least one resolved address QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = new Socket; + Socket* s = createSocket(); uint16_t lport = s->listen(sa, backlog); QPID_LOG(debug, "Listened to: " << lport); listeners.push_back(s); @@ -138,7 +138,7 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const // Hack to ensure that all listening connections are on the same port sa.setAddrInfoPort(listeningPort); QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = new Socket; + Socket* s = createSocket(); uint16_t lport = s->listen(sa, backlog); QPID_LOG(debug, "Listened to: " << lport); listeners.push_back(s); @@ -204,7 +204,7 @@ void AsynchIOProtocolFactory::connect( // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. - Socket* socket = new Socket(); + Socket* socket = createSocket(); try { AsynchConnector* c = AsynchConnector::create( *socket, diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp index c23403c66d..3769a11f7d 100644 --- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -20,7 +20,6 @@ */ #include "qpid/sys/Poller.h" -#include "qpid/sys/IOHandle.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/AtomicCount.h" #include "qpid/sys/DeletionManager.h" @@ -64,12 +63,12 @@ class PollerHandlePrivate { }; ::__uint32_t events; - const IOHandlePrivate* ioHandle; + const IOHandle* ioHandle; PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) : + PollerHandlePrivate(const IOHandle* h, PollerHandle* p) : events(0), ioHandle(h), pollerHandle(p), @@ -77,7 +76,7 @@ class PollerHandlePrivate { } int fd() const { - return toFd(ioHandle); + return ioHandle->fd; } bool isActive() const { @@ -138,7 +137,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(h.impl, this)) + impl(new PollerHandlePrivate(&h, this)) {} PollerHandle::~PollerHandle() { diff --git a/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp b/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp new file mode 100644 index 0000000000..265142f629 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp @@ -0,0 +1,255 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/posix/BSDSocket.h" + +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/posix/check.h" +#include "qpid/sys/posix/PrivatePosix.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace qpid { +namespace sys { + +namespace { +std::string getName(int fd, bool local) +{ + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + if (local) { + QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); + } else { + QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) ); + } + + return SocketAddress::asString(name, namelen); +} + +uint16_t getLocalPort(int fd) +{ + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); + + return SocketAddress::getPort(name); +} +} + +BSDSocket::BSDSocket() : + fd(-1), + handle(new IOHandle), + nonblocking(false), + nodelay(false) +{} + +Socket* createSocket() +{ + return new BSDSocket; +} + +BSDSocket::BSDSocket(int fd0) : + fd(fd0), + handle(new IOHandle(fd)), + nonblocking(false), + nodelay(false) +{} + +BSDSocket::~BSDSocket() +{} + +BSDSocket::operator const IOHandle&() const +{ + return *handle; +} + +void BSDSocket::createSocket(const SocketAddress& sa) const +{ + int& socket = fd; + if (socket != -1) BSDSocket::close(); + int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0); + if (s < 0) throw QPID_POSIX_ERROR(errno); + socket = s; + *handle = IOHandle(s); + + try { + if (nonblocking) setNonblocking(); + if (nodelay) setTcpNoDelay(); + if (getAddrInfo(sa).ai_family == AF_INET6) { + int flag = 1; + int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag)); + QPID_POSIX_CHECK(result); + } + } catch (std::exception&) { + ::close(s); + socket = -1; + *handle = IOHandle(); + throw; + } +} + +void BSDSocket::setNonblocking() const { + int& socket = fd; + nonblocking = true; + if (socket != -1) { + QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK)); + } +} + +void BSDSocket::setTcpNoDelay() const +{ + int& socket = fd; + nodelay = true; + if (socket != -1) { + int flag = 1; + int result = ::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + QPID_POSIX_CHECK(result); + } +} + +void BSDSocket::connect(const SocketAddress& addr) const +{ + // The display name for an outbound connection needs to be the name that was specified + // for the address rather than a resolved IP address as we don't know which of + // the IP addresses is actually the one that will be connected to. + peername = addr.asString(false); + + // However the string we compare with the local port must be numeric or it might not + // match when it should as getLocalAddress() will always be numeric + std::string connectname = addr.asString(); + + createSocket(addr); + + const int& socket = fd; + // TODO the correct thing to do here is loop on failure until you've used all the returned addresses + if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && + (errno != EINPROGRESS)) { + throw Exception(QPID_MSG(strError(errno) << ": " << peername)); + } + // When connecting to a port on the same host which no longer has + // a process associated with it, the OS occasionally chooses the + // remote port (which is unoccupied) as the port to bind the local + // end of the socket, resulting in a "circular" connection. + // + // This seems like something the OS should prevent but I have + // confirmed that sporadic hangs in + // cluster_tests.LongTests.test_failover on RHEL5 are caused by + // such a circular connection. + // + // Raise an error if we see such a connection, since we know there is + // no listener on the peer address. + // + if (getLocalAddress() == connectname) { + close(); + throw Exception(QPID_MSG("Connection refused: " << peername)); + } +} + +void +BSDSocket::close() const +{ + int& socket = fd; + if (socket == -1) return; + if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); + socket = -1; + *handle = IOHandle(); +} + +int BSDSocket::listen(const SocketAddress& sa, int backlog) const +{ + createSocket(sa); + + const int& socket = fd; + int yes=1; + QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); + + if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) + throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); + if (::listen(socket, backlog) < 0) + throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); + + return getLocalPort(socket); +} + +Socket* BSDSocket::accept() const +{ + int afd = ::accept(fd, 0, 0); + if ( afd >= 0) { + BSDSocket* s = new BSDSocket(afd); + s->localname = localname; + return s; + } + else if (errno == EAGAIN) + return 0; + else throw QPID_POSIX_ERROR(errno); +} + +int BSDSocket::read(void *buf, size_t count) const +{ + return ::read(fd, buf, count); +} + +int BSDSocket::write(const void *buf, size_t count) const +{ + return ::write(fd, buf, count); +} + +std::string BSDSocket::getPeerAddress() const +{ + if (peername.empty()) { + peername = getName(fd, false); + } + return peername; +} + +std::string BSDSocket::getLocalAddress() const +{ + if (localname.empty()) { + localname = getName(fd, true); + } + return localname; +} + +int BSDSocket::getError() const +{ + int result; + socklen_t rSize = sizeof (result); + + if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0) + throw QPID_POSIX_ERROR(errno); + + return result; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/BSDSocket.h b/qpid/cpp/src/qpid/sys/posix/BSDSocket.h new file mode 100644 index 0000000000..98d7eb6e4d --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/BSDSocket.h @@ -0,0 +1,109 @@ +#ifndef QPID_SYS_BSDSOCKET_H +#define QPID_SYS_BSDSOCKET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Socket.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include + +#include + +namespace qpid { +namespace sys { + +class Duration; +class IOHandle; +class SocketAddress; + +namespace ssl { +class SslMuxSocket; +} + +class QPID_COMMON_CLASS_EXTERN BSDSocket : public Socket +{ +public: + /** Create a socket wrapper for descriptor. */ + QPID_COMMON_EXTERN BSDSocket(); + QPID_COMMON_EXTERN ~BSDSocket(); + + QPID_COMMON_EXTERN operator const IOHandle&() const; + + /** Set socket non blocking */ + QPID_COMMON_EXTERN virtual void setNonblocking() const; + + QPID_COMMON_EXTERN virtual void setTcpNoDelay() const; + + QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const; + + QPID_COMMON_EXTERN virtual void close() const; + + /** Bind to a port and start listening. + *@return The bound port number + */ + QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const; + + /** + * Returns an address (host and port) for the remote end of the + * socket + */ + QPID_COMMON_EXTERN std::string getPeerAddress() const; + /** + * Returns an address (host and port) for the local end of the + * socket + */ + QPID_COMMON_EXTERN std::string getLocalAddress() const; + + /** + * Returns the error code stored in the socket. This may be used + * to determine the result of a non-blocking connect. + */ + QPID_COMMON_EXTERN int getError() const; + + /** Accept a connection from a socket that is already listening + * and has an incoming connection + */ + QPID_COMMON_EXTERN virtual Socket* accept() const; + + // TODO The following are raw operations, maybe they need better wrapping? + QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const; + QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const; + +protected: + /** Create socket */ + void createSocket(const SocketAddress&) const; + + mutable int fd; + mutable boost::scoped_ptr handle; + mutable std::string localname; + mutable std::string peername; + mutable bool nonblocking; + mutable bool nodelay; + + /** Construct socket with existing handle */ + BSDSocket(int fd); + friend class qpid::sys::ssl::SslMuxSocket; // Needed for this constructor +}; + +}} +#endif /*!QPID_SYS_BSDSOCKET_H*/ diff --git a/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp b/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp index 9c049ee1de..d3f502a63c 100644 --- a/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp +++ b/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp @@ -19,26 +19,11 @@ * */ -#include "qpid/sys/IOHandle.h" - #include "qpid/sys/posix/PrivatePosix.h" namespace qpid { namespace sys { -int toFd(const IOHandlePrivate* h) -{ - return h->fd; -} - NullIOHandle DummyIOHandle; -IOHandle::IOHandle(IOHandlePrivate* h) : - impl(h) -{} - -IOHandle::~IOHandle() { - delete impl; -} - }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp index abff8a5be8..aa129faf20 100644 --- a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp +++ b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -21,7 +21,6 @@ #include "qpid/sys/PollableCondition.h" #include "qpid/sys/DispatchHandle.h" -#include "qpid/sys/IOHandle.h" #include "qpid/sys/posix/PrivatePosix.h" #include "qpid/Exception.h" @@ -58,14 +57,14 @@ PollableConditionPrivate::PollableConditionPrivate( const sys::PollableCondition::Callback& cb, sys::PollableCondition& parent, const boost::shared_ptr& poller -) : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent) +) : cb(cb), parent(parent) { int fds[2]; if (::pipe(fds) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); - impl->fd = fds[0]; + fd = fds[0]; writeFd = fds[1]; - if (::fcntl(impl->fd, F_SETFL, O_NONBLOCK) == -1) + if (::fcntl(fd, F_SETFL, O_NONBLOCK) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); diff --git a/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp b/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp index eb0c3384d1..ae839b2e20 100644 --- a/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp +++ b/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp @@ -88,12 +88,12 @@ class PollerHandlePrivate { }; short events; - const IOHandlePrivate* ioHandle; + const IOHandle* ioHandle; PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) : + PollerHandlePrivate(const IOHandle* h, PollerHandle* p) : events(0), ioHandle(h), pollerHandle(p), @@ -101,7 +101,7 @@ class PollerHandlePrivate { } int fd() const { - return toFd(ioHandle); + return ioHandle->fd; } bool isActive() const { @@ -162,7 +162,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(h.impl, this)) + impl(new PollerHandlePrivate(&h, this)) {} PollerHandle::~PollerHandle() { diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp deleted file mode 100644 index 0c01374369..0000000000 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ /dev/null @@ -1,251 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Socket.h" - -#include "qpid/sys/SocketAddress.h" -#include "qpid/sys/posix/check.h" -#include "qpid/sys/posix/PrivatePosix.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace qpid { -namespace sys { - -namespace { -std::string getName(int fd, bool local) -{ - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); - - if (local) { - QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); - } else { - QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) ); - } - - return SocketAddress::asString(name, namelen); -} - -uint16_t getLocalPort(int fd) -{ - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); - - QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); - - return SocketAddress::getPort(name); -} -} - -Socket::Socket() : - IOHandle(new IOHandlePrivate), - nonblocking(false), - nodelay(false) -{} - -Socket::Socket(IOHandlePrivate* h) : - IOHandle(h), - nonblocking(false), - nodelay(false) -{} - -void Socket::createSocket(const SocketAddress& sa) const -{ - int& socket = impl->fd; - if (socket != -1) Socket::close(); - int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0); - if (s < 0) throw QPID_POSIX_ERROR(errno); - socket = s; - - try { - if (nonblocking) setNonblocking(); - if (nodelay) setTcpNoDelay(); - if (getAddrInfo(sa).ai_family == AF_INET6) { - int flag = 1; - int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag)); - QPID_POSIX_CHECK(result); - } - } catch (std::exception&) { - ::close(s); - socket = -1; - throw; - } -} - -Socket* Socket::createSameTypeSocket() const { - int& socket = impl->fd; - // Socket currently has no actual socket attached - if (socket == -1) - return new Socket; - - ::sockaddr_storage sa; - ::socklen_t salen = sizeof(sa); - QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); - int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM - if (s < 0) throw QPID_POSIX_ERROR(errno); - return new Socket(new IOHandlePrivate(s)); -} - -void Socket::setNonblocking() const { - int& socket = impl->fd; - nonblocking = true; - if (socket != -1) { - QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK)); - } -} - -void Socket::setTcpNoDelay() const -{ - int& socket = impl->fd; - nodelay = true; - if (socket != -1) { - int flag = 1; - int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); - QPID_POSIX_CHECK(result); - } -} - -void Socket::connect(const SocketAddress& addr) const -{ - // The display name for an outbound connection needs to be the name that was specified - // for the address rather than a resolved IP address as we don't know which of - // the IP addresses is actually the one that will be connected to. - peername = addr.asString(false); - - // However the string we compare with the local port must be numeric or it might not - // match when it should as getLocalAddress() will always be numeric - std::string connectname = addr.asString(); - - createSocket(addr); - - const int& socket = impl->fd; - // TODO the correct thing to do here is loop on failure until you've used all the returned addresses - if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && - (errno != EINPROGRESS)) { - throw Exception(QPID_MSG(strError(errno) << ": " << peername)); - } - // When connecting to a port on the same host which no longer has - // a process associated with it, the OS occasionally chooses the - // remote port (which is unoccupied) as the port to bind the local - // end of the socket, resulting in a "circular" connection. - // - // This seems like something the OS should prevent but I have - // confirmed that sporadic hangs in - // cluster_tests.LongTests.test_failover on RHEL5 are caused by - // such a circular connection. - // - // Raise an error if we see such a connection, since we know there is - // no listener on the peer address. - // - if (getLocalAddress() == connectname) { - close(); - throw Exception(QPID_MSG("Connection refused: " << peername)); - } -} - -void -Socket::close() const -{ - int& socket = impl->fd; - if (socket == -1) return; - if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); - socket = -1; -} - -int Socket::listen(const SocketAddress& sa, int backlog) const -{ - createSocket(sa); - - const int& socket = impl->fd; - int yes=1; - QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); - - if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) - throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); - if (::listen(socket, backlog) < 0) - throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); - - return getLocalPort(socket); -} - -Socket* Socket::accept() const -{ - int afd = ::accept(impl->fd, 0, 0); - if ( afd >= 0) { - Socket* s = new Socket(new IOHandlePrivate(afd)); - s->localname = localname; - return s; - } - else if (errno == EAGAIN) - return 0; - else throw QPID_POSIX_ERROR(errno); -} - -int Socket::read(void *buf, size_t count) const -{ - return ::read(impl->fd, buf, count); -} - -int Socket::write(const void *buf, size_t count) const -{ - return ::write(impl->fd, buf, count); -} - -std::string Socket::getPeerAddress() const -{ - if (peername.empty()) { - peername = getName(impl->fd, false); - } - return peername; -} - -std::string Socket::getLocalAddress() const -{ - if (localname.empty()) { - localname = getName(impl->fd, true); - } - return localname; -} - -int Socket::getError() const -{ - int result; - socklen_t rSize = sizeof (result); - - if (::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0) - throw QPID_POSIX_ERROR(errno); - - return result; -} - -}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index efe454c5be..889ee9ff75 100644 --- a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -105,7 +105,7 @@ namespace Rdma { } QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) : - qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + handle(new qpid::sys::IOHandle), pd(allocPd(i->verbs)), cchannel(mkCChannel(i->verbs)), scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), @@ -113,7 +113,7 @@ namespace Rdma { outstandingSendEvents(0), outstandingRecvEvents(0) { - impl->fd = cchannel->fd; + handle->fd = cchannel->fd; // Set cq context to this QueuePair object so we can find // ourselves again @@ -163,6 +163,11 @@ namespace Rdma { // The buffers vectors automatically deletes all the buffers we've allocated } + QueuePair::operator qpid::sys::IOHandle&() const + { + return *handle; + } + // Create buffers to use for writing void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved) { @@ -359,11 +364,11 @@ namespace Rdma { // Wrap the passed in rdma_cm_id with a Connection // this basically happens only on connection request Connection::Connection(::rdma_cm_id* i) : - qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + handle(new qpid::sys::IOHandle), id(mkId(i)), context(0) { - impl->fd = id->channel->fd; + handle->fd = id->channel->fd; // Just overwrite the previous context as it will // have come from the listening connection @@ -372,12 +377,12 @@ namespace Rdma { } Connection::Connection() : - qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + handle(new qpid::sys::IOHandle), channel(mkEChannel()), id(mkId(channel.get(), this, RDMA_PS_TCP)), context(0) { - impl->fd = channel->fd; + handle->fd = channel->fd; } Connection::~Connection() { @@ -385,6 +390,11 @@ namespace Rdma { id->context = 0; } + Connection::operator qpid::sys::IOHandle&() const + { + return *handle; + } + void Connection::ensureQueuePair() { assert(id.get()); diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h index 8e3429027b..5f84793a5b 100644 --- a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -28,6 +28,7 @@ #include "qpid/sys/Mutex.h" #include +#include #include #include @@ -116,9 +117,10 @@ namespace Rdma { // Wrapper for a queue pair - this has the functionality for // putting buffers on the receive queue and for sending buffers // to the other end of the connection. - class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted { + class QueuePair : public qpid::RefCounted { friend class Connection; + boost::scoped_ptr< qpid::sys::IOHandle > handle; boost::shared_ptr< ::ibv_pd > pd; boost::shared_ptr< ::ibv_mr > smr; boost::shared_ptr< ::ibv_mr > rmr; @@ -139,6 +141,8 @@ namespace Rdma { public: typedef boost::intrusive_ptr intrusive_ptr; + operator qpid::sys::IOHandle&() const; + // Create a buffers to use for writing void createSendBuffers(int sendBufferCount, int dataSize, int headerSize); @@ -195,7 +199,8 @@ namespace Rdma { // registered buffers can't be shared between different connections // (this can only happen between connections on the same controller in any case, // so needs careful management if used) - class Connection : public qpid::sys::IOHandle, public qpid::RefCounted { + class Connection : public qpid::RefCounted { + boost::scoped_ptr< qpid::sys::IOHandle > handle; boost::shared_ptr< ::rdma_event_channel > channel; boost::shared_ptr< ::rdma_cm_id > id; QueuePair::intrusive_ptr qp; @@ -216,6 +221,8 @@ namespace Rdma { public: typedef boost::intrusive_ptr intrusive_ptr; + operator qpid::sys::IOHandle&() const; + static intrusive_ptr make(); static intrusive_ptr find(::rdma_cm_id* i); diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp index 6b6f326492..22f9f63fff 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.cpp @@ -98,16 +98,16 @@ SslSocket::SslSocket(const std::string& certName, bool clientAuth) : * returned from accept. Because we use posix accept rather than * PR_Accept, we have to reset the handshake. */ -SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : Socket(ioph), nssSocket(0), prototype(0) +SslSocket::SslSocket(int fd, PRFileDesc* model) : BSDSocket(fd), nssSocket(0), prototype(0) { - nssSocket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd)); + nssSocket = SSL_ImportFD(model, PR_ImportTCPSocket(fd)); NSS_CHECK(SSL_ResetHandshake(nssSocket, PR_TRUE)); } void SslSocket::setNonblocking() const { if (!nssSocket) { - Socket::setNonblocking(); + BSDSocket::setNonblocking(); return; } PRSocketOptionData option; @@ -119,7 +119,7 @@ void SslSocket::setNonblocking() const void SslSocket::setTcpNoDelay() const { if (!nssSocket) { - Socket::setTcpNoDelay(); + BSDSocket::setTcpNoDelay(); return; } PRSocketOptionData option; @@ -130,9 +130,9 @@ void SslSocket::setTcpNoDelay() const void SslSocket::connect(const SocketAddress& addr) const { - Socket::connect(addr); + BSDSocket::connect(addr); - nssSocket = SSL_ImportFD(0, PR_ImportTCPSocket(impl->fd)); + nssSocket = SSL_ImportFD(0, PR_ImportTCPSocket(fd)); void* arg; // Use the connection's cert-name if it has one; else use global cert-name @@ -155,12 +155,12 @@ void SslSocket::connect(const SocketAddress& addr) const void SslSocket::close() const { if (!nssSocket) { - Socket::close(); + BSDSocket::close(); return; } - if (impl->fd > 0) { + if (fd > 0) { PR_Close(nssSocket); - impl->fd = -1; + fd = -1; } } @@ -176,15 +176,15 @@ int SslSocket::listen(const SocketAddress& sa, int backlog) const SECKEY_DestroyPrivateKey(key); CERT_DestroyCertificate(cert); - return Socket::listen(sa, backlog); + return BSDSocket::listen(sa, backlog); } -SslSocket* SslSocket::accept() const +Socket* SslSocket::accept() const { QPID_LOG(trace, "Accepting SSL connection."); - int afd = ::accept(impl->fd, 0, 0); + int afd = ::accept(fd, 0, 0); if ( afd >= 0) { - return new SslSocket(new IOHandlePrivate(afd), prototype); + return new SslSocket(afd, prototype); } else if (errno == EAGAIN) { return 0; } else { @@ -275,15 +275,15 @@ SslMuxSocket::SslMuxSocket(const std::string& certName, bool clientAuth) : Socket* SslMuxSocket::accept() const { - int afd = ::accept(impl->fd, 0, 0); + int afd = ::accept(fd, 0, 0); if (afd >= 0) { QPID_LOG(trace, "Accepting connection with optional SSL wrapper."); if (isSslStream(afd)) { QPID_LOG(trace, "Accepted SSL connection."); - return new SslSocket(new IOHandlePrivate(afd), prototype); + return new SslSocket(afd, prototype); } else { QPID_LOG(trace, "Accepted Plaintext connection."); - return new Socket(new IOHandlePrivate(afd)); + return new BSDSocket(afd); } } else if (errno == EAGAIN) { return 0; diff --git a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h index 1b5424cfeb..1efbbe4a88 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslSocket.h +++ b/qpid/cpp/src/qpid/sys/ssl/SslSocket.h @@ -23,7 +23,7 @@ */ #include "qpid/sys/IOHandle.h" -#include "qpid/sys/Socket.h" +#include "qpid/sys/posix/BSDSocket.h" #include #include @@ -37,7 +37,7 @@ class Duration; namespace ssl { -class SslSocket : public qpid::sys::Socket +class SslSocket : public qpid::sys::BSDSocket { public: /** Create a socket wrapper for descriptor. @@ -71,7 +71,7 @@ public: * Accept a connection from a socket that is already listening * and has an incoming connection */ - SslSocket* accept() const; + virtual Socket* accept() const; // TODO The following are raw operations, maybe they need better wrapping? int read(void *buf, size_t count) const; @@ -92,8 +92,8 @@ protected: */ mutable PRFileDesc* prototype; - SslSocket(IOHandlePrivate* ioph, PRFileDesc* model); - friend class SslMuxSocket; + SslSocket(int fd, PRFileDesc* model); + friend class SslMuxSocket; // Needed for this constructor }; class SslMuxSocket : public SslSocket diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 9fdf89c83b..e7e966519d 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -24,6 +24,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/windows/WinSocket.h" #include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/sys/Thread.h" @@ -51,8 +52,8 @@ namespace { * The function pointers for AcceptEx and ConnectEx need to be looked up * at run time. */ -const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) { - SOCKET h = toSocketHandle(s); +const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::IOHandle& io) { + SOCKET h = io.fd; GUID guidAcceptEx = WSAID_ACCEPTEX; DWORD dwBytes = 0; LPFN_ACCEPTEX fnAcceptEx; @@ -94,12 +95,14 @@ private: AsynchAcceptor::Callback acceptedCallback; const Socket& socket; + const SOCKET wSocket; const LPFN_ACCEPTEX fnAcceptEx; }; AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), socket(s), + wSocket(IOHandle(s).fd), fnAcceptEx(lookUpAcceptEx(s)) { s.setNonblocking(); @@ -122,8 +125,8 @@ void AsynchAcceptor::restart(void) { this, socket); BOOL status; - status = fnAcceptEx(toSocketHandle(socket), - toSocketHandle(*result->newSocket), + status = fnAcceptEx(wSocket, + IOHandle(*result->newSocket).fd, result->addressBuffer, 0, AsynchAcceptResult::SOCKADDRMAXLEN, @@ -134,16 +137,30 @@ void AsynchAcceptor::restart(void) { } +Socket* createSameTypeSocket(const Socket& sock) { + SOCKET socket = IOHandle(sock).fd; + // Socket currently has no actual socket attached + if (socket == INVALID_SOCKET) + return new WinSocket; + + ::sockaddr_storage sa; + ::socklen_t salen = sizeof(sa); + QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); + SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + return new WinSocket(s); +} + AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - const Socket& listener) + const Socket& lsocket) : callback(cb), acceptor(acceptor), - listener(toSocketHandle(listener)), - newSocket(listener.createSameTypeSocket()) { + listener(IOHandle(lsocket).fd), + newSocket(createSameTypeSocket(lsocket)) { } void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { - ::setsockopt (toSocketHandle(*newSocket), + ::setsockopt (IOHandle(*newSocket).fd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&listener, @@ -363,7 +380,7 @@ class CallbackHandle : public IOHandle { public: CallbackHandle(AsynchIoResult::Completer completeCb, AsynchIO::RequestCallback reqCb = 0) : - IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb)) + IOHandle(INVALID_SOCKET, completeCb, reqCb) {} }; @@ -516,7 +533,7 @@ void AsynchIO::startReading() { DWORD bytesReceived = 0, flags = 0; InterlockedIncrement(&opsInProgress); readInProgress = true; - int status = WSARecv(toSocketHandle(socket), + int status = WSARecv(IOHandle(socket).fd, const_cast(result->getWSABUF()), 1, &bytesReceived, &flags, @@ -614,7 +631,7 @@ void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { buff, buff->dataCount); DWORD bytesSent = 0; - int status = WSASend(toSocketHandle(socket), + int status = WSASend(IOHandle(socket).fd, const_cast(result->getWSABUF()), 1, &bytesSent, 0, diff --git a/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp index 250737cb99..19a1c44875 100755 --- a/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp +++ b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp @@ -19,24 +19,11 @@ * */ -#include "qpid/sys/IOHandle.h" #include "qpid/sys/windows/IoHandlePrivate.h" #include namespace qpid { namespace sys { -SOCKET toFd(const IOHandlePrivate* h) -{ - return h->fd; -} - -IOHandle::IOHandle(IOHandlePrivate* h) : - impl(h) -{} - -IOHandle::~IOHandle() { - delete impl; -} }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h index 5943db5cc7..4529ad93ec 100755 --- a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h +++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h @@ -38,15 +38,14 @@ namespace sys { // completer from an I/O thread. If the callback mechanism is used, there // can be a RequestCallback set - this carries the callback object through // from AsynchIO::requestCallback() through to the I/O completion processing. -class IOHandlePrivate { - friend QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s); - static IOHandlePrivate* getImpl(const IOHandle& h); - +class IOHandle { public: - IOHandlePrivate(SOCKET f = INVALID_SOCKET, - windows::AsynchIoResult::Completer cb = 0, - AsynchIO::RequestCallback reqCallback = 0) : - fd(f), event(cb), cbRequest(reqCallback) + IOHandle(SOCKET f = INVALID_SOCKET, + windows::AsynchIoResult::Completer cb = 0, + AsynchIO::RequestCallback reqCallback = 0) : + fd(f), + event(cb), + cbRequest(reqCallback) {} SOCKET fd; @@ -54,8 +53,6 @@ public: AsynchIO::RequestCallback cbRequest; }; -QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s); - }} #endif /* _sys_windows_IoHandlePrivate_h */ diff --git a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp index c81cef87b0..ecb33c5517 100755 --- a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp +++ b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -22,7 +22,7 @@ #include "qpid/sys/Poller.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Dispatcher.h" - +#include "qpid/sys/IOHandle.h" #include "qpid/sys/windows/AsynchIoResult.h" #include "qpid/sys/windows/IoHandlePrivate.h" #include "qpid/sys/windows/check.h" @@ -55,7 +55,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toSocketHandle(static_cast(h)), h.impl->event, h.impl->cbRequest)) + impl(new PollerHandlePrivate(h.fd, h.event, h.cbRequest)) {} PollerHandle::~PollerHandle() { diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp index bb637be0a6..7bbcd4de1b 100644 --- a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp +++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -57,8 +57,7 @@ private: PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, sys::PollableCondition& parent, const boost::shared_ptr& poller) - : IOHandle(new sys::IOHandlePrivate(INVALID_SOCKET, - boost::bind(&PollableConditionPrivate::dispatch, this, _1))), + : IOHandle(INVALID_SOCKET, boost::bind(&PollableConditionPrivate::dispatch, this, _1)), cb(cb), parent(parent), poller(poller), isSet(0) { } diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp deleted file mode 100644 index 0c74b3a725..0000000000 --- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp +++ /dev/null @@ -1,282 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Socket.h" - -#include "qpid/sys/SocketAddress.h" -#include "qpid/sys/windows/check.h" -#include "qpid/sys/windows/IoHandlePrivate.h" -#include "qpid/sys/SystemInfo.h" - -// Ensure we get all of winsock2.h -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif - -#include - -namespace qpid { -namespace sys { - -// Need to initialize WinSock. Ideally, this would be a singleton or embedded -// in some one-time initialization function. I tried boost singleton and could -// not get it to compile (and others located in google had the same problem). -// So, this simple static with an interlocked increment will do for known -// use cases at this time. Since this will only shut down winsock at process -// termination, there may be some problems with client programs that also -// expect to load and unload winsock, but we'll see... -// If someone does get an easy-to-use singleton sometime, converting to it -// may be preferable. - -namespace { - -static LONG volatile initialized = 0; - -class WinSockSetup { - // : public boost::details::pool::singleton_default { - -public: - WinSockSetup() { - LONG timesEntered = InterlockedIncrement(&initialized); - if (timesEntered > 1) - return; - err = 0; - WORD wVersionRequested; - WSADATA wsaData; - - /* Request WinSock 2.2 */ - wVersionRequested = MAKEWORD(2, 2); - err = WSAStartup(wVersionRequested, &wsaData); - } - - ~WinSockSetup() { - if (SystemInfo::threadSafeShutdown()) - WSACleanup(); - } - -public: - int error(void) const { return err; } - -protected: - DWORD err; -}; - -static WinSockSetup setup; - -std::string getName(SOCKET fd, bool local) -{ - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); - - if (local) { - QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen)); - } else { - QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen)); - } - - return SocketAddress::asString(name, namelen); -} - -uint16_t getLocalPort(int fd) -{ - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); - - QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen)); - - return SocketAddress::getPort(name); -} -} // namespace - -Socket::Socket() : - IOHandle(new IOHandlePrivate), - nonblocking(false), - nodelay(false) -{} - -Socket::Socket(IOHandlePrivate* h) : - IOHandle(h), - nonblocking(false), - nodelay(false) -{} - -void Socket::createSocket(const SocketAddress& sa) const -{ - SOCKET& socket = impl->fd; - if (socket != INVALID_SOCKET) Socket::close(); - - SOCKET s = ::socket (getAddrInfo(sa).ai_family, - getAddrInfo(sa).ai_socktype, - 0); - if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); - socket = s; - - try { - if (nonblocking) setNonblocking(); - if (nodelay) setTcpNoDelay(); - } catch (std::exception&) { - ::closesocket(s); - socket = INVALID_SOCKET; - throw; - } -} - -Socket* Socket::createSameTypeSocket() const { - SOCKET& socket = impl->fd; - // Socket currently has no actual socket attached - if (socket == INVALID_SOCKET) - return new Socket; - - ::sockaddr_storage sa; - ::socklen_t salen = sizeof(sa); - QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); - SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM - if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); - return new Socket(new IOHandlePrivate(s)); -} - -void Socket::setNonblocking() const { - u_long nonblock = 1; - QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock)); -} - -void -Socket::connect(const SocketAddress& addr) const -{ - peername = addr.asString(false); - - createSocket(addr); - - const SOCKET& socket = impl->fd; - int err; - WSASetLastError(0); - if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) && - ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK)) - throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername)); -} - -void -Socket::close() const -{ - SOCKET& socket = impl->fd; - if (socket == INVALID_SOCKET) return; - QPID_WINSOCK_CHECK(closesocket(socket)); - socket = INVALID_SOCKET; -} - - -int Socket::write(const void *buf, size_t count) const -{ - const SOCKET& socket = impl->fd; - int sent = ::send(socket, (const char *)buf, count, 0); - if (sent == SOCKET_ERROR) - return -1; - return sent; -} - -int Socket::read(void *buf, size_t count) const -{ - const SOCKET& socket = impl->fd; - int received = ::recv(socket, (char *)buf, count, 0); - if (received == SOCKET_ERROR) - return -1; - return received; -} - -int Socket::listen(const SocketAddress& addr, int backlog) const -{ - createSocket(addr); - - const SOCKET& socket = impl->fd; - BOOL yes=1; - QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); - - if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR) - throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError()))); - if (::listen(socket, backlog) == SOCKET_ERROR) - throw Exception(QPID_MSG("Can't listen on " <fd, 0, 0); - if (afd != INVALID_SOCKET) - return new Socket(new IOHandlePrivate(afd)); - else if (WSAGetLastError() == EAGAIN) - return 0; - else throw QPID_WINDOWS_ERROR(WSAGetLastError()); -} - -std::string Socket::getPeerAddress() const -{ - if (peername.empty()) { - peername = getName(impl->fd, false); - } - return peername; -} - -std::string Socket::getLocalAddress() const -{ - if (localname.empty()) { - localname = getName(impl->fd, true); - } - return localname; -} - -int Socket::getError() const -{ - int result; - socklen_t rSize = sizeof (result); - - QPID_WINSOCK_CHECK(::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize)); - return result; -} - -void Socket::setTcpNoDelay() const -{ - SOCKET& socket = impl->fd; - nodelay = true; - if (socket != INVALID_SOCKET) { - int flag = 1; - int result = setsockopt(impl->fd, - IPPROTO_TCP, - TCP_NODELAY, - (char *)&flag, - sizeof(flag)); - QPID_WINSOCK_CHECK(result); - } -} - -inline IOHandlePrivate* IOHandlePrivate::getImpl(const qpid::sys::IOHandle &h) -{ - return h.impl; -} - -SOCKET toSocketHandle(const Socket& s) -{ - return IOHandlePrivate::getImpl(s)->fd; -} - -}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp b/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp new file mode 100644 index 0000000000..c1ac31de76 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp @@ -0,0 +1,261 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/windows/WinSocket.h" + +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/windows/check.h" +#include "qpid/sys/windows/IoHandlePrivate.h" +#include "qpid/sys/SystemInfo.h" + +namespace qpid { +namespace sys { + +// Need to initialize WinSock. Ideally, this would be a singleton or embedded +// in some one-time initialization function. I tried boost singleton and could +// not get it to compile (and others located in google had the same problem). +// So, this simple static with an interlocked increment will do for known +// use cases at this time. Since this will only shut down winsock at process +// termination, there may be some problems with client programs that also +// expect to load and unload winsock, but we'll see... +// If someone does get an easy-to-use singleton sometime, converting to it +// may be preferable. + +namespace { + +static LONG volatile initialized = 0; + +class WinSockSetup { + // : public boost::details::pool::singleton_default { + +public: + WinSockSetup() { + LONG timesEntered = InterlockedIncrement(&initialized); + if (timesEntered > 1) + return; + err = 0; + WORD wVersionRequested; + WSADATA wsaData; + + /* Request WinSock 2.2 */ + wVersionRequested = MAKEWORD(2, 2); + err = WSAStartup(wVersionRequested, &wsaData); + } + + ~WinSockSetup() { + if (SystemInfo::threadSafeShutdown()) + WSACleanup(); + } + +public: + int error(void) const { return err; } + +protected: + DWORD err; +}; + +static WinSockSetup setup; + +std::string getName(SOCKET fd, bool local) +{ + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + if (local) { + QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen)); + } else { + QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen)); + } + + return SocketAddress::asString(name, namelen); +} + +uint16_t getLocalPort(int fd) +{ + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen)); + + return SocketAddress::getPort(name); +} +} // namespace + +WinSocket::WinSocket() : + handle(new IOHandle), + nonblocking(false), + nodelay(false) +{} + +Socket* createSocket() +{ + return new WinSocket; +} + +WinSocket::WinSocket(SOCKET fd) : + handle(new IOHandle(fd)), + nonblocking(false), + nodelay(false) +{} + +WinSocket::operator const IOHandle&() const +{ + return *handle; +} + +void WinSocket::createSocket(const SocketAddress& sa) const +{ + SOCKET& socket = handle->fd; + if (socket != INVALID_SOCKET) WinSocket::close(); + + SOCKET s = ::socket (getAddrInfo(sa).ai_family, + getAddrInfo(sa).ai_socktype, + 0); + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + socket = s; + + try { + if (nonblocking) setNonblocking(); + if (nodelay) setTcpNoDelay(); + } catch (std::exception&) { + ::closesocket(s); + socket = INVALID_SOCKET; + throw; + } +} + +void WinSocket::setNonblocking() const { + u_long nonblock = 1; + QPID_WINSOCK_CHECK(ioctlsocket(handle->fd, FIONBIO, &nonblock)); +} + +void +WinSocket::connect(const SocketAddress& addr) const +{ + peername = addr.asString(false); + + createSocket(addr); + + const SOCKET& socket = handle->fd; + int err; + WSASetLastError(0); + if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) && + ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK)) + throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername)); +} + +void +WinSocket::close() const +{ + SOCKET& socket = handle->fd; + if (socket == INVALID_SOCKET) return; + QPID_WINSOCK_CHECK(closesocket(socket)); + socket = INVALID_SOCKET; +} + + +int WinSocket::write(const void *buf, size_t count) const +{ + const SOCKET& socket = handle->fd; + int sent = ::send(socket, (const char *)buf, count, 0); + if (sent == SOCKET_ERROR) + return -1; + return sent; +} + +int WinSocket::read(void *buf, size_t count) const +{ + const SOCKET& socket = handle->fd; + int received = ::recv(socket, (char *)buf, count, 0); + if (received == SOCKET_ERROR) + return -1; + return received; +} + +int WinSocket::listen(const SocketAddress& addr, int backlog) const +{ + createSocket(addr); + + const SOCKET& socket = handle->fd; + BOOL yes=1; + QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); + + if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError()))); + if (::listen(socket, backlog) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't listen on " <fd, 0, 0); + if (afd != INVALID_SOCKET) + return new WinSocket(afd); + else if (WSAGetLastError() == EAGAIN) + return 0; + else throw QPID_WINDOWS_ERROR(WSAGetLastError()); +} + +std::string WinSocket::getPeerAddress() const +{ + if (peername.empty()) { + peername = getName(handle->fd, false); + } + return peername; +} + +std::string WinSocket::getLocalAddress() const +{ + if (localname.empty()) { + localname = getName(handle->fd, true); + } + return localname; +} + +int WinSocket::getError() const +{ + int result; + socklen_t rSize = sizeof (result); + + QPID_WINSOCK_CHECK(::getsockopt(handle->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize)); + return result; +} + +void WinSocket::setTcpNoDelay() const +{ + SOCKET& socket = handle->fd; + nodelay = true; + if (socket != INVALID_SOCKET) { + int flag = 1; + int result = setsockopt(handle->fd, + IPPROTO_TCP, + TCP_NODELAY, + (char *)&flag, + sizeof(flag)); + QPID_WINSOCK_CHECK(result); + } +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/WinSocket.h b/qpid/cpp/src/qpid/sys/windows/WinSocket.h new file mode 100644 index 0000000000..17905a6133 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/WinSocket.h @@ -0,0 +1,114 @@ +#ifndef QPID_SYS_WINDOWS_BSDSOCKET_H +#define QPID_SYS_WINDOWS_BSDSOCKET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Socket.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include + +#include + +// Ensure we get all of winsock2.h +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif + +#include + +namespace qpid { +namespace sys { + +namespace windows { +Socket* createSameTypeSocket(const Socket&); +} + +class Duration; +class IOHandle; +class SocketAddress; + +class QPID_COMMON_CLASS_EXTERN WinSocket : public Socket +{ +public: + /** Create a socket wrapper for descriptor. */ + QPID_COMMON_EXTERN WinSocket(); + + QPID_COMMON_EXTERN operator const IOHandle&() const; + + /** Set socket non blocking */ + QPID_COMMON_EXTERN virtual void setNonblocking() const; + + QPID_COMMON_EXTERN virtual void setTcpNoDelay() const; + + QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const; + + QPID_COMMON_EXTERN virtual void close() const; + + /** Bind to a port and start listening. + *@return The bound port number + */ + QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const; + + /** + * Returns an address (host and port) for the remote end of the + * socket + */ + QPID_COMMON_EXTERN std::string getPeerAddress() const; + /** + * Returns an address (host and port) for the local end of the + * socket + */ + QPID_COMMON_EXTERN std::string getLocalAddress() const; + + /** + * Returns the error code stored in the socket. This may be used + * to determine the result of a non-blocking connect. + */ + QPID_COMMON_EXTERN int getError() const; + + /** Accept a connection from a socket that is already listening + * and has an incoming connection + */ + QPID_COMMON_EXTERN virtual Socket* accept() const; + + // TODO The following are raw operations, maybe they need better wrapping? + QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const; + QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const; + +protected: + /** Create socket */ + void createSocket(const SocketAddress&) const; + + mutable boost::scoped_ptr handle; + mutable std::string localname; + mutable std::string peername; + mutable bool nonblocking; + mutable bool nodelay; + + /** Construct socket with existing handle */ + friend Socket* qpid::sys::windows::createSameTypeSocket(const Socket&); + WinSocket(SOCKET fd); +}; + +}} +#endif /*!QPID_SYS_WINDOWS_BSDSOCKET_H*/ diff --git a/qpid/cpp/src/tests/DispatcherTest.cpp b/qpid/cpp/src/tests/DispatcherTest.cpp index e1691db584..7312fe8d2e 100644 --- a/qpid/cpp/src/tests/DispatcherTest.cpp +++ b/qpid/cpp/src/tests/DispatcherTest.cpp @@ -20,7 +20,6 @@ */ #include "qpid/sys/Poller.h" -#include "qpid/sys/IOHandle.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/DispatchHandle.h" #include "qpid/sys/posix/PrivatePosix.h" @@ -147,8 +146,8 @@ int main(int /*argc*/, char** /*argv*/) for (int i = 0; i < 8; i++) testString += testString; - PosixIOHandle f0(sv[0]); - PosixIOHandle f1(sv[1]); + IOHandle f0(sv[0]); + IOHandle f1(sv[1]); rh = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0); wh = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); diff --git a/qpid/cpp/src/tests/PollerTest.cpp b/qpid/cpp/src/tests/PollerTest.cpp index 9fa5689c5f..5a1d02964c 100644 --- a/qpid/cpp/src/tests/PollerTest.cpp +++ b/qpid/cpp/src/tests/PollerTest.cpp @@ -23,7 +23,6 @@ * Use socketpair to test the poller */ -#include "qpid/sys/IOHandle.h" #include "qpid/sys/Poller.h" #include "qpid/sys/posix/PrivatePosix.h" @@ -106,8 +105,8 @@ int main(int /*argc*/, char** /*argv*/) auto_ptr poller(new Poller); - PosixIOHandle f0(sv[0]); - PosixIOHandle f1(sv[1]); + IOHandle f0(sv[0]); + IOHandle f1(sv[1]); PollerHandle h0(f0); PollerHandle h1(f1); @@ -225,8 +224,8 @@ int main(int /*argc*/, char** /*argv*/) auto_ptr poller1(new Poller); - PosixIOHandle f2(sv[0]); - PosixIOHandle f3(sv[1]); + IOHandle f2(sv[0]); + IOHandle f3(sv[1]); PollerHandle h2(f2); PollerHandle h3(f3); -- cgit v1.2.1