diff options
| author | Alan Conway <aconway@apache.org> | 2011-04-01 20:03:16 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2011-04-01 20:03:16 +0000 |
| commit | 6d7a0d5ae684f3521ebb2b505d79549f8e1f9eea (patch) | |
| tree | 6246817e66f906b63d44218ad1745c349049324d /qpid/cpp | |
| parent | 1b5c9ca18694061838f5e57cf7b8ca836cb35a46 (diff) | |
| download | qpid-python-6d7a0d5ae684f3521ebb2b505d79549f8e1f9eea.tar.gz | |
Merge branch 'trunk' into qpid-2920
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1087871 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/examples/old_api/Makefile.am | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/posix/Socket.cpp | 37 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 2 |
7 files changed, 35 insertions, 18 deletions
diff --git a/qpid/cpp/examples/old_api/Makefile.am b/qpid/cpp/examples/old_api/Makefile.am index 466eee22e1..04216ffa97 100644 --- a/qpid/cpp/examples/old_api/Makefile.am +++ b/qpid/cpp/examples/old_api/Makefile.am @@ -36,7 +36,7 @@ $(MAKEDIST): Makefile examplesdir=$(pkgdatadir)/examples/old_api dist_examples_DATA = $(MAKEDIST) -EXTRA_DIST = README.verify verify verify_all +EXTRA_DIST = README.verify verify verify_all CMakeLists.txt # For older versions of automake abs_top_srcdir = @abs_top_srcdir@ diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 820d97439c..8ede09fa79 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -744,7 +744,7 @@ libqpidclient_la_SOURCES = \ QPIDCLIENT_VERSION_INFO = 2:0:0 libqpidclient_la_LDFLAGS = -version-info $(QPIDCLIENT_VERSION_INFO) -libqpidtypes_la_libadd=-luuid +libqpidtypes_la_LIBADD= -luuid libqpidtypes_la_SOURCES= \ qpid/types/Exception.cpp \ qpid/types/Uuid.cpp \ diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index bd061ac214..48ba320e39 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -188,7 +188,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure msg->addToSyncList(shared_from_this(), store); } - msg->enqueueComplete(); // mark the message as enqueued if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { //content has not been loaded, need to ensure that lazy loading mode is set: @@ -215,7 +214,6 @@ void Queue::requeue(const QueuedMessage& msg){ { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; - msg.payload->enqueueComplete(); // mark the message as enqueued messages->reinsert(msg); listeners.populate(copy); @@ -666,7 +664,9 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg } if ((msg->isPersistent() || msg->checkContentReleasable()) && store) { - msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete() + // when it considers the message stored. + msg->enqueueAsync(shared_from_this(), store); boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index b39ea3614b..a93a6332fd 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -246,7 +246,7 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { // If the message is bigger than the queue size, give up - if (m->contentSize() > getMaxSize()) { + if (getMaxSize() && m->contentSize() > getMaxSize()) { QPID_LOG(debug, "Message too large for ring queue " << name << " [" << *this << "] " << ": message size = " << m->contentSize() << " bytes" diff --git a/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp index 454ce62495..3d868da64b 100644 --- a/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp +++ b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp @@ -106,7 +106,7 @@ size_t CyrusSecurityLayer::encode(const char* buffer, size_t size) bool CyrusSecurityLayer::canEncode() { - return encrypted || codec->canEncode(); + return codec && (encrypted || codec->canEncode()); } void CyrusSecurityLayer::init(qpid::sys::Codec* c) diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp index 7b906f33e8..3449a753e3 100644 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -45,9 +45,9 @@ namespace sys { namespace { std::string getName(int fd, bool local, bool includeService = false) { - ::sockaddr_storage name; // big enough for any socket address + ::sockaddr_storage name; // big enough for any socket address ::socklen_t namelen = sizeof(name); - + int result = -1; if (local) { result = ::getsockname(fd, (::sockaddr*)&name, &namelen); @@ -60,8 +60,8 @@ std::string getName(int fd, bool local, bool includeService = false) char servName[NI_MAXSERV]; char dispName[NI_MAXHOST]; if (includeService) { - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), - servName, sizeof(servName), + if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), + servName, sizeof(servName), NI_NUMERICHOST | NI_NUMERICSERV) != 0) throw QPID_POSIX_ERROR(rc); return std::string(dispName) + ":" + std::string(servName); @@ -75,9 +75,9 @@ std::string getName(int fd, bool local, bool includeService = false) std::string getService(int fd, bool local) { - ::sockaddr_storage name; // big enough for any socket address + ::sockaddr_storage name; // big enough for any socket address ::socklen_t namelen = sizeof(name); - + int result = -1; if (local) { result = ::getsockname(fd, (::sockaddr*)&name, &namelen); @@ -88,8 +88,8 @@ std::string getService(int fd, bool local) QPID_POSIX_CHECK(result); char servName[NI_MAXSERV]; - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, - servName, sizeof(servName), + if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, + servName, sizeof(servName), NI_NUMERICHOST | NI_NUMERICSERV) != 0) throw QPID_POSIX_ERROR(rc); return servName; @@ -172,6 +172,23 @@ void Socket::connect(const SocketAddress& addr) const (errno != EINPROGRESS)) { throw Exception(QPID_MSG(strError(errno) << ": " << connectname)); } + // When connecting to a port on the same host which no longer has + // a process associated with it, the OS occasionally chooses the + // remote port (which is unoccupied) as the port to bind the local + // end of the socket, resulting in a "circular" connection. + // + // This seems like something the OS should prevent but I have + // confirmed that sporadic hangs in + // cluster_tests.LongTests.test_failover on RHEL5 are caused by + // such a circular connection. + // + // Raise an error if we see such a connection, since we know there is + // no listener on the peer address. + // + if (getLocalAddress() == getPeerAddress()) { + close(); + throw Exception(QPID_MSG("Connection refused: " << connectname)); + } } void diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 39eec466f2..3b2253d7fc 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -509,7 +509,7 @@ class BrokerTest(TestCase): r.close() self.assertEqual(expect_contents, actual_contents) -def join(thread, timeout=1): +def join(thread, timeout=10): thread.join(timeout) if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) |
