diff options
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 1 |
2 files changed, 10 insertions, 8 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index e48db44716..72c3a7c7d9 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -228,15 +228,16 @@ void ReplicatingSubscription::initialize() { } // Message is delivered in the subscription's connection thread. -bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) { - position = m.getSequence(); +bool ReplicatingSubscription::deliver( + const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) +{ try { - QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"); + QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence()); { Mutex::ScopedLock l(lock); - //FIXME GRS: position is no longer set//assert(position == m.getSequence()); + position = m.getSequence(); - // m.getSequence() is the position of the newly enqueued message on local queue. + // m.getSequence() is the position of the new message on local queue. // backupPosition is latest position on backup queue before enqueueing if (m.getSequence() <= backupPosition) throw Exception( @@ -252,7 +253,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const } return ConsumerImpl::deliver(c, m); } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]" + QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence() << ": " << e.what()); throw; } @@ -280,7 +281,7 @@ void ReplicatingSubscription::cancel() // Consumer override, called on primary in the backup's IO thread. void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { // Finish completion of message, it has been acknowledged by the backup. - QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]"); + QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId()); guard->complete(r.getMessageId()); // If next message is protected by the guard then we are ready if (r.getMessageId() >= guard->getRange().back) setReady(); @@ -309,7 +310,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) // arbitrary connection threads. void ReplicatingSubscription::dequeued(const Message& m) { - QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]"); + QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence()); { Mutex::ScopedLock l(lock); dequeues.add(m.getSequence()); diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 13c71e301b..cf08b482e6 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -97,6 +97,7 @@ void AsynchIOHandler::abort() { if (!readError) { aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1)); } + aio->queueWriteClose(); } void AsynchIOHandler::activateOutput() { |
