From a7982ebadaaf85254c9fb33d56657e3934d68cd3 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 22 Jan 2014 15:57:29 +0000 Subject: QPID-5503 implement nextReceiver() git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560394 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 17 +++++++++++++++++ qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 1 + qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 5 +++++ qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h | 1 + qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp | 8 +++++++- qpid/cpp/src/qpid/messaging/amqp/SessionContext.h | 2 +- qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp | 2 +- 7 files changed, 33 insertions(+), 3 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index b762ed036c..0fa7bf218f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -280,6 +280,23 @@ bool ConnectionContext::get(boost::shared_ptr ssn, boost::shared return false; } +boost::shared_ptr ConnectionContext::nextReceiver(boost::shared_ptr ssn, qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until(convert(timeout)); + while (true) { + qpid::sys::ScopedLock l(lock); + checkClosed(ssn); + boost::shared_ptr r = ssn->nextReceiver(); + if (r) { + return r; + } else if (until > qpid::sys::now()) { + waitUntil(ssn, until); + } else { + return boost::shared_ptr(); + } + } +} + void ConnectionContext::acknowledge(boost::shared_ptr ssn, qpid::messaging::Message* message, bool cumulative) { qpid::sys::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 70ace473f2..2b5e85a6f5 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -85,6 +85,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void acknowledge(boost::shared_ptr ssn, qpid::messaging::Message* message, bool cumulative); void nack(boost::shared_ptr ssn, qpid::messaging::Message& message, bool reject); void sync(boost::shared_ptr ssn); + boost::shared_ptr nextReceiver(boost::shared_ptr ssn, qpid::messaging::Duration timeout); void setOption(const std::string& name, const qpid::types::Variant& value); std::string getAuthenticatedUsername(); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 2a45ccee44..38b3c07cf5 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -129,4 +129,9 @@ void ReceiverContext::reset(pn_session_t* session) configure(); } +bool ReceiverContext::hasCurrent() +{ + return pn_link_current(receiver); +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index 59c0533c9a..d185ae3146 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -59,6 +59,7 @@ class ReceiverContext void configure(); void verify(); Address getAddress() const; + bool hasCurrent(); private: friend class ConnectionContext; const std::string name; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 7673e744c7..4e5d71f788 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -89,8 +89,14 @@ void SessionContext::removeSender(const std::string& n) senders.erase(n); } -boost::shared_ptr SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/) +boost::shared_ptr SessionContext::nextReceiver() { + for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { + if (i->second->hasCurrent()) { + return i->second; + } + } + return boost::shared_ptr(); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index df69e92ed3..8c2bb040a6 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -57,7 +57,7 @@ class SessionContext boost::shared_ptr getReceiver(const std::string& name) const; void removeReceiver(const std::string&); void removeSender(const std::string&); - boost::shared_ptr nextReceiver(qpid::messaging::Duration timeout); + boost::shared_ptr nextReceiver(); uint32_t getReceivable(); uint32_t getUnsettledAcks(); bool settled(); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp index 8334876b84..4d427639d3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -108,7 +108,7 @@ qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::A bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout) { - boost::shared_ptr r = session->nextReceiver(timeout); + boost::shared_ptr r = connection->nextReceiver(session, timeout); if (r) { //TODO: cache handles in this case to avoid frequent allocation receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r)); -- cgit v1.2.1