summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-11-09 10:26:25 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-11-09 10:26:25 +0000
commit86996d5680a07acd82ffda2829dcdd6d6585e606 (patch)
tree9d59a45afa1989dd313e69972e080d85f712071d
parent793a6b31f7ad6b27f0683eef52ae92e05d7f8fee (diff)
downloadqpid-python-86996d5680a07acd82ffda2829dcdd6d6585e606.tar.gz
Updated perftests to use JMS/JNDI lookup. Added some options to support non-Qpid middleware.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@593481 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/perftests/etc/jndi/activemq.properties3
-rw-r--r--java/perftests/etc/jndi/perftests.properties2
-rw-r--r--java/perftests/etc/jndi/swiftmq.properties3
-rw-r--r--java/perftests/etc/scripts/Test-ActiveMQ.sh14
-rw-r--r--java/perftests/etc/scripts/Test-SwiftMQ.sh12
-rw-r--r--java/perftests/jar-with-dependencies.xml43
-rw-r--r--java/perftests/pom.xml4
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java162
8 files changed, 159 insertions, 84 deletions
diff --git a/java/perftests/etc/jndi/activemq.properties b/java/perftests/etc/jndi/activemq.properties
new file mode 100644
index 0000000000..0616bee2e0
--- /dev/null
+++ b/java/perftests/etc/jndi/activemq.properties
@@ -0,0 +1,3 @@
+java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
+java.naming.provider.url= tcp://localhost:61616
+
diff --git a/java/perftests/etc/jndi/perftests.properties b/java/perftests/etc/jndi/perftests.properties
new file mode 100644
index 0000000000..6cc16203e9
--- /dev/null
+++ b/java/perftests/etc/jndi/perftests.properties
@@ -0,0 +1,2 @@
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+connectionfactory.local = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'
diff --git a/java/perftests/etc/jndi/swiftmq.properties b/java/perftests/etc/jndi/swiftmq.properties
new file mode 100644
index 0000000000..5a3b3ea7ec
--- /dev/null
+++ b/java/perftests/etc/jndi/swiftmq.properties
@@ -0,0 +1,3 @@
+java.naming.factory.initial = com.swiftmq.jndi.InitialContextFactoryImpl
+java.naming.provider.url=smqp://localhost:4001/timeout=10000
+#connectionfactory.local = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'
diff --git a/java/perftests/etc/scripts/Test-ActiveMQ.sh b/java/perftests/etc/scripts/Test-ActiveMQ.sh
new file mode 100644
index 0000000000..925c93ab09
--- /dev/null
+++ b/java/perftests/etc/scripts/Test-ActiveMQ.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+# Parse arguements taking all - prefixed args as JAVA_OPTS
+for arg in "$@"; do
+ if [[ $arg == -java:* ]]; then
+ JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
+ else
+ ARGS="${ARGS}$arg "
+ fi
+done
+
+java -Xms256m -Dlog4j.configuration=perftests.log4j -Xmx1024m -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn ${JAVA_OPTS} -cp "qpid-perftests-1.0-incubating-M2.1-SNAPSHOT.jar;activemq-jars/apache-activemq-4.1.1.jar" uk.co.thebadgerset.junit.extensions.TKTestRunner -n Test-ActiveMQ -s[1] -r 1 -t testAsyncPingOk -o . org.apache.qpid.ping.PingAsyncTestPerf properties=activemq.properties factoryName=ConnectionFactory ${ARGS}
+
+# queueNamePostfix=@router1 overrideClientId=true \ No newline at end of file
diff --git a/java/perftests/etc/scripts/Test-SwiftMQ.sh b/java/perftests/etc/scripts/Test-SwiftMQ.sh
new file mode 100644
index 0000000000..f10c3cdc3f
--- /dev/null
+++ b/java/perftests/etc/scripts/Test-SwiftMQ.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+# Parse arguements taking all - prefixed args as JAVA_OPTS
+for arg in "$@"; do
+ if [[ $arg == -java:* ]]; then
+ JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
+ else
+ ARGS="${ARGS}$arg "
+ fi
+done
+
+java -Xms256m -Dlog4j.configuration=perftests.log4j -Xmx1024m -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn ${JAVA_OPTS} -cp "qpid-perftests-1.0-incubating-M2.1-SNAPSHOT.jar;swiftmqjars/swiftmq.jar" uk.co.thebadgerset.junit.extensions.TKTestRunner -n Test-SwiftMQ -s[1] -r 1 -t testAsyncPingOk -o . org.apache.qpid.ping.PingAsyncTestPerf properties=swiftmq.properties factoryName=ConnectionFactory queueNamePostfix=@router1 overrideClientId=true ${ARGS} \ No newline at end of file
diff --git a/java/perftests/jar-with-dependencies.xml b/java/perftests/jar-with-dependencies.xml
index 5813b6334b..bbbbd3788e 100644
--- a/java/perftests/jar-with-dependencies.xml
+++ b/java/perftests/jar-with-dependencies.xml
@@ -6,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
+ -
http://www.apache.org/licenses/LICENSE-2.0
-
+ -
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,21 +44,30 @@
<outputDirectory></outputDirectory>
</fileSet>
- <!-- Include all the test scripts, both generated and hand-written. -->
- <fileSet>
- <directory>target</directory>
- <outputDirectory></outputDirectory>
- <includes>
- <include>*.sh</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>etc/scripts</directory>
- <outputDirectory></outputDirectory>
- <includes>
- <include>*.sh</include>
- </includes>
- </fileSet>
+ <!-- Include all the test scripts, both generated and hand-written. -->
+ <fileSet>
+ <directory>target</directory>
+ <outputDirectory></outputDirectory>
+ <includes>
+ <include>*.sh</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>etc/scripts</directory>
+ <outputDirectory></outputDirectory>
+ <includes>
+ <include>*.sh</include>
+ </includes>
+ </fileSet>
+
+ <!-- Include all jndi configurations needed to run the tests. -->
+ <fileSet>
+ <directory>etc/jndi</directory>
+ <outputDirectory></outputDirectory>
+ <includes>
+ <include>*.properties</include>
+ </includes>
+ </fileSet>
<!-- Include the build artifact. -->
<fileSet>
diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml
index 0763f95602..077669b6e6 100644
--- a/java/perftests/pom.xml
+++ b/java/perftests/pom.xml
@@ -351,13 +351,13 @@
<TQBT-TX-Qpid-01>-n TQBT-TX-Qpid-01 -d10M -s[1000] -c[16] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-TX-Qpid-01>
<TQBT-AA-Qpid-01>-n TQBT-AA-Qpid-01 -d10M -s[1000] -c[16] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-AA-Qpid-01>
<TQBT-NA-Qpid-01>-n TQBT-NA-Qpid-01 -d10M -s[1000] -c[16] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=257 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-NA-Qpid-01>
- <TTBT-TX-Qpid-01>-n TTBT-TX-Qpid-01 -d10M -s[1000] -c[2] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=8 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TTBT-TX-Qpid-01>
+ <TTBT-TX-Qpid-01>-n TTBT-TX-Qpid-01 -d10M -s[1000] -c[2] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=8 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TTBT-TX-Qpid-01>
<TTBT-AA-Qpid-01>-n TTBT-AA-Qpid-01 -d10M -s[1000] -c[2] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=8 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TTBT-AA-Qpid-01>
<TTBT-NA-Qpid-01>-n TTBT-NA-Qpid-01 -d10M -s[1000] -c[2] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=8 transacted=false consTransacted=false consAckMode=257 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TTBT-NA-Qpid-01>
<PQBT-TX-Qpid-01>-n PQBT-TX-Qpid-01 -d10M -s[1000] -c[16] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-TX-Qpid-01>
<PQBT-AA-Qpid-01>-n PQBT-AA-Qpid-01 -d10M -s[1000] -c[16] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-AA-Qpid-01>
- <PTBT-TX-Qpid-01>-n PTBT-TX-Qpid-01 -d10M -s[1000] -c[2] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=8 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PTBT-TX-Qpid-01>
+ <PTBT-TX-Qpid-01>-n PTBT-TX-Qpid-01 -d10M -s[1000] -c[2] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=8 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PTBT-TX-Qpid-01>
<PTBT-AA-Qpid-01>-n PTBT-AA-Qpid-01 -d10M -s[1000] -c[2] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=8 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PTBT-AA-Qpid-01>
<!-- Job Queueing. 1:10 -->
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 99ed9f8367..c3689151d2 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -23,24 +23,19 @@ package org.apache.qpid.requestreply;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.*;
-import org.apache.qpid.client.message.AMQMessage;
-import org.apache.qpid.client.message.TestMessageFactory;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.MessageProducer;
-import org.apache.qpid.jms.Session;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.test.framework.TestUtils;
import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
import uk.co.thebadgerset.junit.extensions.Throttle;
+import uk.co.thebadgerset.junit.extensions.util.CommandLineParser;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
-import java.io.IOException;
+import java.io.*;
import java.net.InetAddress;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -126,11 +121,29 @@ import java.util.concurrent.atomic.AtomicLong;
* message waits until all other messages have been handled before releasing producers but allows messages to be
* processed concurrently, unlike the current synchronized block.
*/
-public class PingPongProducer implements Runnable /*, MessageListener*/, ExceptionListener
+public class PingPongProducer implements Runnable, ExceptionListener
{
/** Used for debugging. */
private static final Logger log = Logger.getLogger(PingPongProducer.class);
+ /** Holds the name of the property to determine whether of not client id is overridden at connection time. */
+ public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId";
+
+ /** Holds the default value of the override client id flag. */
+ public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false";
+
+ /** Holds the name of the property to define the JNDI factory name with. */
+ public static final String FACTORY_NAME_PROPNAME = "factoryName";
+
+ /** Holds the default JNDI name of the connection factory. */
+ public static final String FACTORY_NAME_DEAFULT = "local";
+
+ /** Holds the name of the property to set the JNDI initial context properties with. */
+ public static final String FILE_PROPERTIES_PROPNAME = "properties";
+
+ /** Holds the default file name of the JNDI initial context properties. */
+ public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties";
+
/** Holds the name of the property to get the test message size from. */
public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
@@ -143,6 +156,12 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** Holds the name of the default destination to send pings on. */
public static final String PING_QUEUE_NAME_DEFAULT = "ping";
+ /** Holds the name of the property to get the queue name postfix from. */
+ public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix";
+
+ /** Holds the default queue name postfix value. */
+ public static final String QUEUE_NAME_POSTFIX_DEFAULT = "";
+
/** Holds the name of the property to get the test delivery mode from. */
public static final String PERSISTENT_MODE_PROPNAME = "persistent";
@@ -310,11 +329,15 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
static
{
+ defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT);
+ defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT);
+ defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT);
defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+ defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT);
defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT);
@@ -339,6 +362,15 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
}
+ /** Allows setting of client ID on the connection, rather than through the connection URL. */
+ protected boolean _overrideClientId;
+
+ /** Holds the JNDI name of the JMS connection factory. */
+ protected String _factoryName;
+
+ /** Holds the name of the properties file to configure JNDI with. */
+ protected String _fileProperties;
+
/** Holds the broker url. */
protected String _brokerDetails;
@@ -354,6 +386,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** Holds the root name from which to generate test destination names. */
protected String _destinationName;
+ /** Holds the default queue name postfix value. */
+ protected String _queueNamePostfix;
+
/** Holds the message selector to filter the pings with. */
protected String _selector;
@@ -523,11 +558,15 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
properties.putAll(overrides);
// Extract the configuration properties to set the pinger up with.
+ _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME);
+ _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME);
+ _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME);
_brokerDetails = properties.getProperty(BROKER_PROPNAME);
_username = properties.getProperty(USERNAME_PROPNAME);
_password = properties.getProperty(PASSWORD_PROPNAME);
_virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
_destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
+ _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME);
_selector = properties.getProperty(SELECTOR_PROPNAME);
_transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
_consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
@@ -582,19 +621,20 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// Generate a unique identifying name for this client, based on it ip address and the current time.
InetAddress address = InetAddress.getLocalHost();
- _clientID = address.getHostName() + System.currentTimeMillis();
+ // _clientID = address.getHostName() + System.currentTimeMillis();
+ _clientID = "perftest_" + instanceId;
// Create a connection to the broker.
createConnection(_clientID);
// Create transactional or non-transactional sessions, based on the command line arguments.
- _producerSession = (Session) _connection.createSession(_transacted, _ackMode);
+ _producerSession = _connection.createSession(_transacted, _ackMode);
_consumerSession = new Session[_noOfConsumers];
for (int i = 0; i < _noOfConsumers; i++)
{
- _consumerSession[i] = (Session) _consumerConnection[i].createSession(_consTransacted, _consAckMode);
+ _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode);
}
// Create the destinations to send pings to and receive replies from.
@@ -620,24 +660,37 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
*
* @param clientID The clients identifier.
*
- * @throws AMQException Any underlying exceptions are allowed to fall through.
- * @throws URLSyntaxException Any underlying exceptions are allowed to fall through.
+ * @throws JMSException Underlying exceptions allowed to fall through.
+ * @throws NamingException Underlying exceptions allowed to fall through.
+ * @throws IOException Underlying exceptions allowed to fall through.
*/
- protected void createConnection(String clientID) throws AMQException, URLSyntaxException
+ protected void createConnection(String clientID) throws JMSException, NamingException, IOException
{
- // log.debug("protected void createConnection(String clientID = " + clientID + "): called");
+ // _log.debug("protected void createConnection(String clientID = " + clientID + "): called");
+
+ // _log.debug("Creating a connection for the message producer.");
+ File propsFile = new File(_fileProperties);
+ InputStream is = new FileInputStream(propsFile);
+ Properties properties = new Properties();
+ properties.load(is);
- // log.debug("Creating a connection for the message producer.");
+ Context context = new InitialContext(properties);
+ ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName);
+ _connection = factory.createConnection(_username, _password);
- _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
+ if (_overrideClientId)
+ {
+ _connection.setClientID(clientID);
+ }
- // log.debug("Creating " + _noOfConsumers + " connections for the consumers.");
+ // _log.debug("Creating " + _noOfConsumers + " connections for the consumers.");
_consumerConnection = new Connection[_noOfConsumers];
for (int i = 0; i < _noOfConsumers; i++)
{
- _consumerConnection[i] = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
+ _consumerConnection[i] = factory.createConnection(_username, _password);
+ // _consumerConnection[i].setClientID(clientID);
}
}
@@ -744,10 +797,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
* @param durable If the destinations are durable topics.
*
* @throws JMSException Any JMSExceptions are allowed to fall through.
- * @throws AMQException Any AMQExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
- boolean durable) throws JMSException, AMQException
+ boolean durable) throws JMSException
{
/*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+ selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
@@ -760,8 +812,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
for (int i = 0; i < noOfDestinations; i++)
{
- AMQDestination destination;
-
+ Destination destination;
String id;
// Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
@@ -779,30 +830,18 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
- if (!durable)
- {
- destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
- // log.debug("Created non-durable topic " + destination);
- }
- else
+ destination = _producerSession.createTopic(rootName + id);
+ // log.debug("Created non-durable topic " + destination);
+
+ if (durable)
{
- destination =
- AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
- _clientID, (AMQConnection) _connection);
- // log.debug("Created durable topic " + destination);
+ _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID());
}
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
- AMQShortString destinationName = new AMQShortString(rootName + id);
- destination =
- new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
- _isDurable);
- ((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false);
- ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
- ExchangeDefaults.DIRECT_EXCHANGE_NAME);
-
+ destination = _producerSession.createQueue(rootName + id + _queueNamePostfix);
// log.debug("Created queue " + destination);
}
@@ -835,9 +874,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
for (int i = 0; i < _noOfConsumers; i++)
{
// Create a consumer for the destination and set this pinger to listen to its messages.
- _consumer[i] =
- _consumerSession[i].createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
- selector);
+ _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT);
final int consumerNo = i;
@@ -1374,7 +1411,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
*/
public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
{
- return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
+ // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
+ return TestUtils.createTestMessageOfSize(_producerSession, messageSize);
}
/**
@@ -1386,14 +1424,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
*/
protected void setTimestamp(Message msg) throws JMSException
{
- if (((AMQSession) _producerSession).isStrictAMQP())
+ /*if (((AMQSession)_producerSession).isStrictAMQP())
{
- ((AMQMessage) msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime());
+ ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime());
}
else
- {
- msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
- }
+ {*/
+ msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+ // }
}
/**
@@ -1407,16 +1445,16 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
*/
protected long getTimestamp(Message msg) throws JMSException
{
- if (((AMQSession) _producerSession).isStrictAMQP())
+ /*if (((AMQSession)_producerSession).isStrictAMQP())
{
- Long value = ((AMQMessage) msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
+ Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
return (value == null) ? 0L : value;
}
else
- {
- return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
- }
+ {*/
+ return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
+ // }
}
/**
@@ -1577,12 +1615,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
{
// log.debug("JMSException on commit:" + e.getMessage(), e);
- // Warn that the bounce back client is not available.
- if (e.getLinkedException() instanceof AMQNoConsumersException)
- {
- // log.debug("No consumers on queue.");
- }
-
try
{
session.rollback();