From 207658cb16be3c02adfafdb302f8dd6962047608 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Sun, 18 Oct 2009 15:20:34 +0000 Subject: r817742 (the fix for QPID-2102) did not cover the case for 2pc transactions recovered in the prepared state; this fixes that case. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@826460 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 4 ++++ qpid/cpp/src/qpid/broker/Queue.h | 6 ++++++ qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp | 15 +++++++++++---- qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp | 6 +++++- 4 files changed, 26 insertions(+), 5 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 86de96468d..b4160edbd6 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -179,6 +179,10 @@ void Queue::deliver(boost::intrusive_ptr& msg){ } } +void Queue::recoverPrepared(boost::intrusive_ptr& msg) +{ + if (policy.get()) policy->recoverEnqueued(msg); +} void Queue::recover(boost::intrusive_ptr& msg){ if (policy.get()) policy->recoverEnqueued(msg); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 661e46f619..a2dad96fe0 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -336,6 +336,12 @@ namespace qpid { // For cluster update QueueListeners& getListeners(); + + /** + * Reserve space in policy for an enqueued message that + * has been recovered in the prepared state (dtx only) + */ + void recoverPrepared(boost::intrusive_ptr& msg); }; } } diff --git a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp index c4dd6d5f87..658fd5a89e 100644 --- a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp @@ -23,17 +23,24 @@ using boost::intrusive_ptr; using namespace qpid::broker; -RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr _msg) : queue(_queue), msg(_msg) {} +RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr _msg) : queue(_queue), msg(_msg) +{ + queue->recoverPrepared(msg); +} -bool RecoveredDequeue::prepare(TransactionContext*) throw(){ +bool RecoveredDequeue::prepare(TransactionContext*) throw() +{ //should never be called; transaction has already prepared if an enqueue is recovered return false; } -void RecoveredDequeue::commit() throw(){ +void RecoveredDequeue::commit() throw() +{ + queue->enqueueAborted(msg); } -void RecoveredDequeue::rollback() throw(){ +void RecoveredDequeue::rollback() throw() +{ msg->enqueueComplete(); queue->process(msg); } diff --git a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp index 419c9771fc..48faa0942c 100644 --- a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp @@ -23,7 +23,10 @@ using boost::intrusive_ptr; using namespace qpid::broker; -RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr _msg) : queue(_queue), msg(_msg) {} +RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr _msg) : queue(_queue), msg(_msg) +{ + queue->recoverPrepared(msg); +} bool RecoveredEnqueue::prepare(TransactionContext*) throw(){ //should never be called; transaction has already prepared if an enqueue is recovered @@ -36,5 +39,6 @@ void RecoveredEnqueue::commit() throw(){ } void RecoveredEnqueue::rollback() throw(){ + queue->enqueueAborted(msg); } -- cgit v1.2.1