From df0cc12065d26d953bb7bd7f603ee0a59cfe3939 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Wed, 3 Oct 2007 16:28:38 +0000 Subject: Performance enhancements for the tests, producers stalled individually above maxPending size. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@581647 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/ping/PingAsyncTestPerf.java | 2 - .../apache/qpid/requestreply/PingPongProducer.java | 252 ++++++++++++--------- 2 files changed, 142 insertions(+), 112 deletions(-) (limited to 'java/perftests/src') diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java index edbc311bd5..06081e6ebf 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -29,11 +29,9 @@ import org.apache.qpid.requestreply.PingPongProducer; import uk.co.thebadgerset.junit.extensions.TimingController; import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.ObjectMessage; import java.util.Collections; import java.util.HashMap; diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index bca67bb0ce..bf1d9aba4a 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -423,14 +423,20 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected * to wait until the number of unreceived message is reduced before continuing to send. */ - protected static final Object _sendPauseMonitor = new Object(); + protected final Object _sendPauseMonitor = new Object(); /** Keeps a count of the number of message currently sent but not received. */ - protected static AtomicInteger _unreceived = new AtomicInteger(0); + protected AtomicInteger _unreceived = new AtomicInteger(0); /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); + /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */ + private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0); + + /** Holds this instances unique id. */ + private int instanceId; + /** * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple * ping producers on the same JVM. @@ -507,6 +513,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti public PingPongProducer(Properties overrides) throws Exception { // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); + instanceId = _instanceIdGenerator.getAndIncrement(); // Create a set of parsed properties from the defaults overriden by the passed in values. ParsedProperties properties = new ParsedProperties(defaults); @@ -814,9 +821,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti /*log.debug("public void createReplyConsumers(Collection destinations = " + destinations + ", String selector = " + selector + "): called");*/ - // log.debug("There are " + destinations.size() + " destinations."); - // log.debug("Creating " + _noOfConsumers + " consumers on each destination."); - // log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); + log.debug("There are " + destinations.size() + " destinations."); + log.debug("Creating " + _noOfConsumers + " consumers on each destination."); + log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); for (Destination destination : destinations) { @@ -839,7 +846,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti } }); - // log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); + log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); } } } @@ -861,7 +868,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti long timestamp = getTimestamp(message); long pingTime = now - timestamp; - NDC.push("cons" + consumerNo); + // NDC.push("id" + instanceId + "/cons" + consumerNo); // Extract the messages correlation id. String correlationID = message.getJMSCorrelationID(); @@ -887,38 +894,41 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); - // Decrement the countdown latch. Before this point, it is possible that two threads might enter this - // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block - // ensures that each thread will get a unique value for the remaining messages. - long trueCount; - long remainingCount; + // log.debug("unreceived = " + unreceived); + // log.debug("unreceivedSize = " + unreceivedSize); - synchronized (trafficLight) + // Release waiting senders if there are some and using maxPending limit. + if ((_maxPendingSize > 0)) { - trafficLight.countDown(); - - trueCount = trafficLight.getCount(); - remainingCount = trueCount - 1; - // Decrement the count of sent but not yet received messages. int unreceived = _unreceived.decrementAndGet(); int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) / (_isPubSub ? getConsumersPerDestination() : 1); - // log.debug("unreceived = " + unreceived); - // log.debug("unreceivedSize = " + unreceivedSize); - - // Release a waiting sender if there is one. synchronized (_sendPauseMonitor) { - if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize)) + if (unreceivedSize < _maxPendingSize) { _sendPauseMonitor.notify(); } } + } + + // Decrement the countdown latch. Before this point, it is possible that two threads might enter this + // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block + // ensures that each thread will get a unique value for the remaining messages. + long trueCount; + long remainingCount; - NDC.push("/rem" + remainingCount); + synchronized (trafficLight) + { + trafficLight.countDown(); + + trueCount = trafficLight.getCount(); + remainingCount = trueCount - 1; + + // NDC.push("/rem" + remainingCount); // log.debug("remainingCount = " + remainingCount); // log.debug("trueCount = " + trueCount); @@ -1069,7 +1079,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // commitTx(_consumerSession); - // //log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); + // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); return numReplies; } @@ -1146,109 +1156,131 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ protected boolean sendMessage(int i, Message message) throws JMSException { - // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); - // log.debug("_txBatchSize = " + _txBatchSize); - - // Round robin the destinations as the messages are sent. - Destination destination = _pingDestinations.get(i % _pingDestinations.size()); - - // Prompt the user to kill the broker when doing failover testing. - _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); - - // If necessary, wait until the max pending message size comes within its limit. - synchronized (_sendPauseMonitor) + try { - // Used to keep track of the number of times that send has to wait. - int numWaits = 0; + NDC.push("id" + instanceId + "/prod"); - // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with - // the test timeout. - int waitLimit = (int) (TIMEOUT_DEFAULT / 10000); + // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); + // log.debug("_txBatchSize = " + _txBatchSize); - while ((_maxPendingSize > 0)) - { - // Get the size estimate of sent but not yet received messages. - int unreceived = _unreceived.get(); - int unreceivedSize = - (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) / (_isPubSub ? getConsumersPerDestination() : 1); + // Round robin the destinations as the messages are sent. + Destination destination = _pingDestinations.get(i % _pingDestinations.size()); - // log.debug("unreceived = " + unreceived); - // log.debug("unreceivedSize = " + unreceivedSize); - // log.debug("_maxPendingSize = " + _maxPendingSize); + // Prompt the user to kill the broker when doing failover testing. + _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); - if (unreceivedSize > _maxPendingSize) + // If necessary, wait until the max pending message size comes within its limit. + if (_maxPendingSize > 0) + { + synchronized (_sendPauseMonitor) { - // log.debug("unreceived size estimate over limit = " + unreceivedSize); + // Used to keep track of the number of times that send has to wait. + int numWaits = 0; - // Wait on the send pause barrier for the limit to be re-established. - try - { - _sendPauseMonitor.wait(10000); - numWaits++; - } - catch (InterruptedException e) - { - // Restore the interrupted status - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } + // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with + // the test timeout. + int waitLimit = (int) (TIMEOUT_DEFAULT / 10000); - // Fail the test if the send has had to wait more than the maximum allowed number of times. - if (numWaits >= waitLimit) + while (true) { - String errorMessage = - "Send has had to wait for the unreceivedSize (" + unreceivedSize - + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit - + " times."; - log.warn(errorMessage); - throw new RuntimeException(errorMessage); + // Get the size estimate of sent but not yet received messages. + int unreceived = _unreceived.get(); + int unreceivedSize = + (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) + / (_isPubSub ? getConsumersPerDestination() : 1); + + // log.debug("unreceived = " + unreceived); + // log.debug("unreceivedSize = " + unreceivedSize); + // log.debug("_maxPendingSize = " + _maxPendingSize); + + if (unreceivedSize > _maxPendingSize) + { + // log.debug("unreceived size estimate over limit = " + unreceivedSize); + + // Fail the test if the send has had to wait more than the maximum allowed number of times. + if (numWaits > waitLimit) + { + String errorMessage = + "Send has had to wait for the unreceivedSize (" + unreceivedSize + + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit + + " times."; + log.warn(errorMessage); + throw new RuntimeException(errorMessage); + } + + // Wait on the send pause barrier for the limit to be re-established. + try + { + long start = System.nanoTime(); + _sendPauseMonitor.wait(10000); + long end = System.nanoTime(); + + // Count the wait only if it was for > 99% of the requested wait time. + if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99) + { + numWaits++; + } + } + catch (InterruptedException e) + { + // Restore the interrupted status + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + else + { + break; + } } } - else - { - break; - } } - } - // Send the message either to its round robin destination, or its default destination. - int num = numSent.incrementAndGet(); - message.setIntProperty("MSG_NUM", num); - setTimestamp(message); + // Send the message either to its round robin destination, or its default destination. + // int num = numSent.incrementAndGet(); + // message.setIntProperty("MSG_NUM", num); + setTimestamp(message); - if (destination == null) - { - _producer.send(message); - } - else - { - _producer.send(destination, message); - } + if (destination == null) + { + _producer.send(message); + } + else + { + _producer.send(destination, message); + } - // Increase the unreceived size, this may actually happen after the message is received. - // The unreceived size is incremented by the number of consumers that will get a copy of the message, - // in pub/sub mode. - // _unreceived.getAndIncrement(); - /*int newUnreceivedCount =*/ _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); - // log.debug("newUnreceivedCount = " + newUnreceivedCount); + // Increase the unreceived size, this may actually happen after the message is received. + // The unreceived size is incremented by the number of consumers that will get a copy of the message, + // in pub/sub mode. + if (_maxPendingSize > 0) + { + int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); + // log.debug("newUnreceivedCount = " + newUnreceivedCount); + } - // Apply message rate throttling if a rate limit has been set up. - if (_rateLimiter != null) - { - _rateLimiter.throttle(); - } + // Apply message rate throttling if a rate limit has been set up. + if (_rateLimiter != null) + { + _rateLimiter.throttle(); + } - // Call commit every time the commit batch size is reached. - boolean committed = false; + // Call commit every time the commit batch size is reached. + boolean committed = false; + + // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. + if (((i + 1) % _txBatchSize) == 0) + { + // log.debug("Trying commit on producer session."); + committed = commitTx(_producerSession); + } - // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. - if (((i + 1) % _txBatchSize) == 0) + return committed; + } + finally { - // log.debug("Trying commit on producer session."); - committed = commitTx(_producerSession); + NDC.clear(); } - - return committed; } /** @@ -1269,7 +1301,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti failFlag = false; } - // log.trace("Failing Before Send"); + // log.debug("Failing Before Send"); waitForUser(KILL_BROKER_PROMPT); } @@ -1524,7 +1556,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti { _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit); - // long start = System.nanoTime(); + long start = System.nanoTime(); session.commit(); committed = true; // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); -- cgit v1.2.1