diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java | 33 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java | 7 |
2 files changed, 33 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index c62a7880a8..6ad704a5d8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.ack; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; @@ -33,8 +34,8 @@ import org.apache.qpid.server.txn.TxnOp; public class TxAck implements TxnOp { private final UnacknowledgedMessageMap _map; - private final List <UnacknowledgedMessage> _unacked = new LinkedList<UnacknowledgedMessage>(); - private final List<Long> _individual = new LinkedList<Long>(); + private final List <UnacknowledgedMessage> _unacked = new ArrayList<UnacknowledgedMessage>(); + private List<Long> _individual; private long _deliveryTag; private boolean _multiple; @@ -47,6 +48,10 @@ public class TxAck implements TxnOp { if (!multiple) { + if(_individual == null) + { + _individual = new ArrayList<Long>(); + } //have acked a single message that is not part of //the previously acked region so record //individually @@ -59,30 +64,46 @@ public class TxAck implements TxnOp _deliveryTag = deliveryTag; _multiple = true; } + _unacked.clear(); } public void consolidate() { + if(_unacked.isEmpty()) + { + consolidate(_unacked); + } + + } + + private void consolidate(List<UnacknowledgedMessage> unacked) + { //lookup all the unacked messages that have been acked in this transaction if (_multiple) { //get all the unacked messages for the accumulated //multiple acks - _map.collect(_deliveryTag, true, _unacked); + _map.collect(_deliveryTag, true, unacked); } //get any unacked messages for individual acks outside the //range covered by multiple acks - for (long tag : _individual) + if(_individual != null) { - if(_deliveryTag < tag) + for (Long tag : _individual) { - _map.collect(tag, false, _unacked); + if(_deliveryTag < tag) + { + _map.collect(tag, false, unacked); + } } } } public boolean checkPersistent() throws AMQException { + + + consolidate(); //if any of the messages in unacked are persistent the txn //buffer must be marked as persistent: for (UnacknowledgedMessage msg : _unacked) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index b12afd9a41..2307b94566 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -148,8 +148,9 @@ public class LocalTransactionalContext implements TransactionalContext // we will need to create and enlist the op. if (_ackOp == null) { - beginTranIfNecessary(); + _ackOp = new TxAck(unacknowledgedMessageMap); + _txnBuffer.enlist(_ackOp); } // update the op to include this ack request @@ -163,6 +164,10 @@ public class LocalTransactionalContext implements TransactionalContext { _ackOp.update(deliveryTag, multiple); } + if(!_inTran && _ackOp.checkPersistent()) + { + beginTranIfNecessary(); + } } public void messageFullyReceived(boolean persistent) throws AMQException |
