summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-01-22 15:57:29 +0000
committerGordon Sim <gsim@apache.org>2014-01-22 15:57:29 +0000
commita7982ebadaaf85254c9fb33d56657e3934d68cd3 (patch)
tree7cd40142216ead5023cd9c8b64dfdcecef6159f4 /qpid/cpp/src
parentd3c27b1a0e29fae81b7863820a77079c669b896c (diff)
downloadqpid-python-a7982ebadaaf85254c9fb33d56657e3934d68cd3.tar.gz
QPID-5503 implement nextReceiver()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560394 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp17
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp8
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp2
7 files changed, 33 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index b762ed036c..0fa7bf218f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -280,6 +280,23 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
return false;
}
+boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout)
+{
+ qpid::sys::AbsTime until(convert(timeout));
+ while (true) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
+ boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver();
+ if (r) {
+ return r;
+ } else if (until > qpid::sys::now()) {
+ waitUntil(ssn, until);
+ } else {
+ return boost::shared_ptr<ReceiverContext>();
+ }
+ }
+}
+
void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 70ace473f2..2b5e85a6f5 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -85,6 +85,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
void sync(boost::shared_ptr<SessionContext> ssn);
+ boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout);
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 2a45ccee44..38b3c07cf5 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -129,4 +129,9 @@ void ReceiverContext::reset(pn_session_t* session)
configure();
}
+bool ReceiverContext::hasCurrent()
+{
+ return pn_link_current(receiver);
+}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
index 59c0533c9a..d185ae3146 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
@@ -59,6 +59,7 @@ class ReceiverContext
void configure();
void verify();
Address getAddress() const;
+ bool hasCurrent();
private:
friend class ConnectionContext;
const std::string name;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 7673e744c7..4e5d71f788 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -89,8 +89,14 @@ void SessionContext::removeSender(const std::string& n)
senders.erase(n);
}
-boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/)
+boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver()
{
+ for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ if (i->second->hasCurrent()) {
+ return i->second;
+ }
+ }
+
return boost::shared_ptr<ReceiverContext>();
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
index df69e92ed3..8c2bb040a6 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -57,7 +57,7 @@ class SessionContext
boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;
void removeReceiver(const std::string&);
void removeSender(const std::string&);
- boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
+ boost::shared_ptr<ReceiverContext> nextReceiver();
uint32_t getReceivable();
uint32_t getUnsettledAcks();
bool settled();
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
index 8334876b84..4d427639d3 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
@@ -108,7 +108,7 @@ qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::A
bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout)
{
- boost::shared_ptr<ReceiverContext> r = session->nextReceiver(timeout);
+ boost::shared_ptr<ReceiverContext> r = connection->nextReceiver(session, timeout);
if (r) {
//TODO: cache handles in this case to avoid frequent allocation
receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r));