From 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 28 Feb 2013 16:14:30 +0000 Subject: Update from trunk r1375509 through r1450773 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/posix/AsynchIO.cpp | 67 ++++--- cpp/src/qpid/sys/posix/BSDSocket.cpp | 264 +++++++++++++++++++++++++++ cpp/src/qpid/sys/posix/BSDSocket.h | 113 ++++++++++++ cpp/src/qpid/sys/posix/FileSysDir.cpp | 26 +++ cpp/src/qpid/sys/posix/IOHandle.cpp | 15 -- cpp/src/qpid/sys/posix/PollableCondition.cpp | 7 +- cpp/src/qpid/sys/posix/PosixPoller.cpp | 8 +- cpp/src/qpid/sys/posix/Socket.cpp | 263 -------------------------- cpp/src/qpid/sys/posix/SocketAddress.cpp | 5 + cpp/src/qpid/sys/posix/SystemInfo.cpp | 133 +++++++------- 10 files changed, 510 insertions(+), 391 deletions(-) create mode 100644 cpp/src/qpid/sys/posix/BSDSocket.cpp create mode 100644 cpp/src/qpid/sys/posix/BSDSocket.h delete mode 100644 cpp/src/qpid/sys/posix/Socket.cpp (limited to 'cpp/src/qpid/sys/posix') diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 31355627cd..353a55f50c 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -143,6 +143,7 @@ class AsynchConnector : public qpid::sys::AsynchConnector, private: void connComplete(DispatchHandle& handle); + void requestedCall(RequestCallback rCb); private: ConnectedCallback connCallback; @@ -158,6 +159,7 @@ public: FailedCallback failCb); void start(Poller::shared_ptr poller); void stop(); + void requestCallback(RequestCallback rCb); }; AsynchConnector::AsynchConnector(const Socket& s, @@ -191,11 +193,30 @@ void AsynchConnector::stop() stopWatch(); } +void AsynchConnector::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + DispatchHandle::call(boost::bind(&AsynchConnector::requestedCall, this, callback)); +} + +void AsynchConnector::requestedCall(RequestCallback callback) { + assert(callback); + callback(*this); +} + void AsynchConnector::connComplete(DispatchHandle& h) { int errCode = socket.getError(); if (errCode == 0) { h.stopWatch(); + try { + socket.finishConnect(sa); + } catch (const std::exception& e) { + failCallback(socket, 0, e.what()); + DispatchHandle::doDelete(); + return; + } connCallback(socket); } else { // Retry while we cause an immediate exception @@ -247,10 +268,9 @@ public: virtual void notifyPendingWrite(); virtual void queueWriteClose(); virtual bool writeQueueEmpty(); - virtual void startReading(); - virtual void stopReading(); virtual void requestCallback(RequestCallback); virtual BufferBase* getQueuedBuffer(); + virtual SecuritySettings getSecuritySettings(); private: ~AsynchIO(); @@ -282,13 +302,6 @@ private: * thread processing this handle. */ volatile bool writePending; - /** - * This records whether we've been reading is flow controlled: - * it's safe as a simple boolean as the only way to be stopped - * is in calls only allowed in the callback context, the only calls - * checking it are also in calls only allowed in callback context. - */ - volatile bool readingStopped; }; AsynchIO::AsynchIO(const Socket& s, @@ -307,8 +320,7 @@ AsynchIO::AsynchIO(const Socket& s, idleCallback(iCb), socket(s), queuedClose(false), - writePending(false), - readingStopped(false) { + writePending(false) { s.setNonblocking(); } @@ -344,7 +356,7 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) { bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_back(buff); - if (queueWasEmpty && !readingStopped) + if (queueWasEmpty) DispatchHandle::rewatchRead(); } @@ -354,7 +366,7 @@ void AsynchIO::unread(BufferBase* buff) { bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_front(buff); - if (queueWasEmpty && !readingStopped) + if (queueWasEmpty) DispatchHandle::rewatchRead(); } @@ -386,17 +398,6 @@ bool AsynchIO::writeQueueEmpty() { return writeQueue.empty(); } -// This can happen outside the callback context -void AsynchIO::startReading() { - readingStopped = false; - DispatchHandle::rewatchRead(); -} - -void AsynchIO::stopReading() { - readingStopped = true; - DispatchHandle::unwatchRead(); -} - void AsynchIO::requestCallback(RequestCallback callback) { // TODO creating a function object every time isn't all that // efficient - if this becomes heavily used do something better (what?) @@ -429,11 +430,6 @@ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { * to put it in and reading is not stopped by flow control. */ void AsynchIO::readable(DispatchHandle& h) { - if (readingStopped) { - // We have been flow controlled. - QPID_PROBE1(asynchio_read_flowcontrolled, &h); - return; - } AbsTime readStartTime = AbsTime::now(); size_t total = 0; int readCalls = 0; @@ -455,12 +451,6 @@ void AsynchIO::readable(DispatchHandle& h) { total += rc; readCallback(*this, buff); - if (readingStopped) { - // We have been flow controlled. - QPID_PROBE4(asynchio_read_finished_flowcontrolled, &h, duration, total, readCalls); - break; - } - if (rc != readCount) { // If we didn't fill the read buffer then time to stop reading QPID_PROBE4(asynchio_read_finished_done, &h, duration, total, readCalls); @@ -626,6 +616,13 @@ void AsynchIO::close(DispatchHandle& h) { } } +SecuritySettings AsynchIO::getSecuritySettings() { + SecuritySettings settings; + settings.ssf = socket.getKeyLen(); + settings.authid = socket.getClientAuthId(); + return settings; +} + } // namespace posix AsynchAcceptor* AsynchAcceptor::create(const Socket& s, diff --git a/cpp/src/qpid/sys/posix/BSDSocket.cpp b/cpp/src/qpid/sys/posix/BSDSocket.cpp new file mode 100644 index 0000000000..7c31b13ae9 --- /dev/null +++ b/cpp/src/qpid/sys/posix/BSDSocket.cpp @@ -0,0 +1,264 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/posix/BSDSocket.h" + +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/posix/check.h" +#include "qpid/sys/posix/PrivatePosix.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace qpid { +namespace sys { + +namespace { +std::string getName(int fd, bool local) +{ + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + if (local) { + QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); + } else { + QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) ); + } + + return SocketAddress::asString(name, namelen); +} + +uint16_t getLocalPort(int fd) +{ + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); + + return SocketAddress::getPort(name); +} +} + +BSDSocket::BSDSocket() : + fd(-1), + handle(new IOHandle), + nonblocking(false), + nodelay(false) +{} + +Socket* createSocket() +{ + return new BSDSocket; +} + +BSDSocket::BSDSocket(int fd0) : + fd(fd0), + handle(new IOHandle(fd)), + nonblocking(false), + nodelay(false) +{} + +BSDSocket::~BSDSocket() +{} + +BSDSocket::operator const IOHandle&() const +{ + return *handle; +} + +void BSDSocket::createSocket(const SocketAddress& sa) const +{ + int& socket = fd; + if (socket != -1) BSDSocket::close(); + int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0); + if (s < 0) throw QPID_POSIX_ERROR(errno); + socket = s; + *handle = IOHandle(s); + + try { + if (nonblocking) setNonblocking(); + if (nodelay) setTcpNoDelay(); + if (getAddrInfo(sa).ai_family == AF_INET6) { + int flag = 1; + int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag)); + QPID_POSIX_CHECK(result); + } + } catch (std::exception&) { + ::close(s); + socket = -1; + *handle = IOHandle(); + throw; + } +} + +void BSDSocket::setNonblocking() const { + int& socket = fd; + nonblocking = true; + if (socket != -1) { + QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK)); + } +} + +void BSDSocket::setTcpNoDelay() const +{ + int& socket = fd; + nodelay = true; + if (socket != -1) { + int flag = 1; + int result = ::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + QPID_POSIX_CHECK(result); + } +} + +void BSDSocket::connect(const SocketAddress& addr) const +{ + // The display name for an outbound connection needs to be the name that was specified + // for the address rather than a resolved IP address as we don't know which of + // the IP addresses is actually the one that will be connected to. + peername = addr.asString(false); + + // However the string we compare with the local port must be numeric or it might not + // match when it should as getLocalAddress() will always be numeric + std::string connectname = addr.asString(); + + createSocket(addr); + + const int& socket = fd; + // TODO the correct thing to do here is loop on failure until you've used all the returned addresses + if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && + (errno != EINPROGRESS)) { + throw Exception(QPID_MSG(strError(errno) << ": " << peername)); + } + // When connecting to a port on the same host which no longer has + // a process associated with it, the OS occasionally chooses the + // remote port (which is unoccupied) as the port to bind the local + // end of the socket, resulting in a "circular" connection. + // + // Raise an error if we see such a connection, since we know there is + // no listener on the peer address. + // + if (getLocalAddress() == connectname) { + close(); + throw Exception(QPID_MSG("Connection refused: " << peername)); + } +} + +void BSDSocket::finishConnect(const SocketAddress&) const +{ +} + +void +BSDSocket::close() const +{ + int& socket = fd; + if (socket == -1) return; + if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); + socket = -1; + *handle = IOHandle(); +} + +int BSDSocket::listen(const SocketAddress& sa, int backlog) const +{ + createSocket(sa); + + const int& socket = fd; + int yes=1; + QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); + + if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) + throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); + if (::listen(socket, backlog) < 0) + throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); + + return getLocalPort(socket); +} + +Socket* BSDSocket::accept() const +{ + int afd = ::accept(fd, 0, 0); + if ( afd >= 0) { + BSDSocket* s = new BSDSocket(afd); + s->localname = localname; + return s; + } + else if (errno == EAGAIN) + return 0; + else throw QPID_POSIX_ERROR(errno); +} + +int BSDSocket::read(void *buf, size_t count) const +{ + return ::read(fd, buf, count); +} + +int BSDSocket::write(const void *buf, size_t count) const +{ + return ::write(fd, buf, count); +} + +std::string BSDSocket::getPeerAddress() const +{ + if (peername.empty()) { + peername = getName(fd, false); + } + return peername; +} + +std::string BSDSocket::getLocalAddress() const +{ + if (localname.empty()) { + localname = getName(fd, true); + } + return localname; +} + +int BSDSocket::getError() const +{ + int result; + socklen_t rSize = sizeof (result); + + if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0) + throw QPID_POSIX_ERROR(errno); + + return result; +} + +int BSDSocket::getKeyLen() const +{ + return 0; +} + +std::string BSDSocket::getClientAuthId() const +{ + return std::string(); +} + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/BSDSocket.h b/cpp/src/qpid/sys/posix/BSDSocket.h new file mode 100644 index 0000000000..862d36c1b9 --- /dev/null +++ b/cpp/src/qpid/sys/posix/BSDSocket.h @@ -0,0 +1,113 @@ +#ifndef QPID_SYS_BSDSOCKET_H +#define QPID_SYS_BSDSOCKET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Socket.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include + +#include + +namespace qpid { +namespace sys { + +class Duration; +class IOHandle; +class SocketAddress; + +namespace ssl { +class SslMuxSocket; +} + +class QPID_COMMON_CLASS_EXTERN BSDSocket : public Socket +{ +public: + /** Create a socket wrapper for descriptor. */ + QPID_COMMON_EXTERN BSDSocket(); + QPID_COMMON_EXTERN ~BSDSocket(); + + QPID_COMMON_EXTERN operator const IOHandle&() const; + + /** Set socket non blocking */ + QPID_COMMON_EXTERN virtual void setNonblocking() const; + + QPID_COMMON_EXTERN virtual void setTcpNoDelay() const; + + QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const; + QPID_COMMON_EXTERN virtual void finishConnect(const SocketAddress&) const; + + QPID_COMMON_EXTERN virtual void close() const; + + /** Bind to a port and start listening. + *@return The bound port number + */ + QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const; + + /** + * Returns an address (host and port) for the remote end of the + * socket + */ + QPID_COMMON_EXTERN std::string getPeerAddress() const; + /** + * Returns an address (host and port) for the local end of the + * socket + */ + QPID_COMMON_EXTERN std::string getLocalAddress() const; + + /** + * Returns the error code stored in the socket. This may be used + * to determine the result of a non-blocking connect. + */ + QPID_COMMON_EXTERN int getError() const; + + /** Accept a connection from a socket that is already listening + * and has an incoming connection + */ + QPID_COMMON_EXTERN virtual Socket* accept() const; + + // TODO The following are raw operations, maybe they need better wrapping? + QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const; + QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const; + + QPID_COMMON_EXTERN int getKeyLen() const; + QPID_COMMON_EXTERN std::string getClientAuthId() const; + +protected: + /** Create socket */ + void createSocket(const SocketAddress&) const; + + mutable int fd; + mutable boost::scoped_ptr handle; + mutable std::string localname; + mutable std::string peername; + mutable bool nonblocking; + mutable bool nodelay; + + /** Construct socket with existing handle */ + BSDSocket(int fd); + friend class qpid::sys::ssl::SslMuxSocket; // Needed for this constructor +}; + +}} +#endif /*!QPID_SYS_BSDSOCKET_H*/ diff --git a/cpp/src/qpid/sys/posix/FileSysDir.cpp b/cpp/src/qpid/sys/posix/FileSysDir.cpp index 22dc487e74..cec580164d 100755 --- a/cpp/src/qpid/sys/posix/FileSysDir.cpp +++ b/cpp/src/qpid/sys/posix/FileSysDir.cpp @@ -18,6 +18,7 @@ #include "qpid/sys/FileSysDir.h" #include "qpid/sys/StrError.h" +#include "qpid/log/Statement.h" #include "qpid/Exception.h" #include @@ -25,6 +26,8 @@ #include #include #include +#include +#include namespace qpid { namespace sys { @@ -51,4 +54,27 @@ void FileSysDir::mkdir(void) throw Exception ("Can't create directory: " + dirPath); } +void FileSysDir::forEachFile(Callback cb) const { + + ::dirent** namelist; + + int n = scandir(dirPath.c_str(), &namelist, 0, alphasort); + if (n == -1) throw Exception (strError(errno) + ": Can't scan directory: " + dirPath); + + for (int i = 0; id_name; + // Filter out non files/stat problems etc. + struct ::stat s; + // Can't throw here without leaking memory, so just do nothing with + // entries for which stat() fails. + if (!::stat(fullpath.c_str(), &s)) { + if (S_ISREG(s.st_mode)) { + cb(fullpath); + } + } + ::free(namelist[i]); + } + ::free(namelist); +} + }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/IOHandle.cpp b/cpp/src/qpid/sys/posix/IOHandle.cpp index 9c049ee1de..d3f502a63c 100644 --- a/cpp/src/qpid/sys/posix/IOHandle.cpp +++ b/cpp/src/qpid/sys/posix/IOHandle.cpp @@ -19,26 +19,11 @@ * */ -#include "qpid/sys/IOHandle.h" - #include "qpid/sys/posix/PrivatePosix.h" namespace qpid { namespace sys { -int toFd(const IOHandlePrivate* h) -{ - return h->fd; -} - NullIOHandle DummyIOHandle; -IOHandle::IOHandle(IOHandlePrivate* h) : - impl(h) -{} - -IOHandle::~IOHandle() { - delete impl; -} - }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/PollableCondition.cpp b/cpp/src/qpid/sys/posix/PollableCondition.cpp index abff8a5be8..aa129faf20 100644 --- a/cpp/src/qpid/sys/posix/PollableCondition.cpp +++ b/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -21,7 +21,6 @@ #include "qpid/sys/PollableCondition.h" #include "qpid/sys/DispatchHandle.h" -#include "qpid/sys/IOHandle.h" #include "qpid/sys/posix/PrivatePosix.h" #include "qpid/Exception.h" @@ -58,14 +57,14 @@ PollableConditionPrivate::PollableConditionPrivate( const sys::PollableCondition::Callback& cb, sys::PollableCondition& parent, const boost::shared_ptr& poller -) : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent) +) : cb(cb), parent(parent) { int fds[2]; if (::pipe(fds) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); - impl->fd = fds[0]; + fd = fds[0]; writeFd = fds[1]; - if (::fcntl(impl->fd, F_SETFL, O_NONBLOCK) == -1) + if (::fcntl(fd, F_SETFL, O_NONBLOCK) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); diff --git a/cpp/src/qpid/sys/posix/PosixPoller.cpp b/cpp/src/qpid/sys/posix/PosixPoller.cpp index eb0c3384d1..ae839b2e20 100644 --- a/cpp/src/qpid/sys/posix/PosixPoller.cpp +++ b/cpp/src/qpid/sys/posix/PosixPoller.cpp @@ -88,12 +88,12 @@ class PollerHandlePrivate { }; short events; - const IOHandlePrivate* ioHandle; + const IOHandle* ioHandle; PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) : + PollerHandlePrivate(const IOHandle* h, PollerHandle* p) : events(0), ioHandle(h), pollerHandle(p), @@ -101,7 +101,7 @@ class PollerHandlePrivate { } int fd() const { - return toFd(ioHandle); + return ioHandle->fd; } bool isActive() const { @@ -162,7 +162,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(h.impl, this)) + impl(new PollerHandlePrivate(&h, this)) {} PollerHandle::~PollerHandle() { diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp deleted file mode 100644 index 77ae1af60c..0000000000 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ /dev/null @@ -1,263 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Socket.h" - -#include "qpid/sys/SocketAddress.h" -#include "qpid/sys/posix/check.h" -#include "qpid/sys/posix/PrivatePosix.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace qpid { -namespace sys { - -namespace { -std::string getName(int fd, bool local) -{ - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); - - if (local) { - QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); - } else { - QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) ); - } - - return SocketAddress::asString(name, namelen); -} - -uint16_t getLocalPort(int fd) -{ - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); - - QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); - - return SocketAddress::getPort(name); -} -} - -Socket::Socket() : - IOHandle(new IOHandlePrivate), - nonblocking(false), - nodelay(false) -{} - -Socket::Socket(IOHandlePrivate* h) : - IOHandle(h), - nonblocking(false), - nodelay(false) -{} - -void Socket::createSocket(const SocketAddress& sa) const -{ - int& socket = impl->fd; - if (socket != -1) Socket::close(); - int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0); - if (s < 0) throw QPID_POSIX_ERROR(errno); - socket = s; - - try { - if (nonblocking) setNonblocking(); - if (nodelay) setTcpNoDelay(); - if (getAddrInfo(sa).ai_family == AF_INET6) { - int flag = 1; - int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag)); - QPID_POSIX_CHECK(result); - } - } catch (std::exception&) { - ::close(s); - socket = -1; - throw; - } -} - -Socket* Socket::createSameTypeSocket() const { - int& socket = impl->fd; - // Socket currently has no actual socket attached - if (socket == -1) - return new Socket; - - ::sockaddr_storage sa; - ::socklen_t salen = sizeof(sa); - QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); - int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM - if (s < 0) throw QPID_POSIX_ERROR(errno); - return new Socket(new IOHandlePrivate(s)); -} - -void Socket::setNonblocking() const { - int& socket = impl->fd; - nonblocking = true; - if (socket != -1) { - QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK)); - } -} - -void Socket::setTcpNoDelay() const -{ - int& socket = impl->fd; - nodelay = true; - if (socket != -1) { - int flag = 1; - int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); - QPID_POSIX_CHECK(result); - } -} - -void Socket::connect(const std::string& host, const std::string& port) const -{ - SocketAddress sa(host, port); - connect(sa); -} - -void Socket::connect(const SocketAddress& addr) const -{ - // The display name for an outbound connection needs to be the name that was specified - // for the address rather than a resolved IP address as we don't know which of - // the IP addresses is actually the one that will be connected to. - peername = addr.asString(false); - - // However the string we compare with the local port must be numeric or it might not - // match when it should as getLocalAddress() will always be numeric - std::string connectname = addr.asString(); - - createSocket(addr); - - const int& socket = impl->fd; - // TODO the correct thing to do here is loop on failure until you've used all the returned addresses - if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && - (errno != EINPROGRESS)) { - throw Exception(QPID_MSG(strError(errno) << ": " << peername)); - } - // When connecting to a port on the same host which no longer has - // a process associated with it, the OS occasionally chooses the - // remote port (which is unoccupied) as the port to bind the local - // end of the socket, resulting in a "circular" connection. - // - // This seems like something the OS should prevent but I have - // confirmed that sporadic hangs in - // cluster_tests.LongTests.test_failover on RHEL5 are caused by - // such a circular connection. - // - // Raise an error if we see such a connection, since we know there is - // no listener on the peer address. - // - if (getLocalAddress() == connectname) { - close(); - throw Exception(QPID_MSG("Connection refused: " << peername)); - } -} - -void -Socket::close() const -{ - int& socket = impl->fd; - if (socket == -1) return; - if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); - socket = -1; -} - -int Socket::listen(const std::string& host, const std::string& port, int backlog) const -{ - SocketAddress sa(host, port); - return listen(sa, backlog); -} - -int Socket::listen(const SocketAddress& sa, int backlog) const -{ - createSocket(sa); - - const int& socket = impl->fd; - int yes=1; - QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); - - if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) - throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); - if (::listen(socket, backlog) < 0) - throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); - - return getLocalPort(socket); -} - -Socket* Socket::accept() const -{ - int afd = ::accept(impl->fd, 0, 0); - if ( afd >= 0) { - Socket* s = new Socket(new IOHandlePrivate(afd)); - s->localname = localname; - return s; - } - else if (errno == EAGAIN) - return 0; - else throw QPID_POSIX_ERROR(errno); -} - -int Socket::read(void *buf, size_t count) const -{ - return ::read(impl->fd, buf, count); -} - -int Socket::write(const void *buf, size_t count) const -{ - return ::write(impl->fd, buf, count); -} - -std::string Socket::getPeerAddress() const -{ - if (peername.empty()) { - peername = getName(impl->fd, false); - } - return peername; -} - -std::string Socket::getLocalAddress() const -{ - if (localname.empty()) { - localname = getName(impl->fd, true); - } - return localname; -} - -int Socket::getError() const -{ - int result; - socklen_t rSize = sizeof (result); - - if (::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0) - throw QPID_POSIX_ERROR(errno); - - return result; -} - -}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/SocketAddress.cpp b/cpp/src/qpid/sys/posix/SocketAddress.cpp index 344bd28669..cd23442226 100644 --- a/cpp/src/qpid/sys/posix/SocketAddress.cpp +++ b/cpp/src/qpid/sys/posix/SocketAddress.cpp @@ -102,6 +102,11 @@ std::string SocketAddress::asString(bool numeric) const return asString(ai.ai_addr, ai.ai_addrlen); } +std::string SocketAddress::getHost() const +{ + return host; +} + bool SocketAddress::nextAddress() { bool r = currentAddrInfo->ai_next != 0; if (r) diff --git a/cpp/src/qpid/sys/posix/SystemInfo.cpp b/cpp/src/qpid/sys/posix/SystemInfo.cpp index cfd2c64aee..ea7f521f2b 100755 --- a/cpp/src/qpid/sys/posix/SystemInfo.cpp +++ b/cpp/src/qpid/sys/posix/SystemInfo.cpp @@ -21,7 +21,6 @@ #include "qpid/log/Statement.h" #include "qpid/sys/SystemInfo.h" #include "qpid/sys/posix/check.h" -#include #include #include #include @@ -33,6 +32,7 @@ #include #include #include +#include #include #include @@ -77,84 +77,70 @@ inline bool isLoopback(const ::sockaddr* addr) { } } -void SystemInfo::getLocalIpAddresses (uint16_t port, - std::vector
&addrList) { - ::ifaddrs* ifaddr = 0; - QPID_POSIX_CHECK(::getifaddrs(&ifaddr)); - for (::ifaddrs* ifap = ifaddr; ifap != 0; ifap = ifap->ifa_next) { - if (ifap->ifa_addr == 0) continue; - if (isLoopback(ifap->ifa_addr)) continue; - int family = ifap->ifa_addr->sa_family; - switch (family) { - case AF_INET6: { - // Ignore link local addresses as: - // * The scope id is illegal in URL syntax - // * Clients won't be able to use a link local address - // without adding their own (potentially different) scope id - sockaddr_in6* sa6 = (sockaddr_in6*)((void*)ifap->ifa_addr); - if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break; - // Fallthrough - } - case AF_INET: { - char dispName[NI_MAXHOST]; - int rc = ::getnameinfo( - ifap->ifa_addr, - (family == AF_INET) - ? sizeof(struct sockaddr_in) - : sizeof(struct sockaddr_in6), - dispName, sizeof(dispName), - 0, 0, NI_NUMERICHOST); - if (rc != 0) { - throw QPID_POSIX_ERROR(rc); - } - string addr(dispName); - addrList.push_back(Address(TCP, addr, port)); - break; - } - default: - continue; +namespace { + inline socklen_t sa_len(::sockaddr* sa) + { + switch (sa->sa_family) { + case AF_INET: + return sizeof(struct sockaddr_in); + case AF_INET6: + return sizeof(struct sockaddr_in6); + default: + return sizeof(struct sockaddr_storage); } } - ::freeifaddrs(ifaddr); - if (addrList.empty()) { - addrList.push_back(Address(TCP, LOOPBACK, port)); + inline bool isInetOrInet6(::sockaddr* sa) { + switch (sa->sa_family) { + case AF_INET: + case AF_INET6: + return true; + default: + return false; + } + } + typedef std::map > InterfaceInfo; + std::map > cachedInterfaces; + + void cacheInterfaceInfo() { + // Get interface info + ::ifaddrs* interfaceInfo; + QPID_POSIX_CHECK( ::getifaddrs(&interfaceInfo) ); + + char name[NI_MAXHOST]; + for (::ifaddrs* info = interfaceInfo; info != 0; info = info->ifa_next) { + + // Only use IPv4/IPv6 interfaces + if (!isInetOrInet6(info->ifa_addr)) continue; + + int rc=::getnameinfo(info->ifa_addr, sa_len(info->ifa_addr), + name, sizeof(name), 0, 0, + NI_NUMERICHOST); + if (rc >= 0) { + std::string address(name); + cachedInterfaces[info->ifa_name].push_back(address); + } else { + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + } + } + ::freeifaddrs(interfaceInfo); } } -namespace { -struct AddrInfo { - struct addrinfo* ptr; - AddrInfo(const std::string& host) : ptr(0) { - ::addrinfo hints; - ::memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 - if (::getaddrinfo(host.c_str(), NULL, &hints, &ptr) != 0) - ptr = 0; - } - ~AddrInfo() { if (ptr) ::freeaddrinfo(ptr); } -}; +bool SystemInfo::getInterfaceAddresses(const std::string& interface, std::vector& addresses) { + if ( cachedInterfaces.empty() ) cacheInterfaceInfo(); + InterfaceInfo::iterator i = cachedInterfaces.find(interface); + if ( i==cachedInterfaces.end() ) return false; + std::copy(i->second.begin(), i->second.end(), std::back_inserter(addresses)); + return true; } -bool SystemInfo::isLocalHost(const std::string& host) { - std::vector
myAddrs; - getLocalIpAddresses(0, myAddrs); - std::set localHosts; - for (std::vector
::const_iterator i = myAddrs.begin(); i != myAddrs.end(); ++i) - localHosts.insert(i->host); - // Resolve host - AddrInfo ai(host); - if (!ai.ptr) return false; - for (struct addrinfo *res = ai.ptr; res != NULL; res = res->ai_next) { - if (isLoopback(res->ai_addr)) return true; - // Get string form of IP addr - char addr[NI_MAXHOST] = ""; - int error = ::getnameinfo(res->ai_addr, res->ai_addrlen, addr, NI_MAXHOST, NULL, 0, - NI_NUMERICHOST | NI_NUMERICSERV); - if (error) return false; - if (localHosts.find(addr) != localHosts.end()) return true; +void SystemInfo::getInterfaceNames(std::vector& names ) { + if ( cachedInterfaces.empty() ) cacheInterfaceInfo(); + + for (InterfaceInfo::const_iterator i = cachedInterfaces.begin(); i!=cachedInterfaces.end(); ++i) { + names.push_back(i->first); } - return false; } void SystemInfo::getSystemId (std::string &osName, @@ -205,4 +191,11 @@ string SystemInfo::getProcessName() return value; } +// Always true. Only Windows has exception cases. +bool SystemInfo::threadSafeShutdown() +{ + return true; +} + + }} // namespace qpid::sys -- cgit v1.2.1