From cfe2e205dc364ce50d3b173eaaf3824c9767f01a Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 31 Jan 2013 19:43:26 +0000 Subject: QPID-4555: HA Fix race condition in rejecting connections. Sporadic failure of test_failover_python was caused by a race in rejecting connections. There was a very small window where work could be done by a connection after it was rejected. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1441161 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/ha/ReplicatingSubscription.cpp | 17 +++++++++-------- cpp/src/qpid/sys/AsynchIOHandler.cpp | 1 + 2 files changed, 10 insertions(+), 8 deletions(-) (limited to 'cpp') diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index e48db44716..72c3a7c7d9 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -228,15 +228,16 @@ void ReplicatingSubscription::initialize() { } // Message is delivered in the subscription's connection thread. -bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) { - position = m.getSequence(); +bool ReplicatingSubscription::deliver( + const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) +{ try { - QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"); + QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence()); { Mutex::ScopedLock l(lock); - //FIXME GRS: position is no longer set//assert(position == m.getSequence()); + position = m.getSequence(); - // m.getSequence() is the position of the newly enqueued message on local queue. + // m.getSequence() is the position of the new message on local queue. // backupPosition is latest position on backup queue before enqueueing if (m.getSequence() <= backupPosition) throw Exception( @@ -252,7 +253,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const } return ConsumerImpl::deliver(c, m); } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]" + QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence() << ": " << e.what()); throw; } @@ -280,7 +281,7 @@ void ReplicatingSubscription::cancel() // Consumer override, called on primary in the backup's IO thread. void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { // Finish completion of message, it has been acknowledged by the backup. - QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]"); + QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId()); guard->complete(r.getMessageId()); // If next message is protected by the guard then we are ready if (r.getMessageId() >= guard->getRange().back) setReady(); @@ -309,7 +310,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) // arbitrary connection threads. void ReplicatingSubscription::dequeued(const Message& m) { - QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]"); + QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence()); { Mutex::ScopedLock l(lock); dequeues.add(m.getSequence()); diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 13c71e301b..cf08b482e6 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -97,6 +97,7 @@ void AsynchIOHandler::abort() { if (!readError) { aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1)); } + aio->queueWriteClose(); } void AsynchIOHandler::activateOutput() { -- cgit v1.2.1