diff options
Diffstat (limited to 'qpid/java/perftests/src')
8 files changed, 140 insertions, 40 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java index c0f236b833..eeb4021f34 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java @@ -103,7 +103,7 @@ public class TestMessageFactory { StringBuffer buf = new StringBuffer(size); int count = 0; - while (count < size) + while (count <= (size - MESSAGE_DATA_BYTES.length())) { buf.append(MESSAGE_DATA_BYTES); count += MESSAGE_DATA_BYTES.length(); diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java index 04381d66a0..14db74438f 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java @@ -49,7 +49,7 @@ public abstract class AbstractConfig } catch(NumberFormatException e) { - throw new RuntimeException(msg + ": " + i); + throw new RuntimeException(msg + ": " + i, e); } } @@ -61,7 +61,7 @@ public abstract class AbstractConfig } catch(NumberFormatException e) { - throw new RuntimeException(msg + ": " + i); + throw new RuntimeException(msg + ": " + i, e); } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java index 44285efd96..a0248a8f79 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java @@ -22,6 +22,7 @@ package org.apache.qpid.config; import org.apache.qpid.config.ConnectionFactoryInitialiser; import org.apache.qpid.config.ConnectorConfig; +import org.apache.qpid.client.JMSAMQException; import javax.jms.ConnectionFactory; import javax.jms.JMSException; @@ -63,11 +64,11 @@ public class JBossConnectionFactoryInitialiser implements ConnectionFactoryIniti } catch (NamingException e) { - throw new JMSException("Unable to lookup object: " + e); + throw new JMSAMQException("Unable to lookup object: " + e, e); } catch (Exception e) { - throw new JMSException("Error creating topic: " + e); + throw new JMSAMQException("Error creating topic: " + e, e); } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java index 82e43e542f..0e832ef100 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -1,3 +1,4 @@ +/* Copyright Rupert Smith, 2005 to 2006, all rights reserved. */
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -178,6 +179,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList // Run the test procedure.
int sent = pingProducer.send();
+ pingProducer.closeConnection();
pingProducer.waitForUser("Press return to begin receiving the pings.");
pingProducer.receive(sent);
@@ -302,6 +304,13 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted
+ ", Messages not Committed = " + messagesNotCommitted);
+
+
+ return messagesSent;
+ }
+
+ protected void closeConnection()
+ {
// Clean up the connection.
try
{
@@ -309,10 +318,11 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList }
catch (JMSException e)
{
+ log.debug("There was an error whilst closing the connection: " + e, e);
+ System.out.println("There was an error whilst closing the connection.");
+
// Ignore as did best could manage to clean up.
}
-
- return messagesSent;
}
protected void receive(int messagesSent) throws Exception
@@ -354,6 +364,32 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList }
}
+ // Ensure messages received are committed.
+ if (_transacted)
+ {
+ try
+ {
+ _consumerSession.commit();
+ System.out.println("Committed for all messages received.");
+ }
+ catch (JMSException e)
+ {
+ log.debug("Error during commit: " + e, e);
+ System.out.println("Error during commit.");
+ try
+ {
+ _consumerSession.rollback();
+ System.out.println("Rolled back on all messages received.");
+ }
+ catch (JMSException e2)
+ {
+ log.debug("Error during rollback: " + e, e);
+ System.out.println("Error on roll back of all messages received.");
+ }
+
+ }
+ }
+
log.debug("messagesReceived = " + messagesReceived);
System.out.println("Messages received: " + messagesReceived);
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java new file mode 100644 index 0000000000..7cf5e4516f --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java @@ -0,0 +1,57 @@ +/* Copyright Rupert Smith, 2005 to 2006, all rights reserved. */
+package org.apache.qpid.ping;
+
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.util.CommandLineParser;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ */
+public class PingSendOnlyClient extends PingDurableClient
+{
+ private static final Logger log = Logger.getLogger(PingSendOnlyClient.class);
+
+ public PingSendOnlyClient(Properties overrides) throws Exception
+ {
+ super(overrides);
+ }
+
+ /**
+ * Starts the ping/wait/receive process.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ try
+ {
+ // Create a ping producer overriding its defaults with all options passed on the command line.
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
+ PingDurableClient pingProducer = new PingSendOnlyClient(options);
+
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ // pingProducer.getConnection().setExceptionListener(pingProducer);
+
+ // Run the test procedure.
+ int sent = pingProducer.send();
+ pingProducer.waitForUser("Press return to close connection and quit.");
+ pingProducer.closeConnection();
+
+ System.exit(0);
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ log.error("Top level handler caught execption.", e);
+ System.exit(1);
+ }
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 913685bca2..ecaf27167f 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -116,7 +116,7 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * by the PPP that it is atteched to.
*
* @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair.
- * Obtian read lock on all messages, before decrementing the message count. At the end of the on message method add a
+ * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a
* block that obtains the write lock for the very last message, releases any waiting producer. Means that the last
* message waits until all other messages have been handled before releasing producers but allows messages to be
* processed concurrently, unlike the current synchronized block.
@@ -725,6 +725,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ ", String selector = " + selector + "): called");
+ log.debug("Creating " + destinations.size() + " reply consumers.");
+
for (Destination destination : destinations)
{
// Create a consumer for the destination and set this pinger to listen to its messages.
@@ -732,6 +734,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
selector);
_consumer.setMessageListener(this);
+
+ log.debug("Set this to listen to replies sent to destination: " + destination);
}
}
@@ -743,13 +747,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
public void onMessage(Message message)
{
- log.debug("public void onMessage(Message message): called");
+ // log.debug("public void onMessage(Message message): called");
try
{
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
- log.debug("correlationID = " + correlationID);
+ // log.debug("correlationID = " + correlationID);
// Countdown on the traffic light if there is one for the matching correlation id.
PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
@@ -761,7 +765,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Restart the timeout timer on every message.
perCorrelationId.timeOutStart = System.nanoTime();
- log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+ // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
// Decrement the countdown latch. Before this point, it is possible that two threads might enter this
// method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
@@ -776,8 +780,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis trueCount = trafficLight.getCount();
remainingCount = trueCount - 1;
- log.debug("remainingCount = " + remainingCount);
- log.debug("trueCount = " + trueCount);
+ // log.debug("remainingCount = " + remainingCount);
+ // log.debug("trueCount = " + trueCount);
// Commit on transaction batch size boundaries. At this point in time the waiting producer remains
// blocked, even on the last message.
@@ -806,23 +810,23 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
// Print out ping times for every message in verbose mode only.
- if (_verbose)
+ /*if (_verbose)
{
Long timestamp = message.getLongProperty(MESSAGE_TIMESTAMP_PROPNAME);
if (timestamp != null)
{
long diff = System.nanoTime() - timestamp;
- log.trace("Time for round trip (nanos): " + diff);
+ //log.trace("Time for round trip (nanos): " + diff);
}
- }
+ }*/
}
catch (JMSException e)
{
log.warn("There was a JMSException: " + e.getMessage(), e);
}
- log.debug("public void onMessage(Message message): ending");
+ // log.debug("public void onMessage(Message message): ending");
}
/**
@@ -955,16 +959,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis committed = false;
// Re-timestamp the message.
- message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+ // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
// Send the message, passing in the message count.
committed = sendMessage(i, message);
// Spew out per message timings on every message sonly in verbose mode.
- if (_verbose)
+ /*if (_verbose)
{
log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
- }
+ }*/
}
// Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
@@ -1003,7 +1007,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failBeforeSend = false;
}
- log.trace("Failing Before Send");
+ // log.trace("Failing Before Send");
waitForUser(KILL_BROKER_PROMPT);
}
@@ -1176,6 +1180,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis if (_connection != null)
{
_connection.close();
+ log.debug("Close connection.");
}
}
finally
@@ -1213,20 +1218,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
protected boolean commitTx(Session session) throws JMSException
{
- log.debug("protected void commitTx(Session session): called");
+ // log.debug("protected void commitTx(Session session): called");
boolean committed = false;
- log.trace("Batch time reached");
+ // log.trace("Batch time reached");
if (_failAfterSend)
{
- log.trace("Batch size reached");
+ // log.trace("Batch size reached");
if (_failOnce)
{
_failAfterSend = false;
}
- log.trace("Failing After Send");
+ // log.trace("Failing After Send");
waitForUser(KILL_BROKER_PROMPT);
}
@@ -1241,14 +1246,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failBeforeCommit = false;
}
- log.trace("Failing Before Commit");
+ // log.trace("Failing Before Commit");
waitForUser(KILL_BROKER_PROMPT);
}
- long l = System.nanoTime();
+ // long l = System.nanoTime();
session.commit();
committed = true;
- log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
+ // log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
if (_failAfterCommit)
{
@@ -1257,15 +1262,15 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failAfterCommit = false;
}
- log.trace("Failing After Commit");
+ // log.trace("Failing After Commit");
waitForUser(KILL_BROKER_PROMPT);
}
- log.trace("Session Commited.");
+ // log.trace("Session Commited.");
}
catch (JMSException e)
{
- log.trace("JMSException on commit:" + e.getMessage(), e);
+ log.debug("JMSException on commit:" + e.getMessage(), e);
// Warn that the bounce back client is not available.
if (e.getLinkedException() instanceof AMQNoConsumersException)
@@ -1276,11 +1281,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis try
{
session.rollback();
- log.trace("Message rolled back.");
+ log.debug("Message rolled back.");
}
catch (JMSException jmse)
{
- log.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+ log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
// Both commit and rollback failed. Throw the rollback exception.
throw jmse;
@@ -1296,7 +1301,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis *
* @param prompt The prompt to display on the console.
*/
- protected void waitForUser(String prompt)
+ public void waitForUser(String prompt)
{
System.out.println(prompt);
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java index 342b28ca17..d5c0979399 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java @@ -221,7 +221,7 @@ public class Config extends AbstractConfig implements ConnectorConfig } catch(NumberFormatException e) { - throw new RuntimeException("Bad port number: " + value); + throw new RuntimeException("Bad port number: " + value, e); } } else if("-payload".equalsIgnoreCase(key)) diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index 3b8e670f8f..6c7f22c19a 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -133,12 +133,13 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA */ public void testAsyncPingOk(int numPings) throws Exception { - _logger.debug("public void testAsyncPingOk(int numPings): called"); + // _logger.debug("public void testAsyncPingOk(int numPings): called"); // Ensure that at least one ping was requeusted. if (numPings == 0) { _logger.error("Number of pings requested was zero."); + fail("Number of pings requested was zero."); } // Get the per thread test setup to run the test through. @@ -147,8 +148,8 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Advance the correlation id of messages to send, to make it unique for this run. perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet()); - String messageCorrelationId = perThreadSetup._correlationId; - _logger.debug("messageCorrelationId = " + messageCorrelationId); + // String messageCorrelationId = perThreadSetup._correlationId; + // _logger.debug("messageCorrelationId = " + messageCorrelationId); // Initialize the count and timing controller for the new correlation id. PerCorrelationId perCorrelationId = new PerCorrelationId(); @@ -246,9 +247,9 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Extract the correlation id from the message. String correlationId = message.getJMSCorrelationID(); - _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called on batch boundary for message id: " + correlationId + " with thread id: " - + Thread.currentThread().getId()); + + Thread.currentThread().getId());*/ // Get the details for the correlation id and check that they are not null. They can become null // if a test times out. |
