diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h | 27 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicationTest.cpp | 15 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicationTest.h | 18 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 4 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 13 |
7 files changed, 55 insertions, 26 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index b4d50d1652..af4ae12177 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -136,7 +136,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : logPrefix("Primary: "), active(false), replicationTest(hb.getSettings().replicateDefault.get()), sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)), - queueLimits(logPrefix) + queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest) { // Note that at this point, we are still rejecting client connections. // So we are safe from client interference while we set up the primary. diff --git a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h index a2322f1545..d614a48099 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h +++ b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h @@ -22,9 +22,12 @@ * */ +#include "ReplicationTest.h" #include <qpid/broker/Queue.h> +#include <qpid/broker/QueueRegistry.h> #include <qpid/framing/amqp_types.h> #include <boost/shared_ptr.hpp> +#include <boost/bind.hpp> #include <string> namespace qpid { @@ -45,8 +48,15 @@ class PrimaryQueueLimits { public: // FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated channel-max - PrimaryQueueLimits(const std::string& lp) : - logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0) {} + PrimaryQueueLimits(const std::string& lp, + broker::QueueRegistry& qr, + const ReplicationTest& rt + ) : + logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0) + { + // Get initial count of replicated queues + qr.eachQueue(boost::bind(&PrimaryQueueLimits::addQueueIfReplicated, this, _1, rt)); + } /** Add a replicated queue *@exception ResourceLimitExceededException if this would exceed the limit. @@ -57,15 +67,22 @@ class PrimaryQueueLimits << " exceeds limit of " << maxQueues << " replicated queues."); throw framing::ResourceLimitExceededException( - "Exceeded replicated queue limit."); + Msg() << "Exceeded replicated queue limit " << queues << " >= " << maxQueues); } else ++queues; } + void addQueueIfReplicated(const boost::shared_ptr<broker::Queue>& q, const ReplicationTest& rt) { + if(rt.useLevel(*q)) addQueue(q); + } + /** Remove a replicated queue. * @pre Was previously added with addQueue */ - void removeQueue(const boost::shared_ptr<broker::Queue>&) { --queues; } + void removeQueue(const boost::shared_ptr<broker::Queue>&) { + assert(queues != 0); + --queues; + } // TODO aconway 2014-01-24: Currently replication links always use the // hard-coded framing::CHANNEL_MAX. In future (e.g. when we support AMQP1.0 @@ -83,7 +100,7 @@ class PrimaryQueueLimits std::string logPrefix; uint64_t maxQueues; uint64_t queues; -}; +}; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index 0993c6ea39..bf3d779151 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -41,7 +41,7 @@ RemoteBackup::RemoteBackup( std::ostringstream oss; oss << "Remote backup at " << info << ": "; logPrefix = oss.str(); - QPID_LOG(debug, logPrefix << "Connected"); + QPID_LOG(debug, logPrefix << (c? "Connected" : "Expected")); } RemoteBackup::~RemoteBackup() { diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp index 647523ef2c..d2152363fe 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp @@ -29,20 +29,20 @@ namespace ha { using types::Variant; -ReplicateLevel ReplicationTest::getLevel(const std::string& str) { +ReplicateLevel ReplicationTest::getLevel(const std::string& str) const { Enum<ReplicateLevel> rl(replicateDefault); if (!str.empty()) rl.parse(str); return rl.get(); } -ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) { +ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) const { if (f.isSet(QPID_REPLICATE)) return getLevel(f.getAsString(QPID_REPLICATE)); else return replicateDefault; } -ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) { +ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) const { Variant::Map::const_iterator i = m.find(QPID_REPLICATE); if (i != m.end()) return getLevel(i->second.asString()); @@ -50,7 +50,7 @@ ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) { return replicateDefault; } -ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) { +ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) const { const Variant::Map& qmap(q.getSettings().original); Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE); if (i != qmap.end()) @@ -59,16 +59,15 @@ ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) { return getLevel(q.getSettings().storeSettings); } -ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) { +ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) const { return getLevel(ex.getArgs()); } -ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q) -{ +ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q) const { return q.getSettings().isTemporary ? ReplicationTest(NONE).getLevel(q) : getLevel(q); } -ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) { +ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) const { return ReplicationTest::getLevel(ex); } diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h index c157385ce6..8fe74ee959 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.h +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h @@ -56,18 +56,18 @@ class ReplicationTest replicateDefault(replicateDefault_) {} // Get the replication level set on an object, or default if not set. - ReplicateLevel getLevel(const std::string& str); - ReplicateLevel getLevel(const framing::FieldTable& f); - ReplicateLevel getLevel(const types::Variant::Map& m); - ReplicateLevel getLevel(const broker::Queue&); - ReplicateLevel getLevel(const broker::Exchange&); + ReplicateLevel getLevel(const std::string& str) const; + ReplicateLevel getLevel(const framing::FieldTable& f) const; + ReplicateLevel getLevel(const types::Variant::Map& m) const; + ReplicateLevel getLevel(const broker::Queue&) const; + ReplicateLevel getLevel(const broker::Exchange&) const; // Calculate level for objects that may not have replication set, // including auto-delete/exclusive settings. - ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive); - ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive); - ReplicateLevel useLevel(const broker::Queue&); - ReplicateLevel useLevel(const broker::Exchange&); + ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive) const; + ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive) const; + ReplicateLevel useLevel(const broker::Queue&) const; + ReplicateLevel useLevel(const broker::Exchange&) const; private: ReplicateLevel replicateDefault; diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 748c8ef0c1..0f92f7dbcc 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -196,7 +196,7 @@ acl allow all all def ha_status(self): return self.qmf().status - def wait_status(self, status): + def wait_status(self, status, timeout=5): def try_get_status(): self._status = "<unknown>" # Ignore ConnectionError, the broker may not be up yet. @@ -204,7 +204,7 @@ acl allow all all self._status = self.ha_status() return self._status == status; except ConnectionError: return False - assert retry(try_get_status, timeout=5), "%s expected=%r, actual=%r"%( + assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%( self, status, self._status) def wait_queue(self, queue, timeout=1): diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index abc62b643e..f22e12a355 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -885,6 +885,19 @@ acl deny all all old_sess.exchange_declare(exchange='ex1', type='fanout') cluster[1].wait_backup("ex1") + def test_resource_limit_bug(self): + """QPID-5666 Regression test: Incorrect resource limit exception for queue creation.""" + cluster = HaCluster(self, 3) + qs = ["q%s"%i for i in xrange(10)] + s = cluster[0].connect().session() + s.sender("q;{create:always}").close() + cluster.kill(0) + cluster[1].promote() + cluster[1].wait_status("active") + s = cluster[1].connect().session() + s.receiver("q;{delete:always}").close() + s.sender("qq;{create:always}").close() + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |
