summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-05-14 20:55:51 +0000
committerAlan Conway <aconway@apache.org>2010-05-14 20:55:51 +0000
commit94c33a9ea95ee56bb9e12018d053bfe68d86927f (patch)
tree2ccf9bf12be0e1e882b1886b4348fc1eb0e258bc
parent59688aba91d34168242e1cb0bdcd20f50232b362 (diff)
downloadqpid-python-94c33a9ea95ee56bb9e12018d053bfe68d86927f.tar.gz
Fix errors when new API Receiver is closed while in use.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@944461 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp27
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp20
-rw-r--r--cpp/src/tests/MessagingThreadTests.cpp26
3 files changed, 41 insertions, 32 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index b5d7bf78f4..cbf05fc521 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -186,22 +186,25 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
{
AbsTime deadline(AbsTime::now(), duration);
FrameSet::shared_ptr content;
- for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
- if (content->isA<MessageTransferBody>()) {
- MessageTransfer transfer(content, *this);
- if (handler && handler->accept(transfer)) {
- QPID_LOG(debug, "Delivered " << *content->getMethod());
- return true;
+ try {
+ for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ if (content->isA<MessageTransferBody>()) {
+ MessageTransfer transfer(content, *this);
+ if (handler && handler->accept(transfer)) {
+ QPID_LOG(debug, "Delivered " << *content->getMethod());
+ return true;
+ } else {
+ //received message for another destination, keep for later
+ QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ sys::Mutex::ScopedLock l(lock);
+ received.push_back(content);
+ }
} else {
- //received message for another destination, keep for later
- QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
- sys::Mutex::ScopedLock l(lock);
- received.push_back(content);
+ //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
}
- } else {
- //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
}
}
+ catch (const qpid::ClosedException&) {} // Just return false if queue closed.
return false;
}
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 9b706ab3de..fb5675c129 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -112,21 +112,16 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve
{
sys::Mutex::ScopedLock l(lock);
session = s;
+ if (state == CANCELLED) return;
if (state == UNRESOLVED) {
source = resolver.resolveSource(session, address);
assert(source.get());
state = STARTED;
}
- if (state == CANCELLED) {
- source->cancel(session, destination);
- parent->receiverCancelled(destination);
- } else {
- source->subscribe(session, destination);
- startFlow(l);
- }
+ source->subscribe(session, destination);
+ startFlow(l);
}
-
const std::string& ReceiverImpl::getName() const {
sys::Mutex::ScopedLock l(lock);
return destination;
@@ -156,6 +151,10 @@ ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (state == CANCELLED) return false;
+ }
return parent->get(*this, message, timeout);
}
@@ -163,7 +162,7 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging:
{
{
sys::Mutex::ScopedLock l(lock);
- if (state == CANCELLED) return false;//TODO: or should this be an error?
+ if (state == CANCELLED) return false;
if (capacity == 0 || state != STARTED) {
session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
@@ -174,6 +173,7 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging:
if (getImpl(message, timeout)) {
return true;
} else {
+ if (state == CANCELLED) return false; // Might have been closed during get.
sync(session).messageFlush(destination);
{
sys::Mutex::ScopedLock l(lock);
@@ -198,8 +198,6 @@ bool ReceiverImpl::isClosed() const {
return state == CANCELLED;
}
-
-
void ReceiverImpl::setCapacityImpl(uint32_t c)
{
sys::Mutex::ScopedLock l(lock);
diff --git a/cpp/src/tests/MessagingThreadTests.cpp b/cpp/src/tests/MessagingThreadTests.cpp
index 9cf139ddf5..a355ba7800 100644
--- a/cpp/src/tests/MessagingThreadTests.cpp
+++ b/cpp/src/tests/MessagingThreadTests.cpp
@@ -54,13 +54,14 @@ struct ReceiveThread : public sys::Runnable {
}
};
+
QPID_AUTO_TEST_CASE(testConcurrentSendReceive) {
- QueueFixture fix;
- Sender s = fix.session.createSender(fix.queue);
- Receiver r = fix.session.createReceiver(fix.queue+";{link:{reliability:unreliable}}");
+ MessagingFixture fix;
+ Sender s = fix.session.createSender("concurrent;{create:always}");
+ Receiver r = fix.session.createReceiver("concurrent;{create:always,link:{reliability:unreliable}}");
ReceiveThread rt(r);
sys::Thread thread(rt);
- const size_t COUNT=1000;
+ const size_t COUNT=100;
for (size_t i = 0; i < COUNT; ++i) {
s.send(Message());
}
@@ -71,28 +72,35 @@ QPID_AUTO_TEST_CASE(testConcurrentSendReceive) {
}
QPID_AUTO_TEST_CASE(testCloseBusyReceiver) {
- QueueFixture fix;
- Receiver r = fix.session.createReceiver(fix.queue);
+ MessagingFixture fix;
+ Receiver r = fix.session.createReceiver("closeReceiver;{create:always}");
ReceiveThread rt(r);
sys::Thread thread(rt);
+ sys::usleep(1000); // Give the receive thread time to block.
r.close();
thread.join();
BOOST_CHECK_EQUAL(rt.error, string());
- // Check that using a closed receiver gives the right result.
+ // Fetching on closed receiver should fail.
Message m;
BOOST_CHECK(!r.fetch(m, Duration(0)));
BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable);
}
QPID_AUTO_TEST_CASE(testCloseSessionBusyReceiver) {
- QueueFixture fix;
- Receiver r = fix.session.createReceiver(fix.queue);
+ MessagingFixture fix;
+ Receiver r = fix.session.createReceiver("closeSession;{create:always}");
ReceiveThread rt(r);
sys::Thread thread(rt);
+ sys::usleep(1000); // Give the receive thread time to block.
fix.session.close();
thread.join();
BOOST_CHECK_EQUAL(rt.error, string());
+
+ // Fetching on closed receiver should fail.
+ Message m;
+ BOOST_CHECK(!r.fetch(m, Duration(0)));
+ BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable);
}
QPID_AUTO_TEST_SUITE_END()