diff options
| author | Alan Conway <aconway@apache.org> | 2012-05-29 14:55:37 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-05-29 14:55:37 +0000 |
| commit | 3f64f140c95f54adfd5d698765f01d04670a0af0 (patch) | |
| tree | d9ab7286491dff1e5228a5e707fadab888ee89fe /qpid/cpp/src | |
| parent | ab7c34a2e2b21faebf0d6d50f94223cde86601d0 (diff) | |
| download | qpid-python-3f64f140c95f54adfd5d698765f01d04670a0af0.tar.gz | |
QPID-3603: Fix deadlock in ReplicatingSubscription causing primary to hang sporadically
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343762 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 9d7df51e3b..4490e309aa 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -146,6 +146,8 @@ struct QueueRange { SequenceNumber front; SequenceNumber back; + QueueRange() { } + // FIXME aconway 2012-05-26: fix front calculation QueueRange(broker::Queue& q) { back = q.getPosition(); @@ -210,7 +212,6 @@ ReplicatingSubscription::ReplicatingSubscription( QueueRange primary(*queue); QueueRange backup(arguments); backupPosition = backup.back; - // We can re-use some backup messages if backup and primary queues // overlap and the backup is not missing messages at the front of the queue. if (!primary.empty && // Primary not empty @@ -252,15 +253,22 @@ ReplicatingSubscription::~ReplicatingSubscription() { // Called in subscription's connection thread when the subscription is created. void ReplicatingSubscription::initialize() { sys::Mutex::ScopedLock l(lock); // QueueObserver methods can be called concurrently + // Send initial dequeues and position to the backup. // There most be a shared_ptr(this) when sending. sendDequeueEvent(l); sendPositionEvent(position, l); backupPosition = position; - // Set the ready position, all messages after this has been set as an observer. - // All messages after this position have been seen by us as QueueObserver. - QueueRange range(*getQueue()); + // Set the ready position. All messages after this position have + // been seen by us as QueueObserver. + QueueRange range; + { + // Drop the lock, QueueRange will lock the queues message lock + // which is also locked around calls to enqueued() and dequeued() + sys::Mutex::ScopedUnlock u(lock); + range = QueueRange(*getQueue()); + } readyPosition = range.back; if (range.empty || position >= readyPosition) { setReady(l); @@ -291,6 +299,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { // Send the position before qm was enqueued. sendPositionEvent(qm.position-1, l); } + // Backup will automaticall advance by 1 on delivery of message. backupPosition = qm.position; } // Deliver the message @@ -314,9 +323,12 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) { if (ready) return; ready = true; - QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); // Notify Primary that a subscription is ready. - if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName()); + { + sys::Mutex::ScopedUnlock u(lock); + QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); + if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName()); + } } // INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg @@ -400,7 +412,10 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&) dequeues.encode(buffer); dequeues.clear(); buffer.reset(); - sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer); + { + sys::Mutex::ScopedUnlock u(lock); + sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer); + } } // QueueObserver override. Called after the message has been removed @@ -408,8 +423,8 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&) // arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { + QPID_LOG(trace, logPrefix << "Dequeued " << qm); { - QPID_LOG(trace, logPrefix << "Dequeued " << qm); sys::Mutex::ScopedLock l(lock); dequeues.add(qm.position); // If we have not yet sent this message to the backup, then @@ -429,7 +444,10 @@ void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const sys::M framing::Buffer buffer(&buf[0], buf.size()); pos.encode(buffer); buffer.reset(); - sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer); + { + sys::Mutex::ScopedUnlock u(lock); + sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer); + } } void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer) |
