diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-08 18:35:46 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-08 18:35:46 +0000 |
commit | 8362a8ce23f94b51ded1c41133069b69b31cf5dd (patch) | |
tree | 911be91f3d16b1312487059eab640619b4ab25c3 | |
parent | 6d88227a3bf38b3d09381ca5efa522ced44c7c79 (diff) | |
download | qpid-python-8362a8ce23f94b51ded1c41133069b69b31cf5dd.tar.gz |
QPID-1306
- added lvq support
- added lvq tests
- added safety test for lvq
- updated QueueOptions for lvq
- some refactor to queue, to have signel pop loction
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702958 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 70 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/QueueOptions.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/client/QueueOptions.h | 7 | ||||
-rw-r--r-- | cpp/src/tests/QueueOptionsTest.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 100 |
6 files changed, 165 insertions, 32 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index c899a5befa..fb8bd1288f 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -51,6 +51,21 @@ using std::for_each; using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; + +namespace +{ + const std::string qpidMaxSize("qpid.max_size"); + const std::string qpidMaxCount("qpid.max_count"); + const std::string qpidNoLocal("no-local"); + const std::string qpidTraceIdentity("qpid.trace.id"); + const std::string qpidTraceExclude("qpid.trace.exclude"); + const std::string qpidLastValueQueue("qpid.last_value_queue"); + const std::string qpidOptimisticConsume("qpid.optimistic_consume"); + const std::string qpidPersistLastNode("qpid.persist_last_node"); + const std::string qpidVQMatchProperty("qpid.LVQ_key"); +} + + Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, @@ -253,7 +268,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - messages.pop_front(); + popMsg(msg); return true; } else { //message(s) are available but consumer hasn't got enough credit @@ -371,7 +386,7 @@ QueuedMessage Queue::get(){ if(!messages.empty()){ msg = messages.front(); - messages.pop_front(); + popMsg(msg); } return msg; } @@ -406,22 +421,49 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { QueuedMessage qmsg = messages.front(); boost::intrusive_ptr<Message> msg = qmsg.payload; destq->deliver(msg); // deliver message to the destination queue - messages.pop_front(); + popMsg(qmsg); dequeue(0, qmsg); count++; } return count; } +void Queue::popMsg(QueuedMessage& qmsg) +{ + if (lastValueQueue){ + const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); + string key = ft->getString(qpidVQMatchProperty); + lvq.erase(key); + } + messages.pop_front(); +} + void Queue::push(boost::intrusive_ptr<Message>& msg){ Listeners copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (policy.get()) policy->tryEnqueue(qm); - - messages.push_back(qm); - listeners.swap(copy); + + //if (lastValueQueue && LVQinsert(qm) ) return; // LVQ update of existing message + LVQ::iterator i; + if (lastValueQueue){ + const framing::FieldTable* ft = msg->getApplicationHeaders(); + string key = ft->getString(qpidVQMatchProperty); + + i = lvq.find(key); + if (i == lvq.end()){ + messages.push_back(qm); + listeners.swap(copy); + lvq[key] = &messages.back(); + }else { + i->second->payload = msg; + } + }else { + + messages.push_back(qm); + listeners.swap(copy); + } } for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } @@ -514,8 +556,8 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::popAndDequeue() { QueuedMessage msg = messages.front(); - messages.pop_front(); - dequeue(0, msg); + popMsg(msg); + dequeue(0, msg); } /** @@ -529,18 +571,6 @@ void Queue::dequeued(const QueuedMessage& msg) } -namespace -{ - const std::string qpidMaxSize("qpid.max_size"); - const std::string qpidMaxCount("qpid.max_count"); - const std::string qpidNoLocal("no-local"); - const std::string qpidTraceIdentity("qpid.trace.id"); - const std::string qpidTraceExclude("qpid.trace.exclude"); - const std::string qpidLastValueQueue("qpid.last_value_queue"); - const std::string qpidOptimisticConsume("qpid.optimistic_consume"); - const std::string qpidPersistLastNode("qpid.persist_last_node"); -} - void Queue::create(const FieldTable& _settings) { settings = _settings; diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 3bde07c4d6..213a36d59d 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -65,6 +65,7 @@ namespace qpid { typedef std::list<Consumer::shared_ptr> Listeners; typedef std::deque<QueuedMessage> Messages; + typedef std::map<string,QueuedMessage*> LVQ; const string name; const bool autodelete; @@ -81,6 +82,7 @@ namespace qpid { std::vector<std::string> traceExclude; Listeners listeners; Messages messages; + LVQ lvq; mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Mutex messageLock; mutable qpid::sys::Mutex ownershipLock; @@ -253,6 +255,9 @@ namespace qpid { } bool releaseMessageContent(const QueuedMessage&); + + void popMsg(QueuedMessage& qmsg); + }; } } diff --git a/cpp/src/qpid/client/QueueOptions.cpp b/cpp/src/qpid/client/QueueOptions.cpp index d0fd6f1e5c..5d1cb74efd 100644 --- a/cpp/src/qpid/client/QueueOptions.cpp +++ b/cpp/src/qpid/client/QueueOptions.cpp @@ -38,6 +38,7 @@ const std::string QueueOptions::strRING_STRICT("ring_strict"); const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue"); const std::string QueueOptions::strOptimisticConsume("qpid.optimistic_consume"); const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node"); +const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key"); QueueOptions::~QueueOptions() @@ -83,15 +84,17 @@ void QueueOptions::setPersistLastNode() void QueueOptions::setOrdering(QueueOrderingPolicy op) { if (op == LVQ){ - // TODO, add and test options with LVQ patch. - // also set the key match for LVQ - //setString(LastValueQueue, 1); - + setInt(strLastValueQueue, 1); }else{ clearOrdering(); } } +void QueueOptions::getLVQKey(std::string& key) +{ + key.assign(strLVQMatchProperty); +} + void QueueOptions::clearSizePolicy() { erase(strMaxCountKey); diff --git a/cpp/src/qpid/client/QueueOptions.h b/cpp/src/qpid/client/QueueOptions.h index 21333794ac..37cb8616e3 100644 --- a/cpp/src/qpid/client/QueueOptions.h +++ b/cpp/src/qpid/client/QueueOptions.h @@ -86,6 +86,11 @@ class QueueOptions: public framing::FieldTable void clearPersistLastNode(); /** + * get the key used match LVQ in args for message transfer + */ + void getLVQKey(std::string& key); + + /** * Use default odering policy */ void clearOrdering(); @@ -100,7 +105,7 @@ class QueueOptions: public framing::FieldTable static const std::string strLastValueQueue; static const std::string strOptimisticConsume; static const std::string strPersistLastNode; - private: + static const std::string strLVQMatchProperty; diff --git a/cpp/src/tests/QueueOptionsTest.cpp b/cpp/src/tests/QueueOptionsTest.cpp index 308ba994c4..5c1dda697b 100644 --- a/cpp/src/tests/QueueOptionsTest.cpp +++ b/cpp/src/tests/QueueOptionsTest.cpp @@ -63,15 +63,19 @@ QPID_AUTO_TEST_CASE(testFlags) ft.setOptimisticConsume(); ft.setPersistLastNode(); + ft.setOrdering(LVQ); BOOST_CHECK(1 == ft.getInt(QueueOptions::strOptimisticConsume)); BOOST_CHECK(1 == ft.getInt(QueueOptions::strPersistLastNode)); + BOOST_CHECK(1 == ft.getInt(QueueOptions::strLastValueQueue)); ft.clearOptimisticConsume(); ft.clearPersistLastNode(); + ft.setOrdering(FIFO); BOOST_CHECK(!ft.isSet(QueueOptions::strOptimisticConsume)); BOOST_CHECK(!ft.isSet(QueueOptions::strPersistLastNode)); + BOOST_CHECK(!ft.isSet(QueueOptions::strLastValueQueue)); } diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index a15f6fa333..e18a2309fa 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -240,10 +240,8 @@ QPID_AUTO_TEST_CASE(testBound) QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ - FieldTable args; - - // set queue mode - args.setInt("qpid.persist_last_node", 1); + client::QueueOptions args; + args.setPersistLastNode(); Queue::shared_ptr queue(new Queue("my-queue", true)); queue->configure(args); @@ -292,8 +290,8 @@ class TestMessageStoreOC : public NullMessageStore QPID_AUTO_TEST_CASE(testOptimisticConsume){ - FieldTable args; - args.setInt("qpid.persist_last_node", 1); + client::QueueOptions args; + args.setPersistLastNode(); // set queue mode @@ -305,7 +303,7 @@ QPID_AUTO_TEST_CASE(testOptimisticConsume){ msg1->forcePersistent(); //change mode - args.setInt("qpid.optimistic_consume", 1); + args.setOptimisticConsume(); queue->configure(args); //enqueue 1 message @@ -322,6 +320,94 @@ QPID_AUTO_TEST_CASE(testOptimisticConsume){ } +QPID_AUTO_TEST_CASE(testLVQOrdering){ + + client::QueueOptions args; + // set queue mode + args.setOrdering(client::LVQ); + + Queue::shared_ptr queue(new Queue("my-queue", true )); + queue->configure(args); + + intrusive_ptr<Message> msg1 = message("e", "A"); + intrusive_ptr<Message> msg2 = message("e", "B"); + intrusive_ptr<Message> msg3 = message("e", "C"); + intrusive_ptr<Message> msg4 = message("e", "D"); + intrusive_ptr<Message> received; + + //set deliever match for LVQ a,b,c,a + + string key; + args.getLVQKey(key); + BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); + + + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + + //enqueue 4 message + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + queue->deliver(msg4); + + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg4.get(), received.get()); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg2.get(), received.get()); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg3.get(), received.get()); + + intrusive_ptr<Message> msg5 = message("e", "A"); + intrusive_ptr<Message> msg6 = message("e", "B"); + intrusive_ptr<Message> msg7 = message("e", "C"); + msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + queue->deliver(msg5); + queue->deliver(msg6); + queue->deliver(msg7); + + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg5.get(), received.get()); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg6.get(), received.get()); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg7.get(), received.get()); + +} + +QPID_AUTO_TEST_CASE(testLVQSaftyCheck){ + +// This test is to check std::deque memory copy does not change out under us +// if this test fails, then lvq would no longer be safe. + + std::deque<string> deq; + + string a; + string b; + + deq.push_back(a); + deq.push_back(b); + string* tmp = &deq.back(); + for (int a =0; a<=100000; a++){ + string z; + deq.push_back(z); + } + deq.pop_front(); + BOOST_CHECK_EQUAL(&deq.front(),tmp); + +} QPID_AUTO_TEST_SUITE_END() |