summaryrefslogtreecommitdiff
path: root/qpid/java/perftests/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/perftests/src')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java2
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java4
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java5
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java40
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java57
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java59
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java2
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java11
8 files changed, 140 insertions, 40 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
index c0f236b833..eeb4021f34 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
@@ -103,7 +103,7 @@ public class TestMessageFactory
{
StringBuffer buf = new StringBuffer(size);
int count = 0;
- while (count < size)
+ while (count <= (size - MESSAGE_DATA_BYTES.length()))
{
buf.append(MESSAGE_DATA_BYTES);
count += MESSAGE_DATA_BYTES.length();
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java
index 04381d66a0..14db74438f 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java
@@ -49,7 +49,7 @@ public abstract class AbstractConfig
}
catch(NumberFormatException e)
{
- throw new RuntimeException(msg + ": " + i);
+ throw new RuntimeException(msg + ": " + i, e);
}
}
@@ -61,7 +61,7 @@ public abstract class AbstractConfig
}
catch(NumberFormatException e)
{
- throw new RuntimeException(msg + ": " + i);
+ throw new RuntimeException(msg + ": " + i, e);
}
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
index 44285efd96..a0248a8f79 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
@@ -22,6 +22,7 @@ package org.apache.qpid.config;
import org.apache.qpid.config.ConnectionFactoryInitialiser;
import org.apache.qpid.config.ConnectorConfig;
+import org.apache.qpid.client.JMSAMQException;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@@ -63,11 +64,11 @@ public class JBossConnectionFactoryInitialiser implements ConnectionFactoryIniti
}
catch (NamingException e)
{
- throw new JMSException("Unable to lookup object: " + e);
+ throw new JMSAMQException("Unable to lookup object: " + e, e);
}
catch (Exception e)
{
- throw new JMSException("Error creating topic: " + e);
+ throw new JMSAMQException("Error creating topic: " + e, e);
}
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
index 82e43e542f..0e832ef100 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
@@ -1,3 +1,4 @@
+/* Copyright Rupert Smith, 2005 to 2006, all rights reserved. */
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -178,6 +179,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
// Run the test procedure.
int sent = pingProducer.send();
+ pingProducer.closeConnection();
pingProducer.waitForUser("Press return to begin receiving the pings.");
pingProducer.receive(sent);
@@ -302,6 +304,13 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted
+ ", Messages not Committed = " + messagesNotCommitted);
+
+
+ return messagesSent;
+ }
+
+ protected void closeConnection()
+ {
// Clean up the connection.
try
{
@@ -309,10 +318,11 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
}
catch (JMSException e)
{
+ log.debug("There was an error whilst closing the connection: " + e, e);
+ System.out.println("There was an error whilst closing the connection.");
+
// Ignore as did best could manage to clean up.
}
-
- return messagesSent;
}
protected void receive(int messagesSent) throws Exception
@@ -354,6 +364,32 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
}
}
+ // Ensure messages received are committed.
+ if (_transacted)
+ {
+ try
+ {
+ _consumerSession.commit();
+ System.out.println("Committed for all messages received.");
+ }
+ catch (JMSException e)
+ {
+ log.debug("Error during commit: " + e, e);
+ System.out.println("Error during commit.");
+ try
+ {
+ _consumerSession.rollback();
+ System.out.println("Rolled back on all messages received.");
+ }
+ catch (JMSException e2)
+ {
+ log.debug("Error during rollback: " + e, e);
+ System.out.println("Error on roll back of all messages received.");
+ }
+
+ }
+ }
+
log.debug("messagesReceived = " + messagesReceived);
System.out.println("Messages received: " + messagesReceived);
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
new file mode 100644
index 0000000000..7cf5e4516f
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
@@ -0,0 +1,57 @@
+/* Copyright Rupert Smith, 2005 to 2006, all rights reserved. */
+package org.apache.qpid.ping;
+
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.util.CommandLineParser;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ */
+public class PingSendOnlyClient extends PingDurableClient
+{
+ private static final Logger log = Logger.getLogger(PingSendOnlyClient.class);
+
+ public PingSendOnlyClient(Properties overrides) throws Exception
+ {
+ super(overrides);
+ }
+
+ /**
+ * Starts the ping/wait/receive process.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ try
+ {
+ // Create a ping producer overriding its defaults with all options passed on the command line.
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
+ PingDurableClient pingProducer = new PingSendOnlyClient(options);
+
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ // pingProducer.getConnection().setExceptionListener(pingProducer);
+
+ // Run the test procedure.
+ int sent = pingProducer.send();
+ pingProducer.waitForUser("Press return to close connection and quit.");
+ pingProducer.closeConnection();
+
+ System.exit(0);
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ log.error("Top level handler caught execption.", e);
+ System.exit(1);
+ }
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 913685bca2..ecaf27167f 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -116,7 +116,7 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
* by the PPP that it is atteched to.
*
* @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair.
- * Obtian read lock on all messages, before decrementing the message count. At the end of the on message method add a
+ * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a
* block that obtains the write lock for the very last message, releases any waiting producer. Means that the last
* message waits until all other messages have been handled before releasing producers but allows messages to be
* processed concurrently, unlike the current synchronized block.
@@ -725,6 +725,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ ", String selector = " + selector + "): called");
+ log.debug("Creating " + destinations.size() + " reply consumers.");
+
for (Destination destination : destinations)
{
// Create a consumer for the destination and set this pinger to listen to its messages.
@@ -732,6 +734,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
selector);
_consumer.setMessageListener(this);
+
+ log.debug("Set this to listen to replies sent to destination: " + destination);
}
}
@@ -743,13 +747,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public void onMessage(Message message)
{
- log.debug("public void onMessage(Message message): called");
+ // log.debug("public void onMessage(Message message): called");
try
{
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
- log.debug("correlationID = " + correlationID);
+ // log.debug("correlationID = " + correlationID);
// Countdown on the traffic light if there is one for the matching correlation id.
PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
@@ -761,7 +765,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Restart the timeout timer on every message.
perCorrelationId.timeOutStart = System.nanoTime();
- log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+ // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
// Decrement the countdown latch. Before this point, it is possible that two threads might enter this
// method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
@@ -776,8 +780,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
trueCount = trafficLight.getCount();
remainingCount = trueCount - 1;
- log.debug("remainingCount = " + remainingCount);
- log.debug("trueCount = " + trueCount);
+ // log.debug("remainingCount = " + remainingCount);
+ // log.debug("trueCount = " + trueCount);
// Commit on transaction batch size boundaries. At this point in time the waiting producer remains
// blocked, even on the last message.
@@ -806,23 +810,23 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
// Print out ping times for every message in verbose mode only.
- if (_verbose)
+ /*if (_verbose)
{
Long timestamp = message.getLongProperty(MESSAGE_TIMESTAMP_PROPNAME);
if (timestamp != null)
{
long diff = System.nanoTime() - timestamp;
- log.trace("Time for round trip (nanos): " + diff);
+ //log.trace("Time for round trip (nanos): " + diff);
}
- }
+ }*/
}
catch (JMSException e)
{
log.warn("There was a JMSException: " + e.getMessage(), e);
}
- log.debug("public void onMessage(Message message): ending");
+ // log.debug("public void onMessage(Message message): ending");
}
/**
@@ -955,16 +959,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
committed = false;
// Re-timestamp the message.
- message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+ // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
// Send the message, passing in the message count.
committed = sendMessage(i, message);
// Spew out per message timings on every message sonly in verbose mode.
- if (_verbose)
+ /*if (_verbose)
{
log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
- }
+ }*/
}
// Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
@@ -1003,7 +1007,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_failBeforeSend = false;
}
- log.trace("Failing Before Send");
+ // log.trace("Failing Before Send");
waitForUser(KILL_BROKER_PROMPT);
}
@@ -1176,6 +1180,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
if (_connection != null)
{
_connection.close();
+ log.debug("Close connection.");
}
}
finally
@@ -1213,20 +1218,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
protected boolean commitTx(Session session) throws JMSException
{
- log.debug("protected void commitTx(Session session): called");
+ // log.debug("protected void commitTx(Session session): called");
boolean committed = false;
- log.trace("Batch time reached");
+ // log.trace("Batch time reached");
if (_failAfterSend)
{
- log.trace("Batch size reached");
+ // log.trace("Batch size reached");
if (_failOnce)
{
_failAfterSend = false;
}
- log.trace("Failing After Send");
+ // log.trace("Failing After Send");
waitForUser(KILL_BROKER_PROMPT);
}
@@ -1241,14 +1246,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_failBeforeCommit = false;
}
- log.trace("Failing Before Commit");
+ // log.trace("Failing Before Commit");
waitForUser(KILL_BROKER_PROMPT);
}
- long l = System.nanoTime();
+ // long l = System.nanoTime();
session.commit();
committed = true;
- log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
+ // log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
if (_failAfterCommit)
{
@@ -1257,15 +1262,15 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_failAfterCommit = false;
}
- log.trace("Failing After Commit");
+ // log.trace("Failing After Commit");
waitForUser(KILL_BROKER_PROMPT);
}
- log.trace("Session Commited.");
+ // log.trace("Session Commited.");
}
catch (JMSException e)
{
- log.trace("JMSException on commit:" + e.getMessage(), e);
+ log.debug("JMSException on commit:" + e.getMessage(), e);
// Warn that the bounce back client is not available.
if (e.getLinkedException() instanceof AMQNoConsumersException)
@@ -1276,11 +1281,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
try
{
session.rollback();
- log.trace("Message rolled back.");
+ log.debug("Message rolled back.");
}
catch (JMSException jmse)
{
- log.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+ log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
// Both commit and rollback failed. Throw the rollback exception.
throw jmse;
@@ -1296,7 +1301,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*
* @param prompt The prompt to display on the console.
*/
- protected void waitForUser(String prompt)
+ public void waitForUser(String prompt)
{
System.out.println(prompt);
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
index 342b28ca17..d5c0979399 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
@@ -221,7 +221,7 @@ public class Config extends AbstractConfig implements ConnectorConfig
}
catch(NumberFormatException e)
{
- throw new RuntimeException("Bad port number: " + value);
+ throw new RuntimeException("Bad port number: " + value, e);
}
}
else if("-payload".equalsIgnoreCase(key))
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index 3b8e670f8f..6c7f22c19a 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -133,12 +133,13 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
*/
public void testAsyncPingOk(int numPings) throws Exception
{
- _logger.debug("public void testAsyncPingOk(int numPings): called");
+ // _logger.debug("public void testAsyncPingOk(int numPings): called");
// Ensure that at least one ping was requeusted.
if (numPings == 0)
{
_logger.error("Number of pings requested was zero.");
+ fail("Number of pings requested was zero.");
}
// Get the per thread test setup to run the test through.
@@ -147,8 +148,8 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
// Advance the correlation id of messages to send, to make it unique for this run.
perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet());
- String messageCorrelationId = perThreadSetup._correlationId;
- _logger.debug("messageCorrelationId = " + messageCorrelationId);
+ // String messageCorrelationId = perThreadSetup._correlationId;
+ // _logger.debug("messageCorrelationId = " + messageCorrelationId);
// Initialize the count and timing controller for the new correlation id.
PerCorrelationId perCorrelationId = new PerCorrelationId();
@@ -246,9 +247,9 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
// Extract the correlation id from the message.
String correlationId = message.getJMSCorrelationID();
- _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount
+ /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount
+ "): called on batch boundary for message id: " + correlationId + " with thread id: "
- + Thread.currentThread().getId());
+ + Thread.currentThread().getId());*/
// Get the details for the correlation id and check that they are not null. They can become null
// if a test times out.