summaryrefslogtreecommitdiff
path: root/qpid/java/systests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests')
-rw-r--r--qpid/java/systests/pom.xml9
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java11
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java (renamed from qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java)559
3 files changed, 290 insertions, 289 deletions
diff --git a/qpid/java/systests/pom.xml b/qpid/java/systests/pom.xml
index d9d07ed6f9..8a245b73a9 100644
--- a/qpid/java/systests/pom.xml
+++ b/qpid/java/systests/pom.xml
@@ -54,6 +54,15 @@
<artifactId>junit</artifactId>
<scope>compile</scope>
</dependency>
+
+ <!-- Test Dependencies -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.4.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
index 0da18ea87f..976f58ec3d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
@@ -1,13 +1,14 @@
package org.apache.qpid.server.failure;
import junit.framework.TestCase;
-import org.apache.qpid.testutil.QpidClientConnection;
+import org.apache.qpid.testutil.QpidClientConnectionHelper;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.log4j.Logger;
import javax.jms.JMSException;
+import javax.jms.DeliveryMode;
import java.io.IOException;
@@ -16,7 +17,7 @@ public class HeapExhaustion extends TestCase
{
private static final Logger _logger = Logger.getLogger(HeapExhaustion.class);
- protected QpidClientConnection conn;
+ protected QpidClientConnectionHelper conn;
protected final String BROKER = "localhost";
protected final String vhost = "/test";
protected final String queue = "direct://amq.direct//queue";
@@ -31,7 +32,7 @@ public class HeapExhaustion extends TestCase
protected void setUp() throws Exception
{
- conn = new QpidClientConnection(BROKER);
+ conn = new QpidClientConnectionHelper(BROKER);
conn.setVirtualHost(vhost);
try
@@ -68,7 +69,7 @@ public class HeapExhaustion extends TestCase
int size = payload.getBytes().length;
while (true)
{
- conn.put(queue, payload, 1);
+ conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
copies++;
total += size;
System.out.println("put copy " + copies + " OK for total bytes: " + total);
@@ -88,7 +89,7 @@ public class HeapExhaustion extends TestCase
int size = payload.getBytes().length;
while (true)
{
- conn.put(queue, payload, 1);
+ conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
copies++;
total += size;
System.out.println("put copy " + copies + " OK for total bytes: " + total);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java
index 6f955ebdab..fcc3adee13 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java
@@ -1,284 +1,275 @@
-package org.apache.qpid.testutil;
-
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.JMSAMQException;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.log4j.Logger;
-
-import javax.jms.ExceptionListener;
-import javax.jms.Session;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.MessageProducer;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.TextMessage;
-
-public class QpidClientConnection implements ExceptionListener
-{
-
- private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
-
- private boolean transacted = true;
- private int ackMode = Session.CLIENT_ACKNOWLEDGE;
- private Connection connection;
-
- private String virtualHost;
- private String brokerlist;
- private int prefetch;
- protected Session session;
- protected boolean connected;
-
- public QpidClientConnection(String broker)
- {
- super();
- setVirtualHost("/test");
- setBrokerList(broker);
- setPrefetch(5000);
- }
-
-
- public void connect()
- throws
- JMSException
- {
- if (!connected)
- {
- /*
- * amqp://[user:pass@][clientid]/virtualhost?
- * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
- * [&failover='method[?option='value'[&option='value']]']
- * [&option='value']"
- */
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- try
- {
- AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
- _logger.info("connecting to Qpid :" + brokerUrl);
- connection = factory.createConnection();
-
- // register exception listener
- connection.setExceptionListener(this);
-
- session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
-
-
- _logger.info("starting connection");
- connection.start();
-
- connected = true;
- }
- catch (URLSyntaxException e)
- {
- throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
- }
- }
- }
-
- public void disconnect()
- throws
- JMSException
- {
- if (connected)
- {
- session.commit();
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected");
- }
- }
-
- public void disconnectWithoutCommit()
- throws
- JMSException
- {
- if (connected)
- {
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected without commit");
- }
- }
-
- public String getBrokerList()
- {
- return brokerlist;
- }
-
- public void setBrokerList(String brokerlist)
- {
- this.brokerlist = brokerlist;
- }
-
- public String getVirtualHost()
- {
- return virtualHost;
- }
-
- public void setVirtualHost(String virtualHost)
- {
- this.virtualHost = virtualHost;
- }
-
- public void setPrefetch(int prefetch)
- {
- this.prefetch = prefetch;
- }
-
-
- /**
- * override as necessary
- */
- public void onException(JMSException exception)
- {
- _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
- }
-
- public boolean isConnected()
- {
- return connected;
- }
-
- public Session getSession()
- {
- return session;
- }
-
- /**
- * Put a String as a text messages, repeat n times. A null payload will result in a null message.
- *
- * @param queueName The queue name to put to
- * @param payload the content of the payload
- * @param copies the number of messages to put
- * @throws javax.jms.JMSException any exception that occurs
- */
- public void put(String queueName, String payload, int copies)
- throws
- JMSException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("putting to queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageProducer sender = session.createProducer(queue);
-
- for (int i = 0; i < copies; i++)
- {
- Message m = session.createTextMessage(payload + i);
- m.setIntProperty("index", i + 1);
- sender.send(m);
- }
-
- try
- {
- session.commit();
- } catch (JMSException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- sender.close();
- _logger.info("put " + copies + " copies");
- }
-
- /**
- * GET the top message on a queue. Consumes the message. Accepts timeout value.
- *
- * @param queueName The quename to get from
- * @param readTimeout The timeout to use
- * @return the content of the text message if any
- * @throws javax.jms.JMSException any exception that occured
- */
- public Message getNextMessage(String queueName, long readTimeout)
- throws
- JMSException
- {
- if (!connected)
- {
- connect();
- }
-
- Queue queue = session.createQueue(queueName);
-
- final MessageConsumer consumer = session.createConsumer(queue);
-
- Message message = consumer.receive(readTimeout);
- session.commit();
- consumer.close();
-
- Message result;
-
- // all messages we consume should be TextMessages
- if (message instanceof TextMessage)
- {
- result = ((TextMessage) message);
- } else if (null == message)
- {
- result = null;
- } else
- {
- _logger.info("warning: received non-text message");
- result = message;
- }
-
- return result;
- }
-
- /**
- * GET the top message on a queue. Consumes the message.
- *
- * @param queueName The Queuename to get from
- * @return The string content of the text message, if any received
- * @throws javax.jms.JMSException any exception that occurs
- */
- public Message getNextMessage(String queueName)
- throws
- JMSException
- {
- return getNextMessage(queueName, 0);
- }
-
- /**
- * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
- *
- * @param queueName The Queue name to consume from
- * @param readTimeout The timeout for each consume
- * @throws javax.jms.JMSException Any exception that occurs during the consume
- * @throws InterruptedException If the consume thread was interrupted during a consume.
- */
- public void consume(String queueName, int readTimeout)
- throws
- JMSException,
- InterruptedException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("consuming queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageConsumer consumer = session.createConsumer(queue);
- int messagesReceived = 0;
-
- _logger.info("consuming...");
- while ((consumer.receive(readTimeout)) != null)
- {
- messagesReceived++;
- }
-
- session.commit();
- consumer.close();
-
- _logger.info("consumed: " + messagesReceived);
- }
-}
+package org.apache.qpid.testutil;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * @todo This was originally cut and paste from the client module leading to a duplicate class, then altered very
+ * slightly. To avoid the duplicate class the name was altered slightly to have 'Helper' on the end in order
+ * to distinguish it from the original. Delete this class and use the original instead, just upgrade it to
+ * provide the new features needed.
+ */
+public class QpidClientConnectionHelper implements ExceptionListener
+{
+
+ private static final Logger _logger = Logger.getLogger(QpidClientConnectionHelper.class);
+
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnectionHelper(String broker)
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(broker);
+ setPrefetch(5000);
+ }
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ sender.setDeliveryMode(deliveryMode);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+}