diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-07-10 04:04:23 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-07-10 04:04:23 +0000 |
| commit | 0dd8e35383d50ea57edb1112217473cf6b0e9f7b (patch) | |
| tree | d874916513632a0ec99d963429ec3aa2c009e088 /qpid/java | |
| parent | d0e58803b1baba4d423c9054a28b12bb3b78ded8 (diff) | |
| download | qpid-python-0dd8e35383d50ea57edb1112217473cf6b0e9f7b.tar.gz | |
QPID-1171: batch acks when prefetch is used
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@675433 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 40 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java | 1 |
2 files changed, 37 insertions, 4 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 2d98fd0dcd..9dff8e3e7e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -74,6 +74,9 @@ public class AMQSession_0_10 extends AMQSession // a ref on the qpidity connection protected org.apache.qpidity.nclient.Connection _qpidConnection; + private RangeSet unacked = new RangeSet(); + private int unackedCount = 0; + /** * USed to store the range of in tx messages */ @@ -131,6 +134,18 @@ public class AMQSession_0_10 extends AMQSession defaultPrefetchHigh, defaultPrefetchLow); } + private void addUnacked(int id) + { + unacked.add(id); + unackedCount++; + } + + private void clearUnacked() + { + unacked.clear(); + unackedCount = 0; + } + //------- overwritten methods of class AMQSession /** @@ -140,6 +155,7 @@ public class AMQSession_0_10 extends AMQSession * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the * delivery tag, <tt>false</tt> to just acknowledge that message. */ + public void acknowledgeMessage(long deliveryTag, boolean multiple) { if (_logger.isDebugEnabled()) @@ -147,14 +163,13 @@ public class AMQSession_0_10 extends AMQSession _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId); } // acknowledge this message - RangeSet ranges = new RangeSet(); if (multiple) { for (Long messageTag : _unacknowledgedMessageTags) { if( messageTag <= deliveryTag ) { - ranges.add((int) (long) messageTag); + addUnacked(messageTag.intValue()); _unacknowledgedMessageTags.remove(messageTag); } } @@ -163,10 +178,26 @@ public class AMQSession_0_10 extends AMQSession } else { - ranges.add((int) deliveryTag); + addUnacked((int) deliveryTag); _unacknowledgedMessageTags.remove(deliveryTag); } - getQpidSession().messageAcknowledge(ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + + long prefetch = getAMQConnection().getMaxPrefetch(); + + if (unackedCount >= prefetch/2) + { + flushAcknowledgments(); + } + } + + void flushAcknowledgments() + { + if (unackedCount > 0) + { + getQpidSession().messageAcknowledge + (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + clearUnacked(); + } } /** @@ -210,6 +241,7 @@ public class AMQSession_0_10 extends AMQSession */ public void sendClose(long timeout) throws AMQException, FailoverException { + flushAcknowledgments(); getQpidSession().sync(); getQpidSession().close(); getCurrentException(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index ce2f2180b1..b10eac3a1d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -127,6 +127,7 @@ public class XAResourceImpl implements XAResource default: throw new XAException(XAException.XAER_INVAL); } + _xaSession.flushAcknowledgments(); Future<XaResult> future = _xaSession.getQpidSession() .dtxEnd(convertXid(xid), flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE, |
