summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-01-31 19:43:26 +0000
committerAlan Conway <aconway@apache.org>2013-01-31 19:43:26 +0000
commitcfe2e205dc364ce50d3b173eaaf3824c9767f01a (patch)
treeb12580a63dc77d9aa03b8ec5b5db202f88c371e8 /cpp
parentac1c4b788f7b461ba39653a992bf77a91b22bfa1 (diff)
downloadqpid-python-cfe2e205dc364ce50d3b173eaaf3824c9767f01a.tar.gz
QPID-4555: HA Fix race condition in rejecting connections.
Sporadic failure of test_failover_python was caused by a race in rejecting connections. There was a very small window where work could be done by a connection after it was rejected. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1441161 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp17
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp1
2 files changed, 10 insertions, 8 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index e48db44716..72c3a7c7d9 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -228,15 +228,16 @@ void ReplicatingSubscription::initialize() {
}
// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) {
- position = m.getSequence();
+bool ReplicatingSubscription::deliver(
+ const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
+{
try {
- QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence());
{
Mutex::ScopedLock l(lock);
- //FIXME GRS: position is no longer set//assert(position == m.getSequence());
+ position = m.getSequence();
- // m.getSequence() is the position of the newly enqueued message on local queue.
+ // m.getSequence() is the position of the new message on local queue.
// backupPosition is latest position on backup queue before enqueueing
if (m.getSequence() <= backupPosition)
throw Exception(
@@ -252,7 +253,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const
}
return ConsumerImpl::deliver(c, m);
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"
+ QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence()
<< ": " << e.what());
throw;
}
@@ -280,7 +281,7 @@ void ReplicatingSubscription::cancel()
// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Finish completion of message, it has been acknowledged by the backup.
- QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]");
+ QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId());
guard->complete(r.getMessageId());
// If next message is protected by the guard then we are ready
if (r.getMessageId() >= guard->getRange().back) setReady();
@@ -309,7 +310,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const Message& m)
{
- QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence());
{
Mutex::ScopedLock l(lock);
dequeues.add(m.getSequence());
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 13c71e301b..cf08b482e6 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -97,6 +97,7 @@ void AsynchIOHandler::abort() {
if (!readError) {
aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1));
}
+ aio->queueWriteClose();
}
void AsynchIOHandler::activateOutput() {