summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp17
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp8
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp2
7 files changed, 33 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index b762ed036c..0fa7bf218f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -280,6 +280,23 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
return false;
}
+boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout)
+{
+ qpid::sys::AbsTime until(convert(timeout));
+ while (true) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
+ boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver();
+ if (r) {
+ return r;
+ } else if (until > qpid::sys::now()) {
+ waitUntil(ssn, until);
+ } else {
+ return boost::shared_ptr<ReceiverContext>();
+ }
+ }
+}
+
void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 70ace473f2..2b5e85a6f5 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -85,6 +85,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
void sync(boost::shared_ptr<SessionContext> ssn);
+ boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout);
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 2a45ccee44..38b3c07cf5 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -129,4 +129,9 @@ void ReceiverContext::reset(pn_session_t* session)
configure();
}
+bool ReceiverContext::hasCurrent()
+{
+ return pn_link_current(receiver);
+}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
index 59c0533c9a..d185ae3146 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
@@ -59,6 +59,7 @@ class ReceiverContext
void configure();
void verify();
Address getAddress() const;
+ bool hasCurrent();
private:
friend class ConnectionContext;
const std::string name;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 7673e744c7..4e5d71f788 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -89,8 +89,14 @@ void SessionContext::removeSender(const std::string& n)
senders.erase(n);
}
-boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/)
+boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver()
{
+ for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ if (i->second->hasCurrent()) {
+ return i->second;
+ }
+ }
+
return boost::shared_ptr<ReceiverContext>();
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
index df69e92ed3..8c2bb040a6 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -57,7 +57,7 @@ class SessionContext
boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;
void removeReceiver(const std::string&);
void removeSender(const std::string&);
- boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
+ boost::shared_ptr<ReceiverContext> nextReceiver();
uint32_t getReceivable();
uint32_t getUnsettledAcks();
bool settled();
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
index 8334876b84..4d427639d3 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
@@ -108,7 +108,7 @@ qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::A
bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout)
{
- boost::shared_ptr<ReceiverContext> r = session->nextReceiver(timeout);
+ boost::shared_ptr<ReceiverContext> r = connection->nextReceiver(session, timeout);
if (r) {
//TODO: cache handles in this case to avoid frequent allocation
receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r));