summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-04-01 20:03:16 +0000
committerAlan Conway <aconway@apache.org>2011-04-01 20:03:16 +0000
commit6d7a0d5ae684f3521ebb2b505d79549f8e1f9eea (patch)
tree6246817e66f906b63d44218ad1745c349049324d /qpid/cpp
parent1b5c9ca18694061838f5e57cf7b8ca836cb35a46 (diff)
downloadqpid-python-6d7a0d5ae684f3521ebb2b505d79549f8e1f9eea.tar.gz
Merge branch 'trunk' into qpid-2920
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1087871 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/examples/old_api/Makefile.am2
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Socket.cpp37
-rw-r--r--qpid/cpp/src/tests/brokertest.py2
7 files changed, 35 insertions, 18 deletions
diff --git a/qpid/cpp/examples/old_api/Makefile.am b/qpid/cpp/examples/old_api/Makefile.am
index 466eee22e1..04216ffa97 100644
--- a/qpid/cpp/examples/old_api/Makefile.am
+++ b/qpid/cpp/examples/old_api/Makefile.am
@@ -36,7 +36,7 @@ $(MAKEDIST): Makefile
examplesdir=$(pkgdatadir)/examples/old_api
dist_examples_DATA = $(MAKEDIST)
-EXTRA_DIST = README.verify verify verify_all
+EXTRA_DIST = README.verify verify verify_all CMakeLists.txt
# For older versions of automake
abs_top_srcdir = @abs_top_srcdir@
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 820d97439c..8ede09fa79 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -744,7 +744,7 @@ libqpidclient_la_SOURCES = \
QPIDCLIENT_VERSION_INFO = 2:0:0
libqpidclient_la_LDFLAGS = -version-info $(QPIDCLIENT_VERSION_INFO)
-libqpidtypes_la_libadd=-luuid
+libqpidtypes_la_LIBADD= -luuid
libqpidtypes_la_SOURCES= \
qpid/types/Exception.cpp \
qpid/types/Uuid.cpp \
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index bd061ac214..48ba320e39 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -188,7 +188,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
msg->addToSyncList(shared_from_this(), store);
}
- msg->enqueueComplete(); // mark the message as enqueued
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -215,7 +214,6 @@ void Queue::requeue(const QueuedMessage& msg){
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
- msg.payload->enqueueComplete(); // mark the message as enqueued
messages->reinsert(msg);
listeners.populate(copy);
@@ -666,7 +664,9 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
}
if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
- msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
+ // when it considers the message stored.
+ msg->enqueueAsync(shared_from_this(), store);
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index b39ea3614b..a93a6332fd 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -246,7 +246,7 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
// If the message is bigger than the queue size, give up
- if (m->contentSize() > getMaxSize()) {
+ if (getMaxSize() && m->contentSize() > getMaxSize()) {
QPID_LOG(debug, "Message too large for ring queue " << name
<< " [" << *this << "] "
<< ": message size = " << m->contentSize() << " bytes"
diff --git a/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
index 454ce62495..3d868da64b 100644
--- a/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
+++ b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
@@ -106,7 +106,7 @@ size_t CyrusSecurityLayer::encode(const char* buffer, size_t size)
bool CyrusSecurityLayer::canEncode()
{
- return encrypted || codec->canEncode();
+ return codec && (encrypted || codec->canEncode());
}
void CyrusSecurityLayer::init(qpid::sys::Codec* c)
diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
index 7b906f33e8..3449a753e3 100644
--- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,9 +45,9 @@ namespace sys {
namespace {
std::string getName(int fd, bool local, bool includeService = false)
{
- ::sockaddr_storage name; // big enough for any socket address
+ ::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
-
+
int result = -1;
if (local) {
result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
@@ -60,8 +60,8 @@ std::string getName(int fd, bool local, bool includeService = false)
char servName[NI_MAXSERV];
char dispName[NI_MAXHOST];
if (includeService) {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
- servName, sizeof(servName),
+ if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
+ servName, sizeof(servName),
NI_NUMERICHOST | NI_NUMERICSERV) != 0)
throw QPID_POSIX_ERROR(rc);
return std::string(dispName) + ":" + std::string(servName);
@@ -75,9 +75,9 @@ std::string getName(int fd, bool local, bool includeService = false)
std::string getService(int fd, bool local)
{
- ::sockaddr_storage name; // big enough for any socket address
+ ::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
-
+
int result = -1;
if (local) {
result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
@@ -88,8 +88,8 @@ std::string getService(int fd, bool local)
QPID_POSIX_CHECK(result);
char servName[NI_MAXSERV];
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
- servName, sizeof(servName),
+ if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
+ servName, sizeof(servName),
NI_NUMERICHOST | NI_NUMERICSERV) != 0)
throw QPID_POSIX_ERROR(rc);
return servName;
@@ -172,6 +172,23 @@ void Socket::connect(const SocketAddress& addr) const
(errno != EINPROGRESS)) {
throw Exception(QPID_MSG(strError(errno) << ": " << connectname));
}
+ // When connecting to a port on the same host which no longer has
+ // a process associated with it, the OS occasionally chooses the
+ // remote port (which is unoccupied) as the port to bind the local
+ // end of the socket, resulting in a "circular" connection.
+ //
+ // This seems like something the OS should prevent but I have
+ // confirmed that sporadic hangs in
+ // cluster_tests.LongTests.test_failover on RHEL5 are caused by
+ // such a circular connection.
+ //
+ // Raise an error if we see such a connection, since we know there is
+ // no listener on the peer address.
+ //
+ if (getLocalAddress() == getPeerAddress()) {
+ close();
+ throw Exception(QPID_MSG("Connection refused: " << connectname));
+ }
}
void
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 39eec466f2..3b2253d7fc 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -509,7 +509,7 @@ class BrokerTest(TestCase):
r.close()
self.assertEqual(expect_contents, actual_contents)
-def join(thread, timeout=1):
+def join(thread, timeout=10):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)