From f7f0bf52f98429935d9a964776c905ce5e54258f Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 6 Oct 2011 16:29:27 +0000 Subject: QPID-3526, QPID-3524: make sure the 0-10 client message.acknowledge() actually acknowledges messages immediately, and does so synchronously, adding test to verify behaviour. Split acknowledge() and commit() methods into version specific session implementations for clarity/reuse, align 0-10 and 0-8/9 transacted publishing behaviour, refactor preDeliver and postDeliver methods, remove dead code from consumers. Applied patch from Oleksandr Rudyy and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1179695 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 86 ++--- .../org/apache/qpid/client/AMQSession_0_10.java | 77 ++-- .../org/apache/qpid/client/AMQSession_0_8.java | 29 +- .../apache/qpid/client/BasicMessageConsumer.java | 69 +--- .../qpid/client/BasicMessageConsumer_0_10.java | 6 +- .../apache/qpid/client/BasicMessageProducer.java | 10 - .../qpid/test/unit/message/TestAMQSession.java | 7 +- .../client/timeouts/SyncWaitTimeoutDelayTest.java | 2 +- .../qpid/test/unit/ack/ClientAcknowledgeTest.java | 82 +++++ .../unit/publish/DirtyTransactedPublishTest.java | 403 --------------------- qpid/java/test-profiles/CPPExcludes | 1 - qpid/java/test-profiles/CPPTransientExcludes | 3 + qpid/java/test-profiles/Java010Excludes | 3 - qpid/java/test-profiles/JavaTransientExcludes | 1 + 14 files changed, 205 insertions(+), 574 deletions(-) create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java delete mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTransactedPublishTest.java (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 566c0320e9..a41f2f9b17 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -612,30 +612,27 @@ public abstract class AMQSession 0 ) - { - messageAcknowledge(_txRangeSet, true); - _txRangeSet.clear(); - _txSize = 0; - } - sendCommit(); - } - catch(TransportException e) + if( _txSize > 0 ) { - throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e); + messageAcknowledge(_txRangeSet, true); + _txRangeSet.clear(); + _txSize = 0; } - catch (AMQException e) + + getQpidSession().setAutoSync(true); + try { - throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + getQpidSession().txCommit(); } - catch (FailoverException e) + finally { - throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); + getQpidSession().setAutoSync(false); } + // We need to sync so that we get notify of an error. + sync(); } protected final boolean tagLE(long tag1, long tag2) @@ -1385,4 +1368,14 @@ public class AMQSession_0_10 extends AMQSession 0 ) + { + messageAcknowledge(range, true); + getQpidSession().sync(); + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 3ae6af6350..ccb2b00947 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -76,6 +76,7 @@ import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,6 +126,20 @@ public final class AMQSession_0_8 extends AMQSession extends Closeable implements Messa */ protected final int _acknowledgeMode; - /** - * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode - */ - private int _outstanding; - - /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding - * number of msgs >= _prefetchHigh and disabled at < _prefetchLow - */ - private boolean _dups_ok_acknowledge_send; - /** * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ private ConcurrentLinkedQueue _receivedDeliveryTags = new ConcurrentLinkedQueue(); - /** The last tag that was "multiple" acknowledged on this session (if transacted) */ - private long _lastAcked; - - /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */ - private final SortedSet _previouslyAcked = new TreeSet(); - - private final Object _commitLock = new Object(); - /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -290,17 +268,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } } - protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException - { - if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); - } - - _session.setInRecovery(false); - preDeliver(jmsMsg); - } - /** * @param immediate if true then return immediately if the connection is failing over * @@ -409,7 +376,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { - preApplicationProcessing(m); + preDeliver(m); postDeliver(m); } return m; @@ -482,7 +449,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { - preApplicationProcessing(m); + preDeliver(m); postDeliver(m); } @@ -734,7 +701,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { if (isMessageListenerSet()) { - preApplicationProcessing(jmsMessage); + preDeliver(jmsMessage); getMessageListener().onMessage(jmsMessage); postDeliver(jmsMessage); } @@ -758,19 +725,28 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } } - void preDeliver(AbstractJMSMessage msg) + protected void preDeliver(AbstractJMSMessage msg) { + _session.setInRecovery(false); + switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: _session.acknowledgeMessage(msg.getDeliveryTag(), false); break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + _session.addUnacknowledgedMessage(msg.getDeliveryTag()); + _session.markDirty(); + } break; case Session.SESSION_TRANSACTED: if (isNoConsume()) @@ -792,15 +768,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { switch (_acknowledgeMode) { - - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - _session.markDirty(); - break; - case Session.DUPS_OK_ACKNOWLEDGE: case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 47da59724c..d3494298d3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -203,9 +203,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer _causeOfFailure = new AtomicReference(null); - - @Override - protected void setUp() throws Exception - { - super.setUp(); - NUM_MESSAGES = 10; - - _queue = getTestQueue(); - - //Create Producer put some messages on the queue - _connection = getConnection(); - } - - /** - * Initialise the test variables - * @param transacted is this a transacted test - * @param mode if not trasacted then what ack mode to use - * @throws Exception if there is a setup issue. - */ - protected void init(boolean transacted, int mode) throws Exception - { - _consumerSession = _connection.createSession(transacted, mode); - _consumer = _consumerSession.createConsumer(_queue); - _producer = _consumerSession.createProducer(_queue); - - // These should all end up being prefetched by session - sendMessage(_consumerSession, _queue, 1); - - assertEquals("Wrong number of messages on queue", 1, - ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); - } - - /** - * If a transacted session has failed over whilst it has uncommitted sent - * data then we need to throw a TransactedRolledbackException on commit() - * - * The alternative would be to maintain a replay buffer so that the message - * could be resent. This is not currently implemented - * - * @throws Exception if something goes wrong. - */ - public void testDirtySendingSynchronousTransacted() throws Exception - { - Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); - - // Ensure we get failover notifications - ((AMQConnection) _connection).setConnectionListener(this); - - MessageProducer producer = producerSession.createProducer(_queue); - - // Create and send message 0 - Message msg = producerSession.createMessage(); - msg.setIntProperty(INDEX, 0); - producer.send(msg); - - // DON'T commit message .. fail connection - - failBroker(getFailingPort()); - - // Ensure destination exists for sending - producerSession.createConsumer(_queue).close(); - - // Send the next message - msg.setIntProperty(INDEX, 1); - try - { - producer.send(msg); - fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); - } - catch (JMSException jmse) - { - assertEquals("Early warning of dirty session not correct", - "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); - } - - // Ignore that the session is dirty and attempt to commit to validate the - // exception is thrown. AND that the above failure notification did NOT - // clean up the session. - - try - { - producerSession.commit(); - fail("Session is dirty we should get an TransactionRolledBackException"); - } - catch (TransactionRolledBackException trbe) - { - // Normal path. - } - - // Resending of messages should now work ok as the commit was forcilbly rolledback - msg.setIntProperty(INDEX, 0); - producer.send(msg); - msg.setIntProperty(INDEX, 1); - producer.send(msg); - - producerSession.commit(); - - assertEquals("Wrong number of messages on queue", 2, - ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue)); - } - - /** - * If a transacted session has failed over whilst it has uncommitted sent - * data then we need to throw a TransactedRolledbackException on commit() - * - * The alternative would be to maintain a replay buffer so that the message - * could be resent. This is not currently implemented - * - * @throws Exception if something goes wrong. - */ - public void testDirtySendingOnMessageTransacted() throws Exception - { - NUM_MESSAGES = 1; - _receviedAll = new CountDownLatch(NUM_MESSAGES); - ((AMQConnection) _connection).setConnectionListener(this); - - init(true, Session.SESSION_TRANSACTED); - - _consumer.setMessageListener(new MessageListener() - { - - public void onMessage(Message message) - { - try - { - // Create and send message 0 - Message msg = _consumerSession.createMessage(); - msg.setIntProperty(INDEX, 0); - _producer.send(msg); - - // DON'T commit message .. fail connection - - failBroker(getFailingPort()); - - // rep - repopulateBroker(); - - // Destination will exist as this failBroker will populate - // the queue with 1 message - - // Send the next message - msg.setIntProperty(INDEX, 1); - try - { - _producer.send(msg); - fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); - } - catch (JMSException jmse) - { - assertEquals("Early warning of dirty session not correct", - "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); - } - - // Ignore that the session is dirty and attempt to commit to validate the - // exception is thrown. AND that the above failure notification did NOT - // clean up the session. - - try - { - _consumerSession.commit(); - fail("Session is dirty we should get an TransactionRolledBackException"); - } - catch (TransactionRolledBackException trbe) - { - // Normal path. - } - - // Resend messages - msg.setIntProperty(INDEX, 0); - msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT); - _producer.send(msg); - msg.setIntProperty(INDEX, 1); - msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT); - _producer.send(msg); - - _consumerSession.commit(); - - // Stop this consumer .. can't do _consumer.stop == DEADLOCK - // this doesn't seem to stop dispatcher running - _connection.stop(); - - // Signal that the onMessage send part of test is complete - // main thread can validate that messages are correct - _receviedAll.countDown(); - - } - catch (Exception e) - { - fail(e); - } - - } - - }); - - _connection.start(); - - if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS)) - { - // Check to see if we ended due to an exception in the onMessage handler - Exception cause = _causeOfFailure.get(); - if (cause != null) - { - cause.printStackTrace(); - fail(cause.getMessage()); - } - else - { - fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES); - } - } - - // Check to see if we ended due to an exception in the onMessage handler - Exception cause = _causeOfFailure.get(); - if (cause != null) - { - cause.printStackTrace(); - fail(cause.getMessage()); - } - - _consumer.close(); - _consumerSession.close(); - - _consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _connection.start(); - - // Validate that we could send the messages as expected. - assertEquals("Wrong number of messages on queue", 3, - ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); - - MessageConsumer consumer = _consumerSession.createConsumer(_queue); - - //Validate the message sent to setup the failed over broker. - Message message = consumer.receive(1000); - assertNotNull("Message " + 0 + " not received.", message); - assertEquals("Incorrect message received", 0, message.getIntProperty(INDEX)); - - // Validate the two messages sent from within the onMessage - for (int index = 0; index <= 1; index++) - { - message = consumer.receive(1000); - assertNotNull("Message " + index + " not received.", message); - assertEquals("Incorrect message received", index, message.getIntProperty(INDEX)); - assertEquals("Incorrect message text for message:" + index, SEND_FROM_ON_MESSAGE_TEXT, message.getStringProperty(MSG)); - } - - assertNull("Extra message received.", consumer.receiveNoWait()); - - _consumerSession.close(); - - assertEquals("Wrong number of messages on queue", 0, - ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue)); - } - - private void repopulateBroker() throws Exception - { - // Repopulate this new broker so we can test what happends after failover - - //Get the connection to the first (main port) broker. - Connection connection = getConnection(); - // Use a transaction to send messages so we can be sure they arrive. - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - // ensure destination is created. - session.createConsumer(_queue).close(); - - sendMessage(session, _queue, NUM_MESSAGES); - - assertEquals("Wrong number of messages on queue", NUM_MESSAGES, - ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); - - connection.close(); - } - - // AMQConnectionListener Interface.. used so we can validate that we - // actually failed over. - - public void bytesSent(long count) - { - } - - public void bytesReceived(long count) - { - } - - public boolean preFailover(boolean redirect) - { - //Allow failover - return true; - } - - public boolean preResubscribe() - { - //Allow failover - return true; - } - - public void failoverComplete() - { - _failoverCompleted.countDown(); - } - - /** - * Override so we can block until failover has completd - * - * @param port int the port of the broker to fail. - */ - @Override - public void failBroker(int port) - { - super.failBroker(port); - - try - { - if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) - { - fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME); - } - } - catch (InterruptedException e) - { - fail("Failover was interuppted"); - } - } - - /** - * Pass the given exception back to the waiting thread to fail the test run. - * - * @param e The exception that is causing the test to fail. - */ - protected void fail(Exception e) - { - _causeOfFailure.set(e); - // End the test. - while (_receviedAll.getCount() != 0) - { - _receviedAll.countDown(); - } - } -} diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 2b58a0684d..d05f42e4b7 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -142,7 +142,6 @@ org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#* // These are recent test additions that are failing with the c++ broker // Temporarily disabling until properly investigated. -org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#* org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#* org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage#* diff --git a/qpid/java/test-profiles/CPPTransientExcludes b/qpid/java/test-profiles/CPPTransientExcludes index 47f24db19c..a214cf5b5c 100644 --- a/qpid/java/test-profiles/CPPTransientExcludes +++ b/qpid/java/test-profiles/CPPTransientExcludes @@ -27,3 +27,6 @@ org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testRecover org.apache.qpid.test.unit.xa.QueueTest#testRecover org.apache.qpid.test.unit.xa.QueueTest#testSendAndRecover + +// test requires a persistent store +org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index fe0a53bdfc..e7718b982d 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -55,9 +55,6 @@ org.apache.qpid.server.queue.ProducerFlowControlTest#* //QPID-1950 : Commit to test this failure. This is a MINA only failure so it cannot be tested when using 010. org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#* -//QPID-3421: tests are failing on 0.10 test profile -org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#* - //QPID-1864: rollback with subscriptions does not work in 0-10 yet org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index 2ce514461a..67190a6fcc 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -19,6 +19,7 @@ //These tests require a persistent store org.apache.qpid.server.store.PersistentStoreTest#* +org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod org.apache.qpid.test.unit.ct.DurableSubscriberTest#* -- cgit v1.2.1