diff options
| author | Gordon Sim <gsim@apache.org> | 2014-01-22 15:57:29 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-01-22 15:57:29 +0000 |
| commit | a7982ebadaaf85254c9fb33d56657e3934d68cd3 (patch) | |
| tree | 7cd40142216ead5023cd9c8b64dfdcecef6159f4 /qpid/cpp/src | |
| parent | d3c27b1a0e29fae81b7863820a77079c669b896c (diff) | |
| download | qpid-python-a7982ebadaaf85254c9fb33d56657e3934d68cd3.tar.gz | |
QPID-5503 implement nextReceiver()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560394 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
7 files changed, 33 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index b762ed036c..0fa7bf218f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -280,6 +280,23 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared return false; } +boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until(convert(timeout)); + while (true) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + checkClosed(ssn); + boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver(); + if (r) { + return r; + } else if (until > qpid::sys::now()) { + waitUntil(ssn, until); + } else { + return boost::shared_ptr<ReceiverContext>(); + } + } +} + void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 70ace473f2..2b5e85a6f5 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -85,6 +85,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative); void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject); void sync(boost::shared_ptr<SessionContext> ssn); + boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout); void setOption(const std::string& name, const qpid::types::Variant& value); std::string getAuthenticatedUsername(); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 2a45ccee44..38b3c07cf5 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -129,4 +129,9 @@ void ReceiverContext::reset(pn_session_t* session) configure(); } +bool ReceiverContext::hasCurrent() +{ + return pn_link_current(receiver); +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index 59c0533c9a..d185ae3146 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -59,6 +59,7 @@ class ReceiverContext void configure(); void verify(); Address getAddress() const; + bool hasCurrent(); private: friend class ConnectionContext; const std::string name; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 7673e744c7..4e5d71f788 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -89,8 +89,14 @@ void SessionContext::removeSender(const std::string& n) senders.erase(n); } -boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/) +boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver() { + for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { + if (i->second->hasCurrent()) { + return i->second; + } + } + return boost::shared_ptr<ReceiverContext>(); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index df69e92ed3..8c2bb040a6 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -57,7 +57,7 @@ class SessionContext boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const; void removeReceiver(const std::string&); void removeSender(const std::string&); - boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout); + boost::shared_ptr<ReceiverContext> nextReceiver(); uint32_t getReceivable(); uint32_t getUnsettledAcks(); bool settled(); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp index 8334876b84..4d427639d3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -108,7 +108,7 @@ qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::A bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout) { - boost::shared_ptr<ReceiverContext> r = session->nextReceiver(timeout); + boost::shared_ptr<ReceiverContext> r = connection->nextReceiver(session, timeout); if (r) { //TODO: cache handles in this case to avoid frequent allocation receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r)); |
