summaryrefslogtreecommitdiff
path: root/java/systests
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-10-06 16:29:27 +0000
committerRobert Gemmell <robbie@apache.org>2011-10-06 16:29:27 +0000
commit44ac944e1747881a74061ee0221535a03122be69 (patch)
tree55391f2a7edeec40f45dd5bf28464e8dae9b02bb /java/systests
parent5cc8d8556f3572c2bf43e5381354e1d8dafc1a51 (diff)
downloadqpid-python-44ac944e1747881a74061ee0221535a03122be69.tar.gz
QPID-3526, QPID-3524: make sure the 0-10 client message.acknowledge() actually acknowledges messages immediately, and does so synchronously, adding test to verify behaviour. Split acknowledge() and commit() methods into version specific session implementations for clarity/reuse, align 0-10 and 0-8/9 transacted publishing behaviour, refactor preDeliver and postDeliver methods, remove dead code from consumers.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1179695 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java82
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTransactedPublishTest.java403
3 files changed, 83 insertions, 404 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
index 1a23eee8ab..6189c37306 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
@@ -63,7 +63,7 @@ public class SyncWaitTimeoutDelayTest extends SyncWaitDelayTest
catch (JMSException e)
{
assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException);
- assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit"));
+ assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Exception during commit"));
// As we are using Nano time ensure to multiply up the millis.
assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30));
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java
new file mode 100644
index 0000000000..06be5cf456
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.test.unit.ack;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ClientAcknowledgeTest extends QpidBrokerTestCase
+{
+ private static final long ONE_DAY_MS = 1000l * 60 * 60 * 24;
+ private Connection _connection;
+ private Queue _queue;
+ private Session _consumerSession;
+ private MessageConsumer _consumer;
+ private MessageProducer _producer;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _queue = getTestQueue();
+ _connection = getConnection();
+ }
+
+ /**
+ * Test that message.acknowledge actually acknowledges, regardless of
+ * the flusher thread period, by restarting the broker after calling
+ * acknowledge, and then verifying after restart that the message acked
+ * is no longer present. This test requires a persistent store.
+ */
+ public void testClientAckWithLargeFlusherPeriod() throws Exception
+ {
+ setTestClientSystemProperty("qpid.session.max_ack_delay", Long.toString(ONE_DAY_MS));
+ _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _consumer = _consumerSession.createConsumer(_queue);
+ _connection.start();
+
+ _producer = _consumerSession.createProducer(_queue);
+ _producer.send(createNextMessage(_consumerSession, 1));
+ _producer.send(createNextMessage(_consumerSession, 2));
+
+ Message message = _consumer.receive(1000l);
+ assertNotNull("Message has not been received", message);
+ assertEquals("Unexpected message is received", 1, message.getIntProperty(INDEX));
+ message.acknowledge();
+
+ //restart broker to allow verification of the acks
+ //without explicitly closing connection (which acks)
+ restartBroker();
+
+ // try to receive the message again, which should fail (as it was ackd)
+ _connection = getConnection();
+ _connection.start();
+ _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _consumer = _consumerSession.createConsumer(_queue);
+ message = _consumer.receive(1000l);
+ assertNotNull("Message has not been received", message);
+ assertEquals("Unexpected message is received", 2, message.getIntProperty(INDEX));
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTransactedPublishTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTransactedPublishTest.java
deleted file mode 100644
index 3ec7937812..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTransactedPublishTest.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.publish;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TransactionRolledBackException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * QPID-1816 : Whilst testing Acknoledgement after failover this completes testing
- * of the client after failover. When we have a dirty session we should receive
- * an error if we attempt to publish. This test ensures that both in the synchronous
- * and asynchronous message delivery paths we receive the expected exceptions at
- * the expected time.
- */
-public class DirtyTransactedPublishTest extends FailoverBaseCase implements ConnectionListener
-{
- protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
-
- protected int NUM_MESSAGES;
- protected Connection _connection;
- protected Queue _queue;
- protected Session _consumerSession;
- protected MessageConsumer _consumer;
- protected MessageProducer _producer;
-
- private static final String MSG = "MSG";
- private static final String SEND_FROM_ON_MESSAGE_TEXT = "sendFromOnMessage";
- protected CountDownLatch _receviedAll;
- protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- NUM_MESSAGES = 10;
-
- _queue = getTestQueue();
-
- //Create Producer put some messages on the queue
- _connection = getConnection();
- }
-
- /**
- * Initialise the test variables
- * @param transacted is this a transacted test
- * @param mode if not trasacted then what ack mode to use
- * @throws Exception if there is a setup issue.
- */
- protected void init(boolean transacted, int mode) throws Exception
- {
- _consumerSession = _connection.createSession(transacted, mode);
- _consumer = _consumerSession.createConsumer(_queue);
- _producer = _consumerSession.createProducer(_queue);
-
- // These should all end up being prefetched by session
- sendMessage(_consumerSession, _queue, 1);
-
- assertEquals("Wrong number of messages on queue", 1,
- ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
- }
-
- /**
- * If a transacted session has failed over whilst it has uncommitted sent
- * data then we need to throw a TransactedRolledbackException on commit()
- *
- * The alternative would be to maintain a replay buffer so that the message
- * could be resent. This is not currently implemented
- *
- * @throws Exception if something goes wrong.
- */
- public void testDirtySendingSynchronousTransacted() throws Exception
- {
- Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
-
- // Ensure we get failover notifications
- ((AMQConnection) _connection).setConnectionListener(this);
-
- MessageProducer producer = producerSession.createProducer(_queue);
-
- // Create and send message 0
- Message msg = producerSession.createMessage();
- msg.setIntProperty(INDEX, 0);
- producer.send(msg);
-
- // DON'T commit message .. fail connection
-
- failBroker(getFailingPort());
-
- // Ensure destination exists for sending
- producerSession.createConsumer(_queue).close();
-
- // Send the next message
- msg.setIntProperty(INDEX, 1);
- try
- {
- producer.send(msg);
- fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
- }
- catch (JMSException jmse)
- {
- assertEquals("Early warning of dirty session not correct",
- "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
- }
-
- // Ignore that the session is dirty and attempt to commit to validate the
- // exception is thrown. AND that the above failure notification did NOT
- // clean up the session.
-
- try
- {
- producerSession.commit();
- fail("Session is dirty we should get an TransactionRolledBackException");
- }
- catch (TransactionRolledBackException trbe)
- {
- // Normal path.
- }
-
- // Resending of messages should now work ok as the commit was forcilbly rolledback
- msg.setIntProperty(INDEX, 0);
- producer.send(msg);
- msg.setIntProperty(INDEX, 1);
- producer.send(msg);
-
- producerSession.commit();
-
- assertEquals("Wrong number of messages on queue", 2,
- ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue));
- }
-
- /**
- * If a transacted session has failed over whilst it has uncommitted sent
- * data then we need to throw a TransactedRolledbackException on commit()
- *
- * The alternative would be to maintain a replay buffer so that the message
- * could be resent. This is not currently implemented
- *
- * @throws Exception if something goes wrong.
- */
- public void testDirtySendingOnMessageTransacted() throws Exception
- {
- NUM_MESSAGES = 1;
- _receviedAll = new CountDownLatch(NUM_MESSAGES);
- ((AMQConnection) _connection).setConnectionListener(this);
-
- init(true, Session.SESSION_TRANSACTED);
-
- _consumer.setMessageListener(new MessageListener()
- {
-
- public void onMessage(Message message)
- {
- try
- {
- // Create and send message 0
- Message msg = _consumerSession.createMessage();
- msg.setIntProperty(INDEX, 0);
- _producer.send(msg);
-
- // DON'T commit message .. fail connection
-
- failBroker(getFailingPort());
-
- // rep
- repopulateBroker();
-
- // Destination will exist as this failBroker will populate
- // the queue with 1 message
-
- // Send the next message
- msg.setIntProperty(INDEX, 1);
- try
- {
- _producer.send(msg);
- fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
- }
- catch (JMSException jmse)
- {
- assertEquals("Early warning of dirty session not correct",
- "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
- }
-
- // Ignore that the session is dirty and attempt to commit to validate the
- // exception is thrown. AND that the above failure notification did NOT
- // clean up the session.
-
- try
- {
- _consumerSession.commit();
- fail("Session is dirty we should get an TransactionRolledBackException");
- }
- catch (TransactionRolledBackException trbe)
- {
- // Normal path.
- }
-
- // Resend messages
- msg.setIntProperty(INDEX, 0);
- msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT);
- _producer.send(msg);
- msg.setIntProperty(INDEX, 1);
- msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT);
- _producer.send(msg);
-
- _consumerSession.commit();
-
- // Stop this consumer .. can't do _consumer.stop == DEADLOCK
- // this doesn't seem to stop dispatcher running
- _connection.stop();
-
- // Signal that the onMessage send part of test is complete
- // main thread can validate that messages are correct
- _receviedAll.countDown();
-
- }
- catch (Exception e)
- {
- fail(e);
- }
-
- }
-
- });
-
- _connection.start();
-
- if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS))
- {
- // Check to see if we ended due to an exception in the onMessage handler
- Exception cause = _causeOfFailure.get();
- if (cause != null)
- {
- cause.printStackTrace();
- fail(cause.getMessage());
- }
- else
- {
- fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES);
- }
- }
-
- // Check to see if we ended due to an exception in the onMessage handler
- Exception cause = _causeOfFailure.get();
- if (cause != null)
- {
- cause.printStackTrace();
- fail(cause.getMessage());
- }
-
- _consumer.close();
- _consumerSession.close();
-
- _consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- _connection.start();
-
- // Validate that we could send the messages as expected.
- assertEquals("Wrong number of messages on queue", 3,
- ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
-
- MessageConsumer consumer = _consumerSession.createConsumer(_queue);
-
- //Validate the message sent to setup the failed over broker.
- Message message = consumer.receive(1000);
- assertNotNull("Message " + 0 + " not received.", message);
- assertEquals("Incorrect message received", 0, message.getIntProperty(INDEX));
-
- // Validate the two messages sent from within the onMessage
- for (int index = 0; index <= 1; index++)
- {
- message = consumer.receive(1000);
- assertNotNull("Message " + index + " not received.", message);
- assertEquals("Incorrect message received", index, message.getIntProperty(INDEX));
- assertEquals("Incorrect message text for message:" + index, SEND_FROM_ON_MESSAGE_TEXT, message.getStringProperty(MSG));
- }
-
- assertNull("Extra message received.", consumer.receiveNoWait());
-
- _consumerSession.close();
-
- assertEquals("Wrong number of messages on queue", 0,
- ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue));
- }
-
- private void repopulateBroker() throws Exception
- {
- // Repopulate this new broker so we can test what happends after failover
-
- //Get the connection to the first (main port) broker.
- Connection connection = getConnection();
- // Use a transaction to send messages so we can be sure they arrive.
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- // ensure destination is created.
- session.createConsumer(_queue).close();
-
- sendMessage(session, _queue, NUM_MESSAGES);
-
- assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
- ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
-
- connection.close();
- }
-
- // AMQConnectionListener Interface.. used so we can validate that we
- // actually failed over.
-
- public void bytesSent(long count)
- {
- }
-
- public void bytesReceived(long count)
- {
- }
-
- public boolean preFailover(boolean redirect)
- {
- //Allow failover
- return true;
- }
-
- public boolean preResubscribe()
- {
- //Allow failover
- return true;
- }
-
- public void failoverComplete()
- {
- _failoverCompleted.countDown();
- }
-
- /**
- * Override so we can block until failover has completd
- *
- * @param port int the port of the broker to fail.
- */
- @Override
- public void failBroker(int port)
- {
- super.failBroker(port);
-
- try
- {
- if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
- {
- fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
- }
- }
- catch (InterruptedException e)
- {
- fail("Failover was interuppted");
- }
- }
-
- /**
- * Pass the given exception back to the waiting thread to fail the test run.
- *
- * @param e The exception that is causing the test to fail.
- */
- protected void fail(Exception e)
- {
- _causeOfFailure.set(e);
- // End the test.
- while (_receviedAll.getCount() != 0)
- {
- _receviedAll.countDown();
- }
- }
-}