summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-10-06 16:29:27 +0000
committerRobert Gemmell <robbie@apache.org>2011-10-06 16:29:27 +0000
commitf7f0bf52f98429935d9a964776c905ce5e54258f (patch)
tree4fdba543694c46346fd1c515b2594e543c5dc263 /qpid/java/client
parent67dfbf8458fb1b7e8b54a417d0d831176b6ddbe3 (diff)
downloadqpid-python-f7f0bf52f98429935d9a964776c905ce5e54258f.tar.gz
QPID-3526, QPID-3524: make sure the 0-10 client message.acknowledge() actually acknowledges messages immediately, and does so synchronously, adding test to verify behaviour. Split acknowledge() and commit() methods into version specific session implementations for clarity/reuse, align 0-10 and 0-8/9 transacted publishing behaviour, refactor preDeliver and postDeliver methods, remove dead code from consumers.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1179695 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java86
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java77
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java69
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java10
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java7
7 files changed, 118 insertions, 166 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 566c0320e9..a41f2f9b17 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -612,30 +612,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new IllegalStateException("Session is already closed");
}
- else if (hasFailedOver())
+ else if (hasFailedOverDirty())
{
+ //perform an implicit recover in this scenario
+ recover();
+
+ //notify the consumer
throw new IllegalStateException("has failed over");
}
- while (true)
+ try
{
- Long tag = _unacknowledgedMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- try
- {
- acknowledgeMessage(tag, false);
- }
- catch (TransportException e)
- {
- throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
- }
+ acknowledgeImpl();
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
}
}
+ protected abstract void acknowledgeImpl() throws JMSException;
+
/**
* Acknowledge one or many messages.
*
@@ -844,42 +841,28 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
* not mean that the commit is known to have failed, merely that it is not known whether it
* failed or not.
- * @todo Be aware of possible changes to parameter order as versions change.
*/
public void commit() throws JMSException
{
checkTransacted();
- try
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
{
- //Check that we are clean to commit.
- if (_failedOverDirty)
- {
- rollback();
-
- throw new TransactionRolledBackException("Connection failover has occured since last send. " +
- "Forced rollback");
- }
+ rollback();
+ throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
+ "The session transaction was rolled back.");
+ }
- // Acknowledge all delivered messages
- while (true)
- {
- Long tag = _deliveredMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- acknowledgeMessage(tag, false);
- }
- // Commits outstanding messages and acknowledgments
- sendCommit();
+ try
+ {
+ commitImpl();
markClean();
}
catch (AMQException e)
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
+ throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e);
}
catch (FailoverException e)
{
@@ -891,8 +874,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- public abstract void sendCommit() throws AMQException, FailoverException;
-
+ protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;
public void confirmConsumerCancelled(int consumerTag)
{
@@ -1580,10 +1562,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
abstract public void sync() throws AMQException;
- public int getAcknowledgeMode() throws JMSException
+ public int getAcknowledgeMode()
{
- checkNotClosed();
-
return _acknowledgeMode;
}
@@ -1643,10 +1623,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return _ticket;
}
- public boolean getTransacted() throws JMSException
+ public boolean getTransacted()
{
- checkNotClosed();
-
return _transacted;
}
@@ -3096,21 +3074,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*
* @return boolean true if failover has occured.
*/
- public boolean hasFailedOver()
+ public boolean hasFailedOverDirty()
{
return _failedOverDirty;
}
- /**
- * Check to see if any message have been sent in this transaction and have not been commited.
- *
- * @return boolean true if a message has been sent but not commited
- */
- public boolean isDirty()
- {
- return _dirty;
- }
-
public void setTicket(int ticket)
{
_ticket = ticket;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 0cca946f19..8f77cc6258 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -412,25 +412,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
-
- /**
- * Commit the receipt and the delivery of all messages exchanged by this session resources.
- */
- public void sendCommit() throws AMQException, FailoverException
- {
- getQpidSession().setAutoSync(true);
- try
- {
- getQpidSession().txCommit();
- }
- finally
- {
- getQpidSession().setAutoSync(false);
- }
- // We need to sync so that we get notify of an error.
- sync();
- }
-
/**
* Create a queue with a given name.
*
@@ -463,6 +444,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
+ RangeSet ranges = gatherUnackedRangeSet();
+ getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ // We need to sync so that we get notify of an error.
+ sync();
+ }
+
+ private RangeSet gatherUnackedRangeSet()
+ {
RangeSet ranges = new RangeSet();
while (true)
{
@@ -471,11 +460,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
break;
}
- ranges.add((int) (long) tag);
+
+ ranges.add(tag.intValue());
}
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
- // We need to sync so that we get notify of an error.
- sync();
+
+ return ranges;
}
@@ -997,32 +986,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- @Override
- public void commit() throws JMSException
+ public void commitImpl() throws AMQException, FailoverException, TransportException
{
- checkTransacted();
- try
- {
- if( _txSize > 0 )
- {
- messageAcknowledge(_txRangeSet, true);
- _txRangeSet.clear();
- _txSize = 0;
- }
- sendCommit();
- }
- catch(TransportException e)
+ if( _txSize > 0 )
{
- throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
+ messageAcknowledge(_txRangeSet, true);
+ _txRangeSet.clear();
+ _txSize = 0;
}
- catch (AMQException e)
+
+ getQpidSession().setAutoSync(true);
+ try
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ getQpidSession().txCommit();
}
- catch (FailoverException e)
+ finally
{
- throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+ getQpidSession().setAutoSync(false);
}
+ // We need to sync so that we get notify of an error.
+ sync();
}
protected final boolean tagLE(long tag1, long tag2)
@@ -1385,4 +1368,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return sb.toString();
}
+ protected void acknowledgeImpl()
+ {
+ RangeSet range = gatherUnackedRangeSet();
+
+ if(range.size() > 0 )
+ {
+ messageAcknowledge(range, true);
+ getQpidSession().sync();
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 3ae6af6350..ccb2b00947 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -76,6 +76,7 @@ import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,6 +126,20 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
return getProtocolHandler().getProtocolVersion();
}
+ protected void acknowledgeImpl()
+ {
+ while (true)
+ {
+ Long tag = _unacknowledgedMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+
+ acknowledgeMessage(tag, false);
+ }
+ }
+
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
@@ -170,8 +185,20 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
}
}
- public void sendCommit() throws AMQException, FailoverException
+ public void commitImpl() throws AMQException, FailoverException, TransportException
{
+ // Acknowledge all delivered messages
+ while (true)
+ {
+ Long tag = _deliveredMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+
+ acknowledgeMessage(tag, false);
+ }
+
final AMQProtocolHandler handler = getProtocolHandler();
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 0dc8c8c3ed..e6e1398a35 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -37,10 +37,7 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -118,29 +115,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
protected final int _acknowledgeMode;
/**
- * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
- */
- private int _outstanding;
-
- /**
- * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
- * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
- */
- private boolean _dups_ok_acknowledge_send;
-
- /**
* List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
*/
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
- /** The last tag that was "multiple" acknowledged on this session (if transacted) */
- private long _lastAcked;
-
- /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
- private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
-
- private final Object _commitLock = new Object();
-
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
@@ -290,17 +268,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
- protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
- {
- if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
- }
-
- _session.setInRecovery(false);
- preDeliver(jmsMsg);
- }
-
/**
* @param immediate if true then return immediately if the connection is failing over
*
@@ -409,7 +376,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preApplicationProcessing(m);
+ preDeliver(m);
postDeliver(m);
}
return m;
@@ -482,7 +449,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preApplicationProcessing(m);
+ preDeliver(m);
postDeliver(m);
}
@@ -734,7 +701,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
if (isMessageListenerSet())
{
- preApplicationProcessing(jmsMessage);
+ preDeliver(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
@@ -758,19 +725,28 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
- void preDeliver(AbstractJMSMessage msg)
+ protected void preDeliver(AbstractJMSMessage msg)
{
+ _session.setInRecovery(false);
+
switch (_acknowledgeMode)
{
-
case Session.PRE_ACKNOWLEDGE:
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
break;
-
case Session.CLIENT_ACKNOWLEDGE:
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+ _session.markDirty();
+ }
break;
case Session.SESSION_TRANSACTED:
if (isNoConsume())
@@ -792,15 +768,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
switch (_acknowledgeMode)
{
-
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- _session.markDirty();
- break;
-
case Session.DUPS_OK_ACKNOWLEDGE:
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 47da59724c..d3494298d3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -203,9 +203,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
super.notifyMessage(messageFrame);
}
- @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ @Override
+ protected void preDeliver(AbstractJMSMessage jmsMsg)
{
- super.preApplicationProcessing(jmsMsg);
+ super.preDeliver(jmsMsg);
+
if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
{
_session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index e95f9a7414..bf4de782a5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -455,16 +455,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
AbstractJMSMessage message = convertToNativeMessage(origMessage);
- if (_transacted)
- {
- if (_session.hasFailedOver() && _session.isDirty())
- {
- throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
- new AMQSessionDirtyException("Failover has occurred and session is dirty " +
- "so unable to send."));
- }
- }
-
UUID messageId = null;
if (_disableMessageId)
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index 71a3412dc1..6759b43387 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -64,7 +64,12 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
- public void sendCommit() throws AMQException, FailoverException
+ public void commitImpl() throws AMQException, FailoverException
+ {
+
+ }
+
+ public void acknowledgeImpl()
{
}