diff options
author | Alan Conway <aconway@apache.org> | 2008-07-01 18:01:11 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-07-01 18:01:11 +0000 |
commit | b010894ebe6c468fef0c14ad869b80ef336ab11f (patch) | |
tree | 87fd021e862ad21abffc9457711f066651e67418 | |
parent | 4db79de7e806ceba3a243abef9847f15fc41cc40 (diff) | |
download | qpid-python-b010894ebe6c468fef0c14ad869b80ef336ab11f.tar.gz |
Added timeout to SubscriptionManager::get(), LocalQueue::get() and BlockingQueue::get()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@673158 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 19 | ||||
-rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 97 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Waitable.h | 10 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 8 | ||||
-rw-r--r-- | cpp/src/tests/ConcurrentQueue.cpp | 208 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 41 |
9 files changed, 118 insertions, 317 deletions
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp index 04cee40a37..99ab6f0133 100644 --- a/cpp/src/qpid/client/LocalQueue.cpp +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -31,14 +31,25 @@ using namespace framing; LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {} LocalQueue::~LocalQueue() {} -Message LocalQueue::pop() { +Message LocalQueue::pop() { return get(); } + +Message LocalQueue::get() { + Message result; + bool ok = get(result, sys::TIME_INFINITE); + assert(ok); (void) ok; + return result; +} + +bool LocalQueue::get(Message& result, sys::Duration timeout) { if (!queue) throw ClosedException(); - FrameSet::shared_ptr content = queue->pop(); + FrameSet::shared_ptr content; + bool ok = queue->pop(content, timeout); + if (!ok) return false; if (content->isA<MessageTransferBody>()) { - Message m(*content); - autoAck.ack(m, session); - return m; + result = Message(*content); + autoAck.ack(result, session); + return true; } else throw CommandInvalidException( diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index 273814f179..f81065ef3c 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -25,6 +25,7 @@ #include "qpid/client/Message.h" #include "qpid/client/Demux.h" #include "qpid/client/AckPolicy.h" +#include "qpid/sys/Time.h" namespace qpid { namespace client { @@ -42,7 +43,7 @@ class LocalQueue public: /** Create a local queue. Subscribe the local queue to a remote broker * queue with a SubscriptionManager. - * + * * LocalQueue is an alternative to implementing a MessageListener. * *@param ackPolicy Policy for acknowledging messages. @see AckPolicy. @@ -51,14 +52,22 @@ class LocalQueue ~LocalQueue(); - /** Pop the next message off the local queue. + /** Wait up to timeout for the next message from the local queue. + *@param result Set to the message from the queue. + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if queue was empty after timeout. + */ + bool get(Message& result, sys::Duration timeout=0); + + /** Get the next message off the local queue, or wait for a + * message from the broker queue. *@exception ClosedException if subscription has been closed. */ + Message get(); + + /** Synonym for get(). */ Message pop(); - /** Synonym for pop(). */ - Message get() { return pop(); } - /** Return true if local queue is empty. */ bool empty() const; @@ -72,10 +81,11 @@ class LocalQueue AckPolicy& getAckPolicy(); private: - friend class SubscriptionManager; Session session; Demux::QueuePtr queue; AckPolicy autoAck; + + friend class SubscriptionManager; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 324b11e1df..9bb75f9a49 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -129,10 +129,13 @@ void SubscriptionManager::stop() dispatcher.stop(); } -Message SubscriptionManager::get(const std::string& queue) { +bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) { LocalQueue lq; - subscribe(lq, queue, FlowControl::messageCredit(1), framing::Uuid(true).str()); - return lq.get(); + std::string unique = framing::Uuid(true).str(); + subscribe(lq, queue, FlowControl::messageCredit(1), unique); + AutoCancel ac(*this, unique); + sync(session).messageFlush(unique); + return lq.get(result, timeout); } }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 0aa55099f5..3dad15fd29 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -122,10 +122,14 @@ class SubscriptionManager : public sys::Runnable const std::string& queue, const std::string& tag=std::string()); - /** - * Get a single message from a queue. + + /** Get a single message from a queue. + *@param result is set to the message from the queue. + *@ + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if no message available after timeout. */ - Message get(const std::string& queue); + bool get(Message& result, const std::string& queue, sys::Duration timeout=0); /** Cancel a subscription. */ void cancel(const std::string tag); @@ -191,6 +195,15 @@ class SubscriptionManager : public sys::Runnable AckPolicy& getAckPolicy(); }; +/** AutoCancel cancels a subscription in its destructor */ +class AutoCancel { + public: + AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {} + ~AutoCancel() { sm.cancel(tag); } + private: + SubscriptionManager& sm; + std::string tag; +}; }} // namespace qpid::client diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index 86020fad81..9bb215ff7f 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -35,47 +35,45 @@ namespace sys { template <class T> class BlockingQueue { - mutable sys::Waitable lock; + mutable sys::Waitable waitable; std::queue<T> queue; - bool closed; public: - BlockingQueue() : closed(false) {} + BlockingQueue() {} ~BlockingQueue() { close(); } - /** Block until there is a value to pop */ - T pop() - { - Waitable::ScopedLock l(lock); - if (!queueWait()) throw ClosedException(); - return popInternal(); - } - - /** Non-blocking pop. If there is a value set outValue and return - * true, else return false; + /** Pop from the queue, block up to timeout if empty. + *@param result Set to value popped from queue. + *@param timeout Defaults to infinite. + *@return true if result was set, false if queue empty after timeout. */ - bool tryPop(T& outValue) { - Waitable::ScopedLock l(lock); + bool pop(T& result, Duration timeout=TIME_INFINITE) { + Mutex::ScopedLock l(waitable); + { + Waitable::ScopedWait w(waitable); + AbsTime deadline(now(),timeout); + while (queue.empty()) waitable.wait(deadline); + } if (queue.empty()) return false; - outValue = popInternal(); + result = queue.front(); + queue.pop(); + if (!queue.empty()) + waitable.notify(); // Notify another waiter. return true; } - /** Non-blocking pop. If there is a value return it, else return - * valueIfEmpty. - */ - T tryPop(const T& valueIfEmpty=T()) { - T result=valueIfEmpty; - tryPop(result); + T pop() { + T result; + bool ok = pop(result); + assert(ok); (void) ok; // Infinite wait. return result; } - + /** Push a value onto the queue */ - void push(const T& t) - { - Waitable::ScopedLock l(lock); + void push(const T& t) { + Mutex::ScopedLock l(waitable); queue.push(t); - queueNotify(0); + waitable.notify(); // Notify a waiter. } /** @@ -84,56 +82,33 @@ public: */ void close(const ExceptionHolder& ex=ExceptionHolder(new ClosedException())) { - Waitable::ScopedLock l(lock); - if (!closed) { - lock.setException(ex); - closed = true; - lock.notifyAll(); - lock.waitWaiters(); // Ensure no threads are still waiting. + Mutex::ScopedLock l(waitable); + if (!waitable.hasException()) { + waitable.setException(ex); + waitable.notifyAll(); + waitable.waitWaiters(); // Ensure no threads are still waiting. } } /** Open a closed queue. */ void open() { - Waitable::ScopedLock l(lock); - closed=false; + Mutex::ScopedLock l(waitable); + waitable.resetException(); } bool isClosed() const { - Waitable::ScopedLock l(lock); - return closed; + Mutex::ScopedLock l(waitable); + return waitable.hasException(); } bool empty() const { - Waitable::ScopedLock l(lock); + Mutex::ScopedLock l(waitable); return queue.empty(); } size_t size() const { - Waitable::ScopedLock l(lock); + Mutex::ScopedLock l(waitable); return queue.size(); } - - private: - - void queueNotify(size_t ignore) { - if (!queue.empty() && lock.hasWaiters()>ignore) - lock.notify(); // Notify another waiter. - } - - bool queueWait() { - Waitable::ScopedWait w(lock); - while (!closed && queue.empty()) - lock.wait(); - return !queue.empty(); - } - - T popInternal() { - T t=queue.front(); - queue.pop(); - queueNotify(1); - return t; - } - }; }} diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h index 61b7e7d82b..7701b6f97d 100644 --- a/cpp/src/qpid/sys/Waitable.h +++ b/cpp/src/qpid/sys/Waitable.h @@ -76,6 +76,12 @@ class Waitable : public Monitor { } + /** True if the waitable has an exception */ + bool hasException() const { return exception; } + + /** Clear the exception if any */ + void resetException() { exception.reset(); } + /** Throws an exception if one is set before or during the wait. */ void wait() { ExCheck e(exception); @@ -88,8 +94,6 @@ class Waitable : public Monitor { return Monitor::wait(absoluteTime); } - ExceptionHolder exception; - private: struct ExCheck { const ExceptionHolder& exception; @@ -98,6 +102,8 @@ class Waitable : public Monitor { }; size_t waiters; + ExceptionHolder exception; + friend struct ScopedWait; }; diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 505f3248a4..b55e4c231e 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -41,6 +41,7 @@ using namespace qpid::client::arg; using namespace qpid::framing; using namespace qpid; using qpid::sys::Monitor; +using qpid::sys::TIME_SEC; using std::string; using std::cout; using std::endl; @@ -242,8 +243,11 @@ QPID_AUTO_TEST_CASE(testGet) { fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true); fix.session.messageTransfer(content=Message("foo0", "getq")); fix.session.messageTransfer(content=Message("foo1", "getq")); - BOOST_CHECK_EQUAL("foo0", fix.subs.get("getq").getData()); - BOOST_CHECK_EQUAL("foo1", fix.subs.get("getq").getData()); + Message got; + BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC)); + BOOST_CHECK_EQUAL("foo0", got.getData()); + BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC)); + BOOST_CHECK_EQUAL("foo1", got.getData()); } QPID_AUTO_TEST_CASE(testOpenFailure) { diff --git a/cpp/src/tests/ConcurrentQueue.cpp b/cpp/src/tests/ConcurrentQueue.cpp deleted file mode 100644 index c6ca40e897..0000000000 --- a/cpp/src/tests/ConcurrentQueue.cpp +++ /dev/null @@ -1,208 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/**@file - * Compare alternative implementations for BlockingQueue. - */ - -#include "qpid/sys/BlockingQueue.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Time.h" - -#include <boost/test/test_tools.hpp> -#include <boost/bind.hpp> - -#include <deque> -#include <vector> -#include <iostream> - -#include "time.h" - -using namespace qpid::sys; -using namespace std; - -template <class T> class DualVectorDualLockQueue { - public: - /** Optionally specify initial capacity of the queue to minimize - * re-allocation. - */ - DualVectorDualLockQueue(size_t capacity=16) { - pushVec.reserve(capacity); - popVec.reserve(capacity); - popIter = popVec.end(); - } - - /** Push a data item onto the back of the queue */ - void push(const T& data) { - Mutex::ScopedLock l(pushLock); - pushVec.push_back(data); - } - - /** If the queue is non-empty, pop the front item into data and - * return true. If the queue is empty, return false - */ - bool tryPop(T& data) { - Mutex::ScopedLock l(popLock); - if (popIter == popVec.end()) { - popVec.clear(); - Mutex::ScopedLock l(pushLock); - pushVec.swap(popVec); - popIter = popVec.begin(); - } - if (popIter == popVec.end()) - return false; - else { - data = *popIter++; - return true; - } - } - - private: - Mutex pushLock, popLock; - std::vector<T> pushVec, popVec; - typename std::vector<T>::iterator popIter; -}; - -template <class T> struct LockedDequeQueue : public BlockingQueue<T> { - /** size_t ignored, can't pre-allocate space in a dequeue */ - LockedDequeQueue(size_t=0) {}; -}; - -// ================ Test code. - -/** Pause by sleeping */ -void nsleep(const Duration& delay) { - static Monitor m; - AbsTime stop(now(), delay); - while (now() < stop) - m.wait(stop); -} - -/** Pause by spinning */ -void nspin(const Duration& delay) { - AbsTime stop(now(), delay); - while (now() < stop) - ; -} - -/** Unlocked fake queue for comparison */ -struct NullQueue { - NullQueue(int items=0) : npush(items), npop(items) {} - void push(int) { --npush; } - bool tryPop(int& n) { - if (npop == 0) - return false; - else { - n=npop--; - return true; - } - } - volatile int npush, npop; -}; - - -// Global test parameters. -int items; -Duration delay(0); -boost::function<void()> npause; - -template <class Q> -struct Pusher : public Runnable { - Pusher(Q& q) : queue(q) {} - void run() { - for (int i=items; i > 0; i--) { - queue.push(i); - npause(); - } - } - Q& queue; -}; - -template <class Q> -struct Popper : public Runnable { - Popper(Q& q) : queue(q) {} - void run() { - for (int i=items; i > 0; i--) { - int n; - if (queue.tryPop(n)) - BOOST_REQUIRE_EQUAL(i,n); - npause(); - } - } - Q& queue; -}; - -ostream& operator<<(ostream& out, const Duration& d) { - return out << double(d)/TIME_MSEC << " msecs"; -} - -void report(const char* s, const Duration &d) { - cout << s << ": " << d - << " (" << (double(items)*TIME_SEC)/d << " push-pops/sec" << ")" - << endl; -} - -template <class Q, class PusherT=Pusher<Q>, class PopperT=Popper<Q> > -struct Timer { - static Duration time() { - cout << endl << "==" << typeid(Q).name() << endl; - - Q queue(items); - PusherT pusher(queue); - PopperT popper(queue); - - // Serial - AbsTime start=now(); - pusher.run(); - popper.run(); - Duration serial(start,now()); - report ("Serial", serial); - - // Concurrent - start=now(); - Thread pushThread(pusher); - Thread popThread(popper); - pushThread.join(); - popThread.join(); - Duration concurrent(start,now()); - report ("Concurrent", concurrent); - - cout << "Serial/concurrent: " << double(serial)/concurrent << endl; - return concurrent; - } -}; - -int test_main(int argc, char** argv) { - items = (argc > 1) ? atoi(argv[1]) : 250*1000; - delay = (argc > 2) ? atoi(argv[2]) : 4*1000; - npause=boost::bind(nspin, delay); - - cout << "Push/pop " << items << " items, delay=" << delay << endl; - Timer<NullQueue>::time(); - Duration dv = Timer<DualVectorDualLockQueue<int> >::time(); - Duration d = Timer<LockedDequeQueue<int> >::time(); - cout << endl; - cout << "Ratio deque/dual vector=" << double(d)/dv << endl; - return 0; -} -// namespace diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index b16c8f6cc0..63e3b257b3 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -164,22 +164,18 @@ void ClusterFixture::add() { push_back(new BrokerFixture(opts)); } -#if 0 // FIXME aconway 2008-06-26: TODO - - +#if 0 QPID_AUTO_TEST_CASE(testWiringReplication) { - const size_t SIZE=3; - ClusterFixture cluster(SIZE); + ClusterFixture cluster(3); Client c0(cluster[0].getPort()); BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); c0.session.queueDeclare("q"); c0.session.exchangeDeclare("ex", arg::type="direct"); - BOOST_CHECK_EQUAL("q", c0.session.queueQuery("q").getQueue()); - BOOST_CHECK_EQUAL("direct", c0.session.exchangeQuery("ex").getType()); - + c0.session.close(); // Verify all brokers get wiring update. - for (size_t i = 1; i < cluster.size(); ++i) { + for (size_t i = 0; i < cluster.size(); ++i) { + BOOST_MESSAGE("i == "<< i); Client c(cluster[i].getPort()); BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue()); BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType()); @@ -188,24 +184,15 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { QPID_AUTO_TEST_CASE(testMessageReplication) { // Enqueue on one broker, dequeue on another. - ClusterConnections cluster; - BOOST_REQUIRE(cluster.size() > 1); - - Session broker0 = cluster[0]->newSession(); - broker0.queueDeclare(queue="q"); - broker0.messageTransfer(content=TransferContent("data", "q")); - broker0.close(); - - Session broker1 = cluster[1]->newSession(); - broker1. - c.session.messageSubscribe(queue="q", destination="q"); - c.session.messageFlow(destination="q", unit=0, value=1);//messages - FrameSet::shared_ptr msg = c.session.get(); - BOOST_CHECK(msg->isA<MessageTransferBody>()); - BOOST_CHECK_EQUAL(string("data"), msg->getContent()); - c.session.getExecution().completed(msg->getId(), true, true); - cluster[i]->close(); - } + ClusterFixture cluster(2); + Client c0(cluster[0].getPort()); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=TransferContent("data", "q")); + c0.session.close(); + Client c1(cluster[1].getPort()); + Message msg; + BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL(string("data"), msg.getData()); } // TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover. |