summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h27
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.cpp15
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.h18
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py13
7 files changed, 55 insertions, 26 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index b4d50d1652..af4ae12177 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -136,7 +136,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
- queueLimits(logPrefix)
+ queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
{
// Note that at this point, we are still rejecting client connections.
// So we are safe from client interference while we set up the primary.
diff --git a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
index a2322f1545..d614a48099 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
@@ -22,9 +22,12 @@
*
*/
+#include "ReplicationTest.h"
#include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueRegistry.h>
#include <qpid/framing/amqp_types.h>
#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
#include <string>
namespace qpid {
@@ -45,8 +48,15 @@ class PrimaryQueueLimits
{
public:
// FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated channel-max
- PrimaryQueueLimits(const std::string& lp) :
- logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0) {}
+ PrimaryQueueLimits(const std::string& lp,
+ broker::QueueRegistry& qr,
+ const ReplicationTest& rt
+ ) :
+ logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0)
+ {
+ // Get initial count of replicated queues
+ qr.eachQueue(boost::bind(&PrimaryQueueLimits::addQueueIfReplicated, this, _1, rt));
+ }
/** Add a replicated queue
*@exception ResourceLimitExceededException if this would exceed the limit.
@@ -57,15 +67,22 @@ class PrimaryQueueLimits
<< " exceeds limit of " << maxQueues
<< " replicated queues.");
throw framing::ResourceLimitExceededException(
- "Exceeded replicated queue limit.");
+ Msg() << "Exceeded replicated queue limit " << queues << " >= " << maxQueues);
}
else ++queues;
}
+ void addQueueIfReplicated(const boost::shared_ptr<broker::Queue>& q, const ReplicationTest& rt) {
+ if(rt.useLevel(*q)) addQueue(q);
+ }
+
/** Remove a replicated queue.
* @pre Was previously added with addQueue
*/
- void removeQueue(const boost::shared_ptr<broker::Queue>&) { --queues; }
+ void removeQueue(const boost::shared_ptr<broker::Queue>&) {
+ assert(queues != 0);
+ --queues;
+ }
// TODO aconway 2014-01-24: Currently replication links always use the
// hard-coded framing::CHANNEL_MAX. In future (e.g. when we support AMQP1.0
@@ -83,7 +100,7 @@ class PrimaryQueueLimits
std::string logPrefix;
uint64_t maxQueues;
uint64_t queues;
-};
+};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index 0993c6ea39..bf3d779151 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -41,7 +41,7 @@ RemoteBackup::RemoteBackup(
std::ostringstream oss;
oss << "Remote backup at " << info << ": ";
logPrefix = oss.str();
- QPID_LOG(debug, logPrefix << "Connected");
+ QPID_LOG(debug, logPrefix << (c? "Connected" : "Expected"));
}
RemoteBackup::~RemoteBackup() {
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
index 647523ef2c..d2152363fe 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
@@ -29,20 +29,20 @@ namespace ha {
using types::Variant;
-ReplicateLevel ReplicationTest::getLevel(const std::string& str) {
+ReplicateLevel ReplicationTest::getLevel(const std::string& str) const {
Enum<ReplicateLevel> rl(replicateDefault);
if (!str.empty()) rl.parse(str);
return rl.get();
}
-ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) {
+ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) const {
if (f.isSet(QPID_REPLICATE))
return getLevel(f.getAsString(QPID_REPLICATE));
else
return replicateDefault;
}
-ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) {
+ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) const {
Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
if (i != m.end())
return getLevel(i->second.asString());
@@ -50,7 +50,7 @@ ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) {
return replicateDefault;
}
-ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) {
+ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) const {
const Variant::Map& qmap(q.getSettings().original);
Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE);
if (i != qmap.end())
@@ -59,16 +59,15 @@ ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) {
return getLevel(q.getSettings().storeSettings);
}
-ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) {
+ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) const {
return getLevel(ex.getArgs());
}
-ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q)
-{
+ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q) const {
return q.getSettings().isTemporary ? ReplicationTest(NONE).getLevel(q) : getLevel(q);
}
-ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) {
+ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) const {
return ReplicationTest::getLevel(ex);
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h
index c157385ce6..8fe74ee959 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.h
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h
@@ -56,18 +56,18 @@ class ReplicationTest
replicateDefault(replicateDefault_) {}
// Get the replication level set on an object, or default if not set.
- ReplicateLevel getLevel(const std::string& str);
- ReplicateLevel getLevel(const framing::FieldTable& f);
- ReplicateLevel getLevel(const types::Variant::Map& m);
- ReplicateLevel getLevel(const broker::Queue&);
- ReplicateLevel getLevel(const broker::Exchange&);
+ ReplicateLevel getLevel(const std::string& str) const;
+ ReplicateLevel getLevel(const framing::FieldTable& f) const;
+ ReplicateLevel getLevel(const types::Variant::Map& m) const;
+ ReplicateLevel getLevel(const broker::Queue&) const;
+ ReplicateLevel getLevel(const broker::Exchange&) const;
// Calculate level for objects that may not have replication set,
// including auto-delete/exclusive settings.
- ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive);
- ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive);
- ReplicateLevel useLevel(const broker::Queue&);
- ReplicateLevel useLevel(const broker::Exchange&);
+ ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive) const;
+ ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive) const;
+ ReplicateLevel useLevel(const broker::Queue&) const;
+ ReplicateLevel useLevel(const broker::Exchange&) const;
private:
ReplicateLevel replicateDefault;
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 748c8ef0c1..0f92f7dbcc 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -196,7 +196,7 @@ acl allow all all
def ha_status(self): return self.qmf().status
- def wait_status(self, status):
+ def wait_status(self, status, timeout=5):
def try_get_status():
self._status = "<unknown>"
# Ignore ConnectionError, the broker may not be up yet.
@@ -204,7 +204,7 @@ acl allow all all
self._status = self.ha_status()
return self._status == status;
except ConnectionError: return False
- assert retry(try_get_status, timeout=5), "%s expected=%r, actual=%r"%(
+ assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%(
self, status, self._status)
def wait_queue(self, queue, timeout=1):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index abc62b643e..f22e12a355 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -885,6 +885,19 @@ acl deny all all
old_sess.exchange_declare(exchange='ex1', type='fanout')
cluster[1].wait_backup("ex1")
+ def test_resource_limit_bug(self):
+ """QPID-5666 Regression test: Incorrect resource limit exception for queue creation."""
+ cluster = HaCluster(self, 3)
+ qs = ["q%s"%i for i in xrange(10)]
+ s = cluster[0].connect().session()
+ s.sender("q;{create:always}").close()
+ cluster.kill(0)
+ cluster[1].promote()
+ cluster[1].wait_status("active")
+ s = cluster[1].connect().session()
+ s.receiver("q;{delete:always}").close()
+ s.sender("qq;{create:always}").close()
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit