diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-04 12:15:18 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-04 12:15:18 +0000 |
| commit | 9bde51bb9e454cf54a71ae4b3f74ced95b68f5ae (patch) | |
| tree | 9ab7fdb74d2521ba09d7b0ec8d56d6c4fe599467 | |
| parent | 54fb883b977ee1bfac5977597332dbe4602e05ae (diff) | |
| download | qpid-python-9bde51bb9e454cf54a71ae4b3f74ced95b68f5ae.tar.gz | |
NO-JIRA: Remove dead testcase. Much of the original intent seems to be lost.
What remains duplicates AsyncMessageListenerTest, SyncReceiveTest and RollbackOrderTest.
Removed associated support class too.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1657134 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 0 insertions, 659 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java deleted file mode 100644 index 5895d670a7..0000000000 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ /dev/null @@ -1,371 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.test.unit.close; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.test.utils.QpidClientConnection; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; -import java.util.concurrent.atomic.AtomicInteger; - -public class MessageRequeueTest extends QpidBrokerTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class); - - protected static AtomicInteger consumerIds = new AtomicInteger(0); - protected final Integer numTestMessages = 150; - - protected final int consumeTimeout = 3000; - - protected final String queue = "direct://amq.direct//message-requeue-test-queue"; - protected String payload = "Message:"; - - protected final String BROKER = "tcp://127.0.0.1:5672"; - private boolean testReception = true; - - private long[] receieved = new long[numTestMessages + 1]; - private boolean passed = false; - private QpidClientConnection conn; - - - protected void setUp() throws Exception - { - super.setUp(); - - conn = new QpidClientConnection(BROKER); - - conn.connect(); - // clear queue - conn.consume(queue, consumeTimeout); - // load test data - _logger.info("creating test data, " + numTestMessages + " messages"); - conn.put(queue, payload, numTestMessages); - // close this connection - conn.disconnect(); - } - - protected void tearDown() throws Exception - { - - if (!passed) // clean up - { - QpidClientConnection conn = new QpidClientConnection(BROKER); - - conn.connect(); - // clear queue - conn.consume(queue, consumeTimeout); - - conn.disconnect(); - } - - super.tearDown(); - } - - /** - * multiple consumers - * - * @throws javax.jms.JMSException if a JMS problem occurs - * @throws InterruptedException on timeout - */ - public void testDrain() throws Exception - { - QpidClientConnection conn = new QpidClientConnection(BROKER); - - conn.connect(); - - _logger.info("consuming queue " + queue); - Queue q = conn.getSession().createQueue(queue); - - final MessageConsumer consumer = conn.getSession().createConsumer(q); - int messagesReceived = 0; - - long[] messageLog = new long[numTestMessages + 1]; - - _logger.info("consuming..."); - Message msg = consumer.receive(1000); - while (msg != null) - { - messagesReceived++; - - long dt = ((AbstractJMSMessage) msg).getDeliveryTag(); - - int msgindex = msg.getIntProperty("index"); - if (messageLog[msgindex] != 0) - { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() - + ") more than once."); - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex); - } - - if (dt == 0) - { - _logger.error("DT is zero for msg:" + msgindex); - } - - messageLog[msgindex] = dt; - - // get Next message - msg = consumer.receive(1000); - } - - _logger.info("consuming done."); - conn.getSession().commit(); - consumer.close(); - - int index = 0; - StringBuilder list = new StringBuilder(); - list.append("Failed to receive:"); - int failed = 0; - - _logger.info("consumed: " + messagesReceived); - - assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived); - // with 0_10 we can have a delivery tag of 0 - if (!conn.isBroker010()) - { - for (long b : messageLog) - { - if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist - { - _logger.error("Index: " + index + " was not received."); - list.append(" "); - list.append(index); - list.append(":"); - list.append(b); - failed++; - } - - index++; - } - - assertEquals(list.toString(), 0, failed); - } - - conn.disconnect(); - passed = true; - } - - /** multiple consumers - * Based on code subbmitted by client FT-304 - */ - public void testTwoCompetingConsumers() - { - Consumer c1 = new Consumer(); - Consumer c2 = new Consumer(); - Consumer c3 = new Consumer(); - Consumer c4 = new Consumer(); - - Thread t1 = new Thread(c1); - Thread t2 = new Thread(c2); - Thread t3 = new Thread(c3); - Thread t4 = new Thread(c4); - - t1.start(); - t2.start(); - t3.start(); - // t4.start(); - - try - { - t1.join(); - t2.join(); - t3.join(); - t4.join(); - } - catch (InterruptedException e) - { - fail("Unable to join to Consumer theads"); - } - - _logger.info("consumer 1 count is " + c1.getCount()); - _logger.info("consumer 2 count is " + c2.getCount()); - _logger.info("consumer 3 count is " + c3.getCount()); - _logger.info("consumer 4 count is " + c4.getCount()); - - Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount(); - - // Check all messages were correctly delivered - int index = 0; - StringBuilder list = new StringBuilder(); - list.append("Failed to receive:"); - int failed = 0; - if (!conn.isBroker010()) - { - for (long b : receieved) - { - if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0) - { - _logger.error("Index: " + index + " was not received."); - list.append(" "); - list.append(index); - list.append(":"); - list.append(b); - failed++; - } - - index++; - } - - assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); - } - assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); - passed = true; - } - - class Consumer implements Runnable - { - private Integer count = 0; - private Integer id; - - public Consumer() - { - id = consumerIds.addAndGet(1); - } - - public void run() - { - try - { - _logger.info("consumer-" + id + ": starting"); - QpidClientConnection conn = new QpidClientConnection(BROKER); - - conn.connect(); - - _logger.info("consumer-" + id + ": connected, consuming..."); - Message result; - do - { - result = conn.getNextMessage(queue, consumeTimeout); - if (result != null) - { - - long dt = ((AbstractJMSMessage) result).getDeliveryTag(); - - if (testReception) - { - int msgindex = result.getIntProperty("index"); - if (receieved[msgindex] != 0) - { - _logger.error("Received Message(" + msgindex + ":" - + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once."); - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt - + "IN:" + msgindex); - } - - if (dt == 0) - { - _logger.error("DT is zero for msg:" + msgindex); - } - - receieved[msgindex] = dt; - } - - count++; - if ((count % 100) == 0) - { - _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); - } - } - } - while (result != null); - - _logger.info("consumer-" + id + ": complete"); - conn.disconnect(); - - } - catch (Exception e) - { - _logger.error("Consumer run error",e); - } - } - - public Integer getCount() - { - return count; - } - - public Integer getId() - { - return id; - } - } - - public void testRequeue() throws JMSException, AMQException, URLSyntaxException - { - int run = 0; - // while (run < 10) - { - run++; - - if (_logger.isInfoEnabled()) - { - _logger.info("testRequeue run " + run); - } - - String virtualHost = "/test"; - String brokerlist = BROKER; - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - QpidClientConnection qpc = new QpidClientConnection(BROKER); - qpc.connect(); - Connection conn = qpc. getConnection(); - - Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue q = session.createQueue(queue); - - _logger.debug("Create Consumer"); - MessageConsumer consumer = session.createConsumer(q); - - conn.start(); - - _logger.debug("Receiving msg"); - Message msg = consumer.receive(2000); - - assertNotNull("Message should not be null", msg); - - // As we have not ack'd message will be requeued. - _logger.debug("Close Consumer"); - consumer.close(); - - _logger.debug("Close Connection"); - conn.close(); - } - } - -} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java deleted file mode 100644 index 0e0032da64..0000000000 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.test.utils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.JMSAMQException; - -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -public class QpidClientConnection extends QpidBrokerTestCase implements ExceptionListener -{ - private static final Logger _logger = LoggerFactory.getLogger(QpidClientConnection.class); - - private boolean transacted = true; - private int ackMode = Session.CLIENT_ACKNOWLEDGE; - private Connection connection; - - private String virtualHost; - private String brokerlist; - private int prefetch; - protected Session session; - protected boolean connected; - - public QpidClientConnection(String broker) - { - super(); - setVirtualHost("/test"); - setBrokerList(broker); - setPrefetch(5000); - } - - - public Connection getConnection() - { - return connection; - } - - public void connect() throws JMSException - { - if (!connected) - { - /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - try - { - _logger.info("connecting to Qpid :" + brokerUrl); - connection = getConnection("guest", "guest") ; - // register exception listener - connection.setExceptionListener(this); - - session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - - _logger.info("starting connection"); - connection.start(); - - connected = true; - } - catch (Exception e) - { - throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); - } - } - } - - public void disconnect() throws Exception - { - if (connected) - { - session.commit(); - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected"); - } - } - - public void disconnectWithoutCommit() throws JMSException - { - if (connected) - { - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected without commit"); - } - } - - public String getBrokerList() - { - return brokerlist; - } - - public void setBrokerList(String brokerlist) - { - this.brokerlist = brokerlist; - } - - public String getVirtualHost() - { - return virtualHost; - } - - public void setVirtualHost(String virtualHost) - { - this.virtualHost = virtualHost; - } - - public void setPrefetch(int prefetch) - { - this.prefetch = prefetch; - } - - /** override as necessary */ - public void onException(JMSException exception) - { - _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); - } - - public boolean isConnected() - { - return connected; - } - - public Session getSession() - { - return session; - } - - /** - * Put a String as a text messages, repeat n times. A null payload will result in a null message. - * - * @param queueName The queue name to put to - * @param payload the content of the payload - * @param copies the number of messages to put - * - * @throws javax.jms.JMSException any exception that occurs - */ - public void put(String queueName, String payload, int copies) throws JMSException - { - if (!connected) - { - connect(); - } - - _logger.info("putting to queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageProducer sender = session.createProducer(queue); - - for (int i = 0; i < copies; i++) - { - Message m = session.createTextMessage(payload + i); - m.setIntProperty("index", i + 1); - sender.send(m); - } - - session.commit(); - sender.close(); - _logger.info("put " + copies + " copies"); - } - - /** - * GET the top message on a queue. Consumes the message. Accepts timeout value. - * - * @param queueName The quename to get from - * @param readTimeout The timeout to use - * - * @return the content of the text message if any - * - * @throws javax.jms.JMSException any exception that occured - */ - public Message getNextMessage(String queueName, long readTimeout) throws JMSException - { - if (!connected) - { - connect(); - } - - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - - Message message = consumer.receive(readTimeout); - session.commit(); - consumer.close(); - - Message result; - - // all messages we consume should be TextMessages - if (message instanceof TextMessage) - { - result = ((TextMessage) message); - } - else if (null == message) - { - result = null; - } - else - { - _logger.info("warning: received non-text message"); - result = message; - } - - return result; - } - - /** - * GET the top message on a queue. Consumes the message. - * - * @param queueName The Queuename to get from - * - * @return The string content of the text message, if any received - * - * @throws javax.jms.JMSException any exception that occurs - */ - public Message getNextMessage(String queueName) throws JMSException - { - return getNextMessage(queueName, 0); - } - - /** - * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. - * - * @param queueName The Queue name to consume from - * @param readTimeout The timeout for each consume - * - * @throws javax.jms.JMSException Any exception that occurs during the consume - * @throws InterruptedException If the consume thread was interrupted during a consume. - */ - public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException - { - if (!connected) - { - connect(); - } - - _logger.info("consuming queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - int messagesReceived = 0; - - _logger.info("consuming..."); - while ((consumer.receive(readTimeout)) != null) - { - messagesReceived++; - } - - session.commit(); - consumer.close(); - _logger.info("consumed: " + messagesReceived); - } -} |
